Continue As New has no chance to run in a continuously streaming signal workflow

Hi, we have a workflow that listens on a signal forever. To avoid the history size limit, we’d like to use the Continue As New feature to restart the same workflow and keep listening on the same signal.

Please check the code below. The issue is that the sender has no way of knowing whether the workflow needs to Continue As New, so it keeps sending signals continuously. Meanwhile, the workflow does not want to lose any signals sent by the sender, so it uses selector.HasPending() to check whether all polled signals have been read. However, selector.HasPending() always seems to return true.

I’m not sure whether my workflow code is correct, or whether Temporal simply doesn’t support this pattern yet.

The workflow code is written in Go and looks something like this:

import (
	"go.temporal.io/sdk/workflow"
)

func RunWorkflow(ctx workflow.Context) error {
	maximumSignals := 100
	numberSignals := 0

	channel := workflow.GetSignalChannel(ctx, "abc")
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
		numberSignals++

		var signalVal interface{}
		c.Receive(ctx, &signalVal)
		// start a child workflow for the received signal
	})
	// Intent: handle up to maximumSignals signals, then drain whatever is still
	// pending and Continue As New. In practice this loops forever because
	// selector.HasPending() keeps returning true while signals keep arriving.
	for numberSignals < maximumSignals || selector.HasPending() {
		selector.Select(ctx)
	}
	return workflow.NewContinueAsNewError(ctx, RunWorkflow)
}

We have several clients (written in Java) sending signals to the channel continuously. For simplicity, here is the sender as pseudo code in Go:

import (
	"context"
	"log"
	"time"

	"go.temporal.io/sdk/client"
)

func SignalWorkflow() {
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()
	val := 0
	for {
		val++
		// signal workflow "test-workflow" (latest run) on the "abc" channel
		err = c.SignalWorkflow(context.Background(), "test-workflow", "", "abc", val)
		if err != nil {
			log.Fatalf("error signalling workflow: %s", err)
		}
		time.Sleep(3 * time.Millisecond)
	}
}

At 3ms per signal, you may be sending signals faster than the workflow can process them. Can you check whether selector.HasPending() returns false if you, say, send a signal once every couple of seconds (and maybe lower maximumSignals to something like 5)?


Well, I tried slowing the interval to 1 second, and selector.HasPending() did get a chance to return false.
But in production, 1 second per signal is not a good idea: the activity is just querying the database and continuously sending the queried rows/tuples to the workflow as signals.

Is there a best practice for choosing the signal sending rate? For example, in which cases should we slow down the sender, or is there something else we should do?

Thanks

You might have to devise another mechanism. Signals, and communication with workflows in general, are not expected to be extremely high performing in the single-digit-millisecond range. Signals are sent through the server, polled by the worker, and then handled by the workflow, all of which takes time.

In your use case, you can either batch signals to be delivered less frequently, or you can use an activity directly. For example, can you just have the workflow call your activity to query the database for updates and have the activity return what the workflow should operate on instead of using signals?

Not specifically beyond “don’t send signals faster than the workflow will process them” which is subject to server speeds and testing.
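For what it’s worth, here is a minimal sketch of the batching option mentioned above: the sender groups rows into one signal instead of sending one signal per row. The Row type, the nextRow reader, the batch size, and the back-off interval are illustrative placeholders, not part of the original sender.

// A sketch of batching on the sender side (illustrative, not from the thread).
// Row and nextRow are placeholders for your own row type and database reader.
import (
	"context"
	"time"

	"go.temporal.io/sdk/client"
)

type Row map[string]interface{}

func SignalInBatches(ctx context.Context, c client.Client, nextRow func() (Row, bool)) error {
	const batchSize = 100
	batch := make([]Row, 0, batchSize)
	for {
		row, ok := nextRow()
		if ok {
			batch = append(batch, row)
		}
		// Flush when the batch is full, or when the source pauses with rows buffered.
		if len(batch) >= batchSize || (!ok && len(batch) > 0) {
			if err := c.SignalWorkflow(ctx, "test-workflow", "", "abc", batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
		if !ok {
			time.Sleep(time.Second) // back off while there is nothing new to read
		}
	}
}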

Our use case is a DSL-based ETL pipeline built on top of Temporal workflows. The reason we use signals instead of calling the activity directly from the workflow is that the activity that pulls the original data from the database is a long-running process. It seems we can’t interact with the remaining ETL activities without returning from that long-running activity (it’s essentially a blocking activity that polls data from the database continuously).

As far as I understand, the only way to interact with the next ETL activities without returning from the long-running activity is through signals. Maybe I’m wrong; is there another way to drive the remaining ETL activities without returning from the long-running activity, instead of using signals?

Why does it have to be a long running activity?

If you have an activity NextToProcess(context.Context) ([]ETLThing, error), it doesn’t matter how you implement it. It is just a normal Go call. If it needs to communicate with something long running, it can; it doesn’t have to be the long-running thing itself. You just have to take into account that the activity may run on multiple workers if you register it on multiple worker processes. And note that when handling serially, you’ll still be limited by workflow handling speeds.

While I don’t understand the details of your use case, usually if it’s going to poll anyways, polling and returning when something is available can be short lived and cheap (depending on query preparedness, connection reuse, DB impl, etc).
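To make that concrete, here is a rough sketch of a workflow driven by such a polling activity instead of signals. It assumes the NextToProcess activity and ETLThing type mentioned above; the Process activity, the timeout, and the iteration limit are made-up placeholders.

// A sketch of driving the ETL steps from a short-lived polling activity
// instead of signals. NextToProcess/ETLThing come from the discussion above;
// Process and the limits below are illustrative.
import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func PollingWorkflow(ctx workflow.Context) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})

	const maxIterations = 100 // bound the history, then Continue As New
	for i := 0; i < maxIterations; i++ {
		var things []ETLThing
		// Short-lived poll: the activity blocks briefly and returns whatever is available.
		if err := workflow.ExecuteActivity(ctx, NextToProcess).Get(ctx, &things); err != nil {
			return err
		}
		for _, t := range things {
			if err := workflow.ExecuteActivity(ctx, Process, t).Get(ctx, nil); err != nil {
				return err
			}
		}
	}
	return workflow.NewContinueAsNewError(ctx, PollingWorkflow)
}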

I do not think signals are the best way to go here for your high-throughput use case. Also, whether you use signals or activities, keep in mind that large numbers of either can make the workflow history very large; see Go SDK developer's guide - Foundations | Temporal Documentation.

Finally, Temporal workflows are built for consistency and are not optimized for high-throughput streaming data handling. If batching is not acceptable for your use case and you need to handle things like your example at 3ms, Temporal may not be the best fit.


It doesn’t seem to be about the signal send speed: from my observations, the workflow receives and processes the signals pretty fast; it’s just that selector.HasPending() never gets a chance to return false because of the sending rate. From my understanding, HasPending() only checks whether the worker has polled signals that have not yet been handed over to the upper callback handler. So if we had a way to tell the worker to pause polling signals, HasPending() could return false once the upper caller had processed all the already-polled signals.

Consider this case: we are querying a big table with an ORDER BY, and the full result set does not fit in memory. The fastest way is to let the query finish in one go (utilizing the database’s sequential scan speed) and, whenever queried data becomes available in memory, send it to the next workflow activity. This requires keeping the query open, i.e. we cannot return to the main workflow.

If we instead fetch, say, 100 rows at a time, return from the activity to the main workflow, and pass the 100 rows to the next activity, we have to repeatedly query the database using limit/offset or a cursor to find the previous fetch point and continue with the next 100 rows. This is very heavy for the database servers and expensive in time for the application.

Basically HasPending() is saying “do I have any unhandled signals in the channel”, which you have to drain before continue-as-new. If you cannot handle those signals fast enough (whether you’re using HasPending(), ReceiveChannel.ReceiveAsync, or some other mechanism), you’ll always have unhandled signals in the channel. Granted, other SDKs that operate on a signal-callback basis do not suffer from a buffered channel with potential loss of signals on continue-as-new. I will see if others have options here.
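For reference, the usual shape of that drain in the Go SDK looks roughly like the sketch below: handle a bounded number of signals, then drain whatever the workflow has already buffered with ReceiveAsync before continuing as new. It only helps if the sender slows down or pauses enough for the drain loop to ever see an empty channel; handleSignal and the per-run limit are illustrative.

// A sketch of draining the signal channel before Continue-As-New. handleSignal
// and the per-run limit are illustrative; the drain loop only terminates if
// signals stop arriving faster than they can be handled.
import "go.temporal.io/sdk/workflow"

func RunWorkflowWithDrain(ctx workflow.Context) error {
	channel := workflow.GetSignalChannel(ctx, "abc")

	for i := 0; i < 100; i++ { // a bounded amount of work per run
		var signalVal interface{}
		channel.Receive(ctx, &signalVal)
		handleSignal(ctx, signalVal)
	}
	// Drain anything already buffered in the workflow without blocking.
	for {
		var signalVal interface{}
		if ok := channel.ReceiveAsync(&signalVal); !ok {
			break
		}
		handleSignal(ctx, signalVal)
	}
	return workflow.NewContinueAsNewError(ctx, RunWorkflowWithDrain)
}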

This may not be the best forum to discuss DB cursors and the like, but just know that if you must be long-running, it does not have to be the activity. The activity can instead communicate with the long-running piece.

Thanks very much.

I think this channel only lives in the worker. Between the worker and the Temporal server, is there a way to tell the worker, “Hey, I want to drain the already-polled signals, so pause polling new signals from the server for a little bit”? Then the callback handler could drain all the polled signals and get a chance to Continue As New.

Temporal is not a big data platform. So passing large amounts of data through workflows is not really a use case that it is a good fit for. Use it as a control plane, not as a data plane.

Hi @maxim, thanks for the insight and thanks for the awesome open source product!

I work with @maxnilz. Our original problem is essentially an ETL use case which is actually described in Temporal.io Home.

We move data from external data sources, e.g. typical RDBMSs and spreadsheets, into an Elasticsearch index. During this process, we transform the data row by row, doing text analysis like NLP.

We would like the individual “Transformation Activities” to be reusable and isolated on a dedicated worker. We connect the activities using a DSL, shown in the image below:

From “Query Rows” to “Analysis 1” and “Analysis 2”, we used the “Signal workflow” mechanism and repeatedly launched child activities for each row. This obviously makes the history size exceed the default limit quickly. Batching the rows doesn’t really solve the problem - it just delays it.

We borrowed an idea from the “temporal-samples” repo called “child-workflow-continue-as-new”: we launch the child activities in a child workflow so that we can Continue As New. And now we come to the original question @maxnilz posted.

I guess our ultimate question is: what’s the right way of solving this? Or what does the officially supported “Data Pipelines” use case look like?

Kind Regards,
Jackie

The devil is, as always, in the details.

  • How many records do you expect to push through the system?
  • What is the maximum record size?
  • What is the average latency of each Analysis?
  • Do you need to run each analysis type on a separate machine? Or can they be executed by the same machine?
  • Do you expect a “poison pill” problem?
  • What is the expected throughput?

In our use case, consistency & durability are more important than speed and throughput:

  • How many records do you expect to push through the system?

We need to assume this is infinite - a forever-running workflow that ingests data and pipes it to the destination.

  • What is the maximum record size?

Record size is usually small - KB size level.

  • What is the average latency of each Analysis?

Analysis latency can range from milliseconds to seconds.

  • Do you need to run each analysis type on a separate machine? Or can they be executed by the same machine?

We need to assume they run on separate machines - different workers. We need to give our customers the ability to plug in their own analysis, hence the DSL engine so that they can design and plug them in.

  • Do you expect a “poison pill” problem?

We mostly run in a well-confined environment. We would enforce the worker behaviour via procedure rather than security, although this is still on our plan to discuss further.

  • What is the expected throughput?

Throughput is not our primary concern. Probably one of the last problems to think about. We would like to expand the use cases; however, we’re also aware that we need to look at the target problem case by case: there is a line where this solution just doesn’t suit the problem.

Our current plan is to introduce a dedicated external queue in places where we need to ingest messages forever: for each incoming message, we start a child workflow. This way we get the control from Temporal but are also able to keep our event stream processing model. Does this make sense?

Thanks & Regards,
Jackie

throughput is not our primary concern. Probably one of the last problems to think about.

This sounds dangerous to me. You can end up building an architecture that doesn’t scale and then hit a wall if throughput ends up being an issue. Could you give at least some estimate of the largest throughput you will need to support in the future?

It’s been a while since this question was posted. We figured out a solution following @maxim’s hints. Thanks a lot, @maxim! Hope this solution can help others in similar situations.

Use Temporal as a control plane, not as a data plane - @maxim

Problem

In a typical ETL-like scenario, the data comes as a stream of rows. The workflow needs to wait for a row to arrive and then kick off a series of processing steps, synchronously or asynchronously. For example:

Using intuition, we might implement the workflow like this:

...
sig := workflow.GetSignalChannel(ctx, "receiveRow")
// signal internally or externally to the "receiveRow" channel

for ctx.Err() == nil { // break if the context is cancelled
    var row any
    more := sig.Receive(ctx, &row)
    // process each row asynchronously in a workflow goroutine
    workflow.Go(ctx, func(ctx workflow.Context) {
        err := workflow.ExecuteActivity(ctx, AddFullTextToRow, row).Get(ctx, &row)
        if err != nil { ... }
        err = workflow.ExecuteActivity(ctx, PersistElasticsearch, row).Get(ctx, nil)
        if err != nil { ... }
    })
    if !more {
        break
    }
}
...

However, one would quickly realise this would not work because all signals are recorded in the workflow’s history and the history would quickly run into the maximum limit: Temporal Docs: Event History

The problem here hits home with what @maxim stated: Temporal should be the control plane, not the data plane. And yet we are piping all the rows through Temporal. This obviously goes against what Temporal is designed for.

So clearly, we need a data plane. But how would a data plane fit into this picture?

Our Approach

TL;DR: combine Continue As New with an external queue.

In the above flow, we introduce a “data plane” using Kafka and have a child workflow whose first activity subscribes to a Kafka topic, receives one row, and returns the result. The code looks like this:

parentWorkflow.go:

// note: createTopic would need to run via an activity (or workflow.SideEffect)
// to keep the workflow deterministic
topicName := createTopic()
// here we launch the child workflow first so that we don't miss any message
childFuture := workflow.ExecuteChildWorkflow(ctx, etlPipeline, topicName)

// if we're emitting the rows in an activity, e.g. iterating through a database table:
err := workflow.ExecuteActivity(ctx, rowsEmitter, topicName).Get(ctx, nil)
...

In the above snippet, we create the topic and pass the topic name to the child workflow when starting it. Then we run the rowsEmitter activity, which streams the rows to the Kafka topic.

childWorkflow.go:

var row any
err = workflow.ExecuteActivity(ctx, ReceiveRow, topicName).Get(ctx, &row)
err = workflow.ExecuteActivity(ctx, AddFullTextToRow, row).Get(ctx, &row)
err = workflow.ExecuteActivity(ctx, PersistElasticsearch, row).Get(ctx, nil)
...
return workflow.NewContinueAsNewError(ctx, etlPipeline)

In ChildWorkflow, we always run ReceiveRow as the first activity which subscribes to the topic and returns the row content for the next steps to carry on.
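To make the shape of ReceiveRow concrete, a rough sketch using the segmentio/kafka-go client might look like the following. The broker address, consumer group, and JSON row encoding are assumptions on our side, not part of the pipeline above; in practice you would likely reuse the reader across invocations instead of re-joining the consumer group for every row.

// A sketch of a ReceiveRow-style activity, assuming the segmentio/kafka-go
// client. Broker address, group ID, and the JSON row encoding are illustrative.
import (
	"context"
	"encoding/json"

	"github.com/segmentio/kafka-go"
)

func ReceiveRow(ctx context.Context, topicName string) (map[string]interface{}, error) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "etl-pipeline", // consumer group so parallel consumers get different rows
		Topic:   topicName,
	})
	defer r.Close()

	// Block until one message is available; the activity timeout bounds the wait.
	msg, err := r.ReadMessage(ctx)
	if err != nil {
		return nil, err
	}
	var row map[string]interface{}
	if err := json.Unmarshal(msg.Value, &row); err != nil {
		return nil, err
	}
	return row, nil
}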

After all the activities finish, we just return ContinueAsNew to process another row.

We won’t run into the history limit because each continued execution starts with a fresh history.

In this mode we process each row synchronously, but we can easily add a worker pool in the parent workflow to launch several child workflows. However, the queue configuration needs to be done carefully, because we want consumers to retrieve different rows from the queue rather than processing the same row twice.
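A minimal sketch of that fan-out, assuming the etlPipeline child workflow above; the pool size and workflow ID scheme are illustrative, and with a Kafka consumer group each child’s ReceiveRow receives different rows.

// A sketch of launching a small pool of child pipelines from the parent
// workflow. Pool size and child workflow IDs are illustrative.
import (
	"fmt"

	"go.temporal.io/sdk/workflow"
)

func fanOutPipelines(ctx workflow.Context, topicName string) error {
	const poolSize = 4
	futures := make([]workflow.ChildWorkflowFuture, 0, poolSize)
	for i := 0; i < poolSize; i++ {
		childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
			WorkflowID: fmt.Sprintf("etl-pipeline-%s-%d", topicName, i),
		})
		futures = append(futures, workflow.ExecuteChildWorkflow(childCtx, etlPipeline, topicName))
	}
	// Each child continues-as-new indefinitely, so these futures normally only
	// resolve on failure or cancellation.
	for _, f := range futures {
		if err := f.Get(ctx, nil); err != nil {
			return err
		}
	}
	return nil
}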

Caveats

  1. workflow replay would require custom gluing
    In this design, although an external queue handles the data streaming, history replay is still possible by querying the child workflows with the parent workflow id; all the data rows are in the history of each child workflow.
  2. garbage collection
    As the child workflow histories can get huge, you would need a good garbage collection strategy (i.e. utilising a DELETE_HISTORY_EVENT timer task: Domain history cleanup).
  3. performance
    We haven’t run any benchmark or comparison with other “big data” processing systems. In our scenario we need the reliable execution; however, we would like to run some performance tests to know the limits.