Correct way to wait for all activities to complete

I have a workflow that calls a LoaderActivity, which sends a bunch of signals, and for each signal received, a DescribeActivity is kicked off. While testing with a small set of inputs, I can see in the Web UI that all activities complete successfully, but my workflow never reaches the “Completed” state. My workflow is below for reference.

func SyncWorkflow(ctx workflow.Context, w WorkflowArgs) error {
	metrics := &WorkflowMetrics{}
	if err := setupQueryHandler(ctx, metrics); err != nil {
		return err
	}

	ctx = setupActivityOptions(ctx)
	info := workflow.GetInfo(ctx)
	future, err := startLoaderActivity(ctx, w, info)
	if err != nil {
		return err
	}

	err = handleProjectIDs(ctx, future, w, metrics)
	if err != nil {
		return err
	}
	return nil
}

// setupQueryHandler sets up the query handler for the workflow.
func setupQueryHandler(ctx workflow.Context, metrics *WorkflowMetrics) error {
	return workflow.SetQueryHandler(ctx, TotalActivitiesQuery, func() (int, error) {
		return metrics.DescribeStarted, nil
	})
}

// setupActivityOptions sets up the activity options for the workflow.
func setupActivityOptions(ctx workflow.Context) workflow.Context {
	ao := workflow.ActivityOptions{
		TaskQueue:           sync.TaskQueue,
		StartToCloseTimeout: time.Minute * 15,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    100 * time.Second,
			MaximumAttempts:    3,
		},
	}
	return workflow.WithActivityOptions(ctx, ao)
}

// startLoaderActivity starts the loader activity for the workflow.
func startLoaderActivity(ctx workflow.Context, w WorkflowArgs, info *workflow.Info) (workflow.Future, error) {
	return workflow.ExecuteActivity(ctx, LoaderActivity, w.Notification, info.WorkflowExecution.ID, info.WorkflowExecution.RunID), nil
}

// handleProjectIDs handles the project IDs for the workflow.
func handleProjectIDs(ctx workflow.Context, future workflow.Future, w WorkflowArgs, metrics *WorkflowMetrics) error {
	signalChan := workflow.GetSignalChannel(ctx, ProjectIDSignal)
	selector := workflow.NewSelector(ctx)
	activeFutures := 1 // Starts with one for the loader activity

	selector.AddFuture(future, func(f workflow.Future) {
		workflow.GetLogger(ctx).Info("Loader activity completed")
		_ = f.Get(ctx, nil)
		activeFutures--
	})

	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		var projectID string
		for c.Receive(ctx, &projectID) {
			if projectID == "" {
				continue
			}
			workflow.GetLogger(ctx).Info("Received project ID", "projectID", projectID)
			metrics.DescribeStarted++
			fut := workflow.ExecuteActivity(ctx, DescribeSyncActivity, w.Notification, projectID)
			selector.AddFuture(fut, func(f workflow.Future) {
				workflow.GetLogger(ctx).Info("Describe activity completed", "projectID", projectID)
				_ = f.Get(ctx, nil)
				activeFutures--
			})
			activeFutures++
		}
	})

	// Wait for all activities to complete
	for activeFutures > 0 {
		workflow.GetLogger(ctx).Info("Waiting for activities to complete", "activeFutures", activeFutures)
		selector.Select(ctx)
	}

	workflow.GetLogger(ctx).Info("All activities completed")
	return nil
}

Am I waiting for completion in the right way?

The Receive loop inside the AddReceive callback looks problematic to me:

Channel.Receive is a blocking call. The Selector.AddReceive callback is invoked only when the channel has at least one message, so calling Receive once inside the callback never blocks. But you are calling Channel.Receive in a loop, so once the buffered messages are consumed it blocks, which in turn blocks Selector.Select.

I also believe there is a logical error in the code. Selector doesn’t guarantee any ordering of callback calls, much like Go's native select statement. So the following sequence of events can lead to a situation in which no signals are processed:

  1. LoaderActivity is started.
  2. The workflow worker goes down.
  3. LoaderActivity sends 10 signals and completes.
  4. The workflow worker comes back.
  5. The LoaderActivity Future callback branch is invoked.
  6. activeFutures is decremented to 0.
  7. The workflow completes without processing a single received signal.

To fix this, drain the signal channel asynchronously (using Channel.ReceiveAsync) before completing the workflow.
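
The drain step can be sketched roughly as follows. To keep the example runnable without the Temporal SDK, `asyncReceiver` and `fakeChannel` are hypothetical stand-ins: the interface mirrors only the `ReceiveAsync(valuePtr interface{}) (ok bool)` method of `workflow.ReceiveChannel`, and `drainSignals` is an illustrative helper name, not something from the original workflow.

```go
package main

import "fmt"

// asyncReceiver is a hypothetical stand-in that mirrors only the
// ReceiveAsync method of Temporal's workflow.ReceiveChannel.
type asyncReceiver interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
}

// fakeChannel is an in-memory buffer used only to exercise the sketch.
type fakeChannel struct{ pending []string }

// ReceiveAsync pops one buffered value without blocking and returns
// false when nothing is buffered.
func (c *fakeChannel) ReceiveAsync(valuePtr interface{}) bool {
	if len(c.pending) == 0 {
		return false
	}
	*(valuePtr.(*string)) = c.pending[0]
	c.pending = c.pending[1:]
	return true
}

// drainSignals hands every buffered, non-empty project ID to start and
// returns how many were handed over. It never blocks.
func drainSignals(ch asyncReceiver, start func(projectID string)) int {
	n := 0
	var projectID string
	for ch.ReceiveAsync(&projectID) {
		if projectID == "" {
			continue // same skip rule as the workflow in the question
		}
		start(projectID)
		n++
	}
	return n
}

func main() {
	ch := &fakeChannel{pending: []string{"p1", "", "p2"}}
	var started []string
	n := drainSignals(ch, func(id string) { started = append(started, id) })
	fmt.Println(n, started)
}
```

In the real workflow, `start` would be the place to call workflow.ExecuteActivity and register the resulting future on the selector; if the drain starts any activities, the wait loop has to keep calling Select until activeFutures reaches zero again.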

Thanks @maxim.

Replacing Receive with ReceiveAsync solved the blocking issue.

selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
	var projectID string
	// ReceiveAsync returns false instead of blocking once the channel is empty.
	for c.ReceiveAsync(&projectID) {
		if projectID == "" {
			continue
		}
		id := projectID // copy for the callback: projectID is overwritten by the next ReceiveAsync
		metrics.DescribeStarted++
		fut := workflow.ExecuteActivity(ctx, DescribeSyncActivity, w.Notification, id)
		selector.AddFuture(fut, func(f workflow.Future) {
			workflow.GetLogger(ctx).Info("Describe activity completed", "projectID", id)
			_ = f.Get(ctx, nil)
			activeFutures--
		})
		activeFutures++
	}
})

Regarding your point on the issue with the workflow logic, I made the LoaderActivity return the totalSignals and added a second comparison like so:

for activeFutures > 0 || totalSignals != metrics.DescribeStarted {
	workflow.GetLogger(ctx).Info("Waiting for activities to complete", "activeFutures", activeFutures)
	selector.Select(ctx)
	if shouldContinueAsNew {
		return true, nil
	}
}

I think this pattern eliminates the need to drain the signal channel (in case the worker goes down after LoaderActivity starts), since the workflow now either stays open until its TTL or completes.
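
To see why the extra comparison helps, here is a small model of the wait loop, not Temporal code. The `event` type and `run` function are hypothetical: each event stands for what one Selector.Select call might deliver, and the sketch assumes totalSignals is unknown (set to -1) until the loader's result arrives. It replays the problematic ordering described earlier, where the loader's completion is handled before any signal.

```go
package main

import "fmt"

// event is a hypothetical model of what one Selector.Select call delivers.
type event struct {
	kind  string // "loaderDone", "signal", or "describeDone"
	total int    // only used by "loaderDone": number of signals the loader sent
}

// run replays events in order and returns how many were consumed before
// the loop's exit condition held, or -1 if it never held.
// With checkTotal=false only activeFutures is checked, as in the original loop.
func run(events []event, checkTotal bool) int {
	activeFutures := 1 // the loader activity
	totalSignals := -1 // assumption: unknown until the loader reports it
	describeStarted := 0
	for i, e := range events {
		switch e.kind {
		case "loaderDone":
			totalSignals = e.total
			activeFutures--
		case "signal":
			describeStarted++
			activeFutures++
		case "describeDone":
			activeFutures--
		}
		done := activeFutures == 0
		if checkTotal {
			done = done && totalSignals == describeStarted
		}
		if done {
			return i + 1
		}
	}
	return -1
}

func main() {
	// Loader completion is delivered first, then two signals and their activities.
	events := []event{
		{kind: "loaderDone", total: 2},
		{kind: "signal"}, {kind: "signal"},
		{kind: "describeDone"}, {kind: "describeDone"},
	}
	fmt.Println(run(events, false)) // exits after the first event
	fmt.Println(run(events, true))  // exits only after all five events
}
```

One thing worth checking in the real workflow: empty project IDs are skipped without incrementing DescribeStarted, so the total returned by LoaderActivity would need to exclude them, otherwise the two counts never match.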

Open to any suggestions/improvements 🙂

Your changes look good. I don’t think you need the loop inside the AddReceive callback as this callback is called for each message sent to the channel.
