I have a workflow that calls a LoaderActivity, which sends a bunch of signals, and for each signal received, a DescribeActivity is kicked off. While testing with a small set of inputs, I can see on the WebUI that all activities successfully reach completion, but my workflow never seems to reach “Complete” state. Adding my workflow for reference.
func SyncWorkflow(ctx workflow.Context, w WorkflowArgs) error {
metrics := &WorkflowMetrics{}
if err := setupQueryHandler(ctx, metrics); err != nil {
return err
}
ctx = setupActivityOptions(ctx)
info := workflow.GetInfo(ctx)
future, err := startLoaderActivity(ctx, w, info)
if err != nil {
return err
}
err = handleProjectIDs(ctx, future, w, metrics)
if err != nil {
return err
}
return nil
}
// setupQueryHandler sets up the query handler for the workflow.
func setupQueryHandler(ctx workflow.Context, metrics *WorkflowMetrics) error {
return workflow.SetQueryHandler(ctx, TotalActivitiesQuery, func() (int, error) {
return metrics.DescribeStarted, nil
})
}
// setupActivityOptions sets up the activity options for the workflow.
func setupActivityOptions(ctx workflow.Context) workflow.Context {
ao := workflow.ActivityOptions{
TaskQueue: sync.TaskQueue,
StartToCloseTimeout: time.Minute * 15,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
MaximumAttempts: 3,
},
}
return workflow.WithActivityOptions(ctx, ao)
}
// startLoaderActivity starts the loader activity for the workflow.
func startLoaderActivity(ctx workflow.Context, w WorkflowArgs, info *workflow.Info) (workflow.Future, error) {
return workflow.ExecuteActivity(ctx, LoaderActivity, w.Notification, info.WorkflowExecution.ID, info.WorkflowExecution.RunID), nil
}
// handleProjectIDs handles the project IDs for the workflow.
func handleProjectIDs(ctx workflow.Context, future workflow.Future, w WorkflowArgs, metrics *WorkflowMetrics) error {
signalChan := workflow.GetSignalChannel(ctx, ProjectIDSignal)
selector := workflow.NewSelector(ctx)
activeFutures := 1 // Starts with one for the loader activity
selector.AddFuture(future, func(f workflow.Future) {
workflow.GetLogger(ctx).Info("Loader activity completed")
_ = f.Get(ctx, nil)
activeFutures--
})
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
var projectID string
for c.Receive(ctx, &projectID) {
if projectID == "" {
continue
}
workflow.GetLogger(ctx).Info("Received project ID", "projectID", projectID)
metrics.DescribeStarted++
fut := workflow.ExecuteActivity(ctx, DescribeSyncActivity, w.Notification, projectID)
selector.AddFuture(fut, func(f workflow.Future) {
workflow.GetLogger(ctx).Info("Describe activity completed", "projectID", projectID)
_ = f.Get(ctx, nil)
activeFutures--
})
activeFutures++
}
})
// Wait for all activities to complete
for activeFutures > 0 {
workflow.GetLogger(ctx).Info("Waiting for activities to complete", "activeFutures", activeFutures)
selector.Select(ctx)
}
workflow.GetLogger(ctx).Info("All activities completed")
return nil
}
Am I waiting for completion in the right way?