Scaling Temporal: 400M Workflows with Continue-as-New Pattern

Hello,

I’d like to clarify how to keep long-running workflows working reliably. How should I design my architecture so that these workflows run smoothly? Should I consider a transition to a short-lived workflow architecture, where each action simply starts its own workflow and completes it? My current workflows tend to have long lifespans, so I intend to use the continue-as-new pattern, but given the various approaches I am unsure which guidelines to follow. Here’s my code:

func (w *B2B) ProcessOrder(ctx workflow.Context, input *ProcessOrderInput) (*ProcessOrderOutput, error) {
	w.setUp(ctx, &state{Order: &entity.Order{}})

	defer w.logger.Info("%v workflow with runID %v has been completed", workflow.GetInfo(ctx).WorkflowType, workflow.GetInfo(ctx).WorkflowExecution.RunID)

	// Child Workflow
	if err := w.createOrder(ctx, &createOrderInput{
		Source:         input.Source,
		CreatedBy:      input.CreatedBy,
		ManagerName:    input.ManagerName,
		ContractorType: input.ContractorType,
	}); err != nil {
		return nil, err
	}

	// query handler
	if err := w.awaitForGetState(ctx); err != nil {
		return nil, err
	}

	if w.state.Order.OrderStatus != entity.OrderStatusInWork {
		// update handler
		if err := w.awaitForAddItem(ctx); err != nil {
			return nil, err
		}

		// update handler
		if err := w.awaitForDeleteItem(ctx); err != nil {
			return nil, err
		}

		// update handler
		if err := w.awaitForApplyDiscount(ctx); err != nil {
			return nil, err
		}
	}

	selector := workflow.NewSelector(ctx)

	// signal handler (run Child Workflow)
	w.awaitForSyncWtisOms(ctx, selector)

	// signal handler
	w.awaitForSmartReserve(ctx, selector)

	for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {
		fmt.Println(workflow.GetInfo(ctx).GetCurrentHistoryLength())

		selector.Select(ctx)
	}

	return &ProcessOrderOutput{}, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
}

I have update handlers that process the specific actions implied by their names and that should not run under certain order statuses. There are also two signal waits, which can occur at any time during the workflow’s execution and in any order.

Given these conditions, I plan to call continue-as-new at certain points, but I need to consider a few things:

  1. Is workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() a suitable check here, and does selector.HasPending() guarantee that signals won’t be lost? Or will I need to use ReceiveAsync? If so, how should this ideally look, considering that I have multiple signals? I’ve seen both the selector.HasPending() and ReceiveAsync variants on the forum and I don’t understand the difference.
  2. Even if signals don’t arrive, the update handlers still grow the history, and this won’t be handled in my code. How can I solve this issue? Perhaps I should put all the code with the update handlers and signals into a for loop?
func (w *B2B) ProcessOrder(ctx workflow.Context, input *ProcessOrderInput) (*ProcessOrderOutput, error) {
	w.setUp(ctx, &state{Order: &entity.Order{}})

	defer w.logger.Info("%v workflow with runID %v has been completed", workflow.GetInfo(ctx).WorkflowType, workflow.GetInfo(ctx).WorkflowExecution.RunID)

	// Child Workflow
	if err := w.createOrder(ctx, &createOrderInput{
		Source:         input.Source,
		CreatedBy:      input.CreatedBy,
		ManagerName:    input.ManagerName,
		ContractorType: input.ContractorType,
	}); err != nil {
		return nil, err
	}

	// query handler
	if err := w.awaitForGetState(ctx); err != nil {
		return nil, err
	}

	selector := workflow.NewSelector(ctx)

	for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {

		if w.state.Order.OrderStatus != entity.OrderStatusInWork {
			// update handler
			if err := w.awaitForAddItem(ctx); err != nil {
				return nil, err
			}

			// update handler
			if err := w.awaitForDeleteItem(ctx); err != nil {
				return nil, err
			}

			// update handler
			if err := w.awaitForApplyDiscount(ctx); err != nil {
				return nil, err
			}
		}

		// signal handler (run Child Workflow)
		w.awaitForSyncWtisOms(ctx, selector)

		// signal handler
		w.awaitForSmartReserve(ctx, selector)

		fmt.Println(workflow.GetInfo(ctx).GetCurrentHistoryLength())

		selector.Select(ctx)
	}

	return &ProcessOrderOutput{}, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
}
  3. What’s the fundamental difference between first registering the signal handlers and then running a for loop with Select (which is how my current implementation looks), versus placing all the signal handlers and the Select inside the for loop? I’ve seen both variants on the forum but don’t understand the difference.
  4. I was planning to use ProcessOrderOutput to report execution results: potentially some code if there was an error, or a success value if there wasn’t. What should I pass there, considering that I’m calling continue-as-new? Where will this result be displayed: in the final execution, or when the first workflow run is closed before the continue-as-new call?
  5. Are there any other potential issues that I might have overlooked in my code?
  6. Adding to the above, given this code structure, will I be able to launch and adequately operate 400 million open workflows? Is there anything specific I should consider, or precautions I should take, to handle such a high volume of workflows simultaneously?

Looking forward to your guidance. Thank you.

I would recommend the pattern outlined in the await-signals sample.
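For reference, here is a minimal sketch of the kind of structure that sample uses: one listener goroutine per signal setting a flag, and workflow.Await blocking until the required combination of signals has arrived. The names below are illustrative, not the sample’s actual code, and it assumes the go.temporal.io/sdk/workflow import.

// Illustrative sketch only, not the sample's actual code.
// Assumes: import "go.temporal.io/sdk/workflow"
type awaitSignals struct {
	Signal1Received bool
	Signal2Received bool
}

// listen starts a goroutine that drains one signal channel and flips a flag.
func (a *awaitSignals) listen(ctx workflow.Context, name string, received *bool) {
	workflow.Go(ctx, func(ctx workflow.Context) {
		ch := workflow.GetSignalChannel(ctx, name)
		for {
			ch.Receive(ctx, nil)
			*received = true
		}
	})
}

func AwaitSignalsWorkflow(ctx workflow.Context) error {
	var a awaitSignals
	a.listen(ctx, "Signal1", &a.Signal1Received)
	a.listen(ctx, "Signal2", &a.Signal2Received)

	// Block until both signals have arrived, in any order.
	return workflow.Await(ctx, func() bool {
		return a.Signal1Received && a.Signal2Received
	})
}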

Maxim, thank you for your response, but could I please get some answers to the other points in my question?

Sure.

  1. HasPending is enough (a comparison with the ReceiveAsync variant is sketched after this list).
  2. I don’t understand this question. Why do you think the history grows if signals are not sent?
  3. I think they are equal. There might be some performance difference, but I’m not sure.
  4. I think the Go convention is to pass an empty struct. If an error is returned, the result is ignored anyway.
  5. What is awaitForGetState? Is it doing some polling?
  6. Make sure that DB has enough disk space to store all the workflows and their histories for the retention period. The scale of the cluster is proportional to the number of actions per second, not the number of parallel workflows.
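On point 1: for comparison, the ReceiveAsync variant mentioned in the question would look roughly like the sketch below (the channel names are assumptions). When every channel is already registered on the selector, HasPending covers the same need: it tells you whether anything already delivered is still waiting to be handled before you call continue-as-new.

// Illustrative only: draining buffered signals with ReceiveAsync instead of
// relying on selector.HasPending. Channel names are assumptions.
syncCh := workflow.GetSignalChannel(ctx, "sync-wtis-oms")
reserveCh := workflow.GetSignalChannel(ctx, "smart-reserve")
for {
	var payload interface{}
	switch {
	case syncCh.ReceiveAsync(&payload):
		// handle a buffered sync signal
	case reserveCh.ReceiveAsync(&payload):
		// handle a buffered reserve signal
	default:
		// nothing left in the buffers: safe to hand over to a new run
		return nil, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
	}
}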

Because I’ve seen in the web interface that when an update (What is a Temporal Workflow? | Temporal Documentation) comes in, it also adds to the event history, isn’t that so? And what I meant was that in my code, even if signals don’t arrive, updates can still come in, and it seems to me they should sit in the same loop as the waiting for signals. Do you need more details?

func (w *B2B) awaitForGetState(ctx workflow.Context) error {
	fmt.Println("order", w.state.Order.ID)
	if err := workflow.SetQueryHandler(ctx, "getState", func() (*state, error) {
		return &w.state, nil
	}); err != nil {
		return err
	}

	return nil
}

Do you mean the correct configuration of the cluster shards when starting it?

Sorry, I’m still confused. Are you talking about the update feature or the signal feature? When a signal is accepted by the service, it is recorded in the workflow history. Then it is delivered asynchronously to the workflow code. So it is possible to exceed the history size limit just by sending enough signals, if the workflow fails to call continue-as-new at some point. An update, by contrast, allows rejecting the request without writing it into the workflow history.
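For illustration, rejecting an update before it reaches the history is done in the validator. Here is a minimal sketch, assuming a Go SDK version that provides workflow.SetUpdateHandlerWithOptions; the update name, argument, and state field are assumptions, not your actual API:

func (w *B2B) registerApplyDiscount(ctx workflow.Context) error {
	return workflow.SetUpdateHandlerWithOptions(ctx, "applyDiscount",
		// Handler: runs only for accepted updates and may mutate workflow state.
		func(ctx workflow.Context, percent int) error {
			w.state.Order.DiscountPercent = percent // hypothetical field
			return nil
		},
		workflow.UpdateHandlerOptions{
			// Validator: runs before the update is accepted. It may read state
			// but must not block or mutate it; returning an error rejects the
			// update without recording it in the workflow history.
			Validator: func(ctx workflow.Context, percent int) error {
				if percent < 0 || percent > 100 {
					return fmt.Errorf("discount must be between 0 and 100")
				}
				return nil
			},
		},
	)
}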

I still don’t understand the meaning of “If signals don’t arrive, the update handler still increases the history,”.

Do you mean the correct configuration of the cluster shards when starting it?

I mean the cluster size and configuration (including the number of shards) should be proportional to the number of actions per second the cluster needs to process. It doesn’t directly depend on the number of parallel workflows.

selector := workflow.NewSelector(ctx)

	for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {

		if w.state.Order.OrderStatus != entity.OrderStatusInWork {
			// update handler
			if err := w.awaitForAddItem(ctx); err != nil {
				return nil, err
			}

			// update handler
			if err := w.awaitForDeleteItem(ctx); err != nil {
				return nil, err
			}

			// update handler
			if err := w.awaitForApplyDiscount(ctx); err != nil {
				return nil, err
			}
		}

		// signal handler (run Child Workflow)
		w.awaitForSyncWtisOms(ctx, selector)

		// signal handler
		w.awaitForSmartReserve(ctx, selector)

		fmt.Println(workflow.GetInfo(ctx).GetCurrentHistoryLength())

		selector.Select(ctx)
	}

What I meant is that in the current implementation, where everything runs in a loop and the history size is checked on each iteration, the signal wait is fine: when a signal is received we go to the next iteration and the history size can be checked again. But when the update handlers sit in the same loop, they do not behave like signals, so a new iteration does not start. As a result I can’t check whether the history size has increased, simply because a call to an update handler does not hand control back for a new iteration, while the history can keep growing.

I see some possible solutions here:

  1. Take the update handlers out of the loop with the signals, process them separately somehow, and check the history size when they are received.
  2. Leave them in the same loop, but on receiving an update do a continue to go to the next iteration. But then the flow is not clear: which executes first, the update or the signal?

I would just like to have some kind of central control block, rather than spreading history-size checks across different parts of the code.

Am I correct in understanding that when an update is received, I can immediately look at the state, namely the history size via workflow.GetInfo(ctx).GetCurrentHistoryLength(), and if it exceeds the size I need, simply reject the update request so that it isn’t saved in the history? I had thought that we can’t refer to the state in an update.

If I’ve understood this aspect of updates correctly, then how do they actually work? With signals it’s more or less clear: there are just channels, and our task is not to lose signals, so we can check that through HasPending and, if necessary, continue processing. But what about updates? How do they work, and how do we prevent them from being lost?

Is it correct to say that if I need an RPS of 400, then I will need to create 400 shards?

I would create a channel using workflow.NewChannel and add updates from the update handler to that channel. Then the selector can select on both the signal channels and the “update channel”.
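For illustration, a minimal sketch of that pattern, with the update, signal, and channel names as assumptions: the update handler only enqueues the request into an internal channel, and a single selector drains both the signal channels and that channel, so the history check stays in one central loop.

// Illustrative sketch; update/signal names and payload types are assumptions.
updateCh := workflow.NewChannel(ctx)

if err := workflow.SetUpdateHandler(ctx, "addItem", func(ctx workflow.Context, item string) error {
	// Forward the update into the internal channel; the main loop below
	// picks it up on its next Select, just like a signal.
	updateCh.Send(ctx, item)
	return nil
}); err != nil {
	return nil, err
}

selector := workflow.NewSelector(ctx)
selector.AddReceive(workflow.GetSignalChannel(ctx, "sync-wtis-oms"), func(c workflow.ReceiveChannel, more bool) {
	var payload string
	c.Receive(ctx, &payload)
	// handle the signal
})
selector.AddReceive(updateCh, func(c workflow.ReceiveChannel, more bool) {
	var item string
	c.Receive(ctx, &item)
	// handle the update payload
})

// One central place to decide when to continue-as-new.
for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {
	selector.Select(ctx)
}
return nil, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)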

But what about updates? How do they work, and how do we prevent them from being lost?

The SDK ensures that the update handler goroutine is always unblocked before the main workflow goroutine.

Is it correct to say that if I need an RPS of 400, then I will need to create 400 shards?

Not really unless a single DB update takes a second. Also, what do you mean by RPS? Workflows per second, signals per second, activities per second?

Thank you very much. Excellent advice. Please tell me, in this case, will HasPending() consider this channel in its response for update handlers?

But then I don’t quite understand how to correctly estimate the number of shards that might be needed.

Oh yes, I think my question wasn’t specific enough. I meant 400 workflows per second and, consequently, 400 activities and 400 signals per second. Just imagine processing 400 orders every second: all of them should successfully start the workflow and then run their activities and signals.

Thank you very much. Excellent advice. Please tell me, in this case, will HasPending() consider this channel in its response for update handlers?

I don’t understand the question. HasPending is a property of a Selector. So if any branch of Selector is ready to fire, it will return true.

But then I don’t quite understand how to correctly estimate the number of shards that might be needed.

There is no simple answer as it is very hardware and use case specific. See this blog for more info: Choosing the Number of Shards in Temporal History Service | Mikhail Shilkov

Oh yes, I think my question wasn’t specific enough. I meant 400 workflows per second and, consequently, 400 activities and 400 signals per second. Just imagine processing 400 orders every second: all of them should successfully start the workflow and then run their activities and signals.

I see. This might push the limits of what is possible with a non-Cassandra DB. I personally would recommend using Temporal Cloud. It would be cheaper than self-hosting and much less stressful for you.

Thank you for the suggestion, but it seems that this might not be feasible for the company I work for. Could you please confirm if I understood correctly that PostgreSQL is likely not going to be able to handle such throughput and it’s not even worth trying?

but it seems that this might not be feasible for the company I work for.

Do you have specific concerns, or is it more like “we are a top secret government agency that never uses any services”?

PostgreSQL might handle this throughput, but it wouldn’t be able to scale out if you need more. It is hard to give concrete recommendations without testing a specific setup.