Hello,
I'd like to clarify a few points about keeping long-running workflows working reliably. How should I design my architecture so these workflows run smoothly? Should I consider moving to a short-lived workflow architecture, where each action simply starts its own workflow and completes it? My current workflows tend to live for a long time, so I intend to use the continue-as-new pattern, but given the various approaches I'm not sure which guidelines to follow. Here's my code:
func (w *B2B) ProcessOrder(ctx workflow.Context, input *ProcessOrderInput) (*ProcessOrderOutput, error) {
    w.setUp(ctx, &state{Order: &entity.Order{}})
    defer w.logger.Info("%v workflow with runID %v has been completed", workflow.GetInfo(ctx).WorkflowType, workflow.GetInfo(ctx).WorkflowExecution.RunID)

    // Child Workflow
    if err := w.createOrder(ctx, &createOrderInput{
        Source:         input.Source,
        CreatedBy:      input.CreatedBy,
        ManagerName:    input.ManagerName,
        ContractorType: input.ContractorType,
    }); err != nil {
        return nil, err
    }

    // query handler
    if err := w.awaitForGetState(ctx); err != nil {
        return nil, err
    }

    if w.state.Order.OrderStatus != entity.OrderStatusInWork {
        // update handler
        if err := w.awaitForAddItem(ctx); err != nil {
            return nil, err
        }
        // update handler
        if err := w.awaitForDeleteItem(ctx); err != nil {
            return nil, err
        }
        // update handler
        if err := w.awaitForApplyDiscount(ctx); err != nil {
            return nil, err
        }
    }

    selector := workflow.NewSelector(ctx)
    // signal handler (run Child Workflow)
    w.awaitForSyncWtisOms(ctx, selector)
    // signal handler
    w.awaitForSmartReserve(ctx, selector)

    for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {
        fmt.Println(workflow.GetInfo(ctx).GetCurrentHistoryLength())
        selector.Select(ctx)
    }

    return &ProcessOrderOutput{}, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
}
I have update handlers that each process the specific action their name implies, and they should not run while the order is in certain statuses. There are also two signal waits, which can arrive at any point during the workflow's lifetime and in any order.
Given these conditions, I plan to call continue-as-new at certain points, but I need to consider a few things:
- Is workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() a suitable check here, and does the selector.HasPending() part guarantee that no signals are lost? Or do I need to use ReceiveAsync instead? If so, what should that ideally look like given that I have multiple signals? I've seen both the selector.HasPending() and ReceiveAsync variants on the forum and don't understand the difference (I've put a rough ReceiveAsync sketch after the second listing below to show what I mean).
- If signals don't arrive, the update handlers still grow the history, and my code as written doesn't account for that. How can I solve this? Perhaps I should put all of the update-handler and signal code inside a for loop, like this:
func (w *B2B) ProcessOrder(ctx workflow.Context, input *ProcessOrderInput) (*ProcessOrderOutput, error) {
    w.setUp(ctx, &state{Order: &entity.Order{}})
    defer w.logger.Info("%v workflow with runID %v has been completed", workflow.GetInfo(ctx).WorkflowType, workflow.GetInfo(ctx).WorkflowExecution.RunID)

    // Child Workflow
    if err := w.createOrder(ctx, &createOrderInput{
        Source:         input.Source,
        CreatedBy:      input.CreatedBy,
        ManagerName:    input.ManagerName,
        ContractorType: input.ContractorType,
    }); err != nil {
        return nil, err
    }

    // query handler
    if err := w.awaitForGetState(ctx); err != nil {
        return nil, err
    }

    selector := workflow.NewSelector(ctx)
    for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 300 || selector.HasPending() {
        if w.state.Order.OrderStatus != entity.OrderStatusInWork {
            // update handler
            if err := w.awaitForAddItem(ctx); err != nil {
                return nil, err
            }
            // update handler
            if err := w.awaitForDeleteItem(ctx); err != nil {
                return nil, err
            }
            // update handler
            if err := w.awaitForApplyDiscount(ctx); err != nil {
                return nil, err
            }
        }
        // signal handler (run Child Workflow)
        w.awaitForSyncWtisOms(ctx, selector)
        // signal handler
        w.awaitForSmartReserve(ctx, selector)

        fmt.Println(workflow.GetInfo(ctx).GetCurrentHistoryLength())
        selector.Select(ctx)
    }

    return &ProcessOrderOutput{}, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
}
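For reference, here is roughly what I understand the ReceiveAsync drain variant from the forum to mean, placed right before the continue-as-new call. The signal names and payload types below are placeholders I made up for the sketch, not my real definitions:

// Drain any signals that are already buffered before continuing as new,
// so they are not lost when this run closes. Names and types are placeholders.
syncCh := workflow.GetSignalChannel(ctx, "sync-wtis-oms")    // placeholder signal name
reserveCh := workflow.GetSignalChannel(ctx, "smart-reserve") // placeholder signal name
for {
    var syncPayload SyncWtisOmsSignal // placeholder type
    if syncCh.ReceiveAsync(&syncPayload) {
        // handle the buffered sync signal before continuing as new
        continue
    }
    var reservePayload SmartReserveSignal // placeholder type
    if reserveCh.ReceiveAsync(&reservePayload) {
        // handle the buffered reserve signal before continuing as new
        continue
    }
    break // both channels are drained, so it should be safe to continue-as-new
}
return nil, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)

Is something like this what people mean by the ReceiveAsync approach, and how does it differ in practice from checking selector.HasPending()?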
- What's the fundamental difference between registering the signal handlers first and then running a for loop with Select (my current implementation), versus placing the signal handlers and Select inside the for loop? I've seen both variants on the forum but don't understand the difference.
- I was planning to use ProcessOrderOutput to report execution results: possibly an error code if something failed, or a success result otherwise. What should I pass there given that I'm calling continue-as-new, and where will that result be visible - on the final run, or when the first run is closed by the continue-as-new call? (See the sketch after this list for what I'm considering.)
- Are there any other potential issues that I might have overlooked in my code?
- On top of the above, with this code structure will I be able to launch and reliably operate 400 million open workflows? Is there anything specific I should consider, or precautions I should take, to handle such a high volume of concurrent workflows?
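To make the ProcessOrderOutput question concrete, this is what I'm currently leaning toward, assuming the result only matters on the run that actually finishes. The terminal-status check and the output fields below are placeholders, not my real code:

// On intermediate runs, hand the input to the next run and return no output.
if w.state.Order.OrderStatus != entity.OrderStatusCompleted { // placeholder terminal check
    return nil, workflow.NewContinueAsNewError(ctx, workflow.GetInfo(ctx).WorkflowType.Name, input)
}
// On the final run, return the actual result.
return &ProcessOrderOutput{OrderID: w.state.Order.ID}, nil // placeholder output fields

Would the caller only ever see the output of the final run in this case?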
Looking forward to your guidance. Thank you.