How to cascade workflow cancellation to related workflows in Go

I’ve been reading @maxim's articles 1 & 2 on how to implement a cleanup() method when a cancellation occurs, but I’m stuck. The documentation (too many links for a new user to post) seems to cover only passing the cancellation on to an activity, which is different from what I want.

The pseudocode for what I’m trying to do is something like this:

func MyWorkflow(ctx workflow.Context, request MyWorkflowRequest) error {
	// Register signal channels
	// ...

	// Related workflows
	relatedWorkflowIds := []string{
		//...
	}

	// Handle cancellation from this point forward, anywhere in the workflow
	go func(ctx workflow.Context) {
		for {
			select {
			case <-ctx.Done():
				// Cascade cancellation to related workflows
				fmt.Println("Context cancelled:", ctx.Err())
				newCtx, _ := workflow.NewDisconnectedContext(ctx)
				client, err := clients.NewClient() 
				for _, workflowId := range relatedWorkflowIds {
					err = client.CancelWorkflow(newCtx.Background(), workflowId, "")
					if err != nil {
						log.Fatalln("Unable to cancel Workflow Execution", err)
					}
				}
				return
			}
		}
	}(ctx)

	// Do some activities

	// sleep for set period

	// Do some more activities
	return nil
}

This doesn’t seem to work. I think it is because of something @maxim said: the workflow context is not the same as Go’s standard context.

You cannot use the client in the workflow code as it performs direct IO. All calls through the client must be done from activities.

Are you trying to cancel a child workflow or some unrelated workflow?

It’s a different workflow (not a child), although I’m considering switching to child workflows if this is not achievable. I would be OK with calling an activity that triggers the cascade. My issue with the documentation is that it only talks about canceling an activity that is already running.

For an unrelated workflow, use workflow.RequestCancelExternalWorkflow.

Thank you. But my main issue was how to capture the cancellation event. My pseudo code doesn’t work. Can you show a working example?

The pseudocode idea is fine. But the workflow context's Done() returns a workflow Channel, not a native Go channel. Use a workflow Selector instead of the native select to wait on multiple channels.

Sorry, I’m a bit of a Go newbie. I’m reading the docs on selectors, but I can’t see how one can be used to capture a cancellation anywhere in the workflow and then trigger the cascade activity.

I couldn’t figure out the Selector approach, but I was able to get it working based on the example in the docs by moving my timer (expire) inside the activity.

func MyWorkflow(ctx workflow.Context, request MyWorkflowRequest) error {
	// Register signal channels
	// ...

	// Related workflows
	relatedWorkflowIds := []string{
		//...
	}

	// Handle cancellation from this point forward, anywhere in the workflow
	completionHandlerOptions := workflow.ActivityOptions{
		StartToCloseTimeout: expireTime.Sub(currentTime),
		HeartbeatTimeout:    5 * time.Second,
		WaitForCancellation: true,
	}
	defer func() {
		// This is executed at the very end of the workflow
		logger.Info("Beginning cleanup")
		cancelled := false
		newCtx := ctx

		if errors.Is(ctx.Err(), workflow.ErrCanceled) {
			cancelled = true
			// get a new disconnected context
			newCtx, _ = workflow.NewDisconnectedContext(ctx)
			logger.Debug("Created a new disconnected context")
		}

		// Send signal
		err := workflow.SignalExternalWorkflow(newCtx, SomeWorkflowId, "", SomeWorkflowSignal, data).Get(ctx, nil)
		if err != nil {
			logger.Error("Failed to trigger signal", "error", err)
		}

		// if the workflow was not actually cancelled, we don't need to do anything else
		if !cancelled {
			logger.Debug("Workflow was not cancelled")
			return
		}

		if len(relatedWorkflowIds) == 0 {
			logger.Debug("No related workflows to cancel")
			return
		}

		// Cascade the cancellation to any related workflows
		logger.Debug("Cascading cancellation to related workflows", "relatedWorkflowIds", relatedWorkflowIds)
		for _, relatedWorkflowId := range relatedWorkflowIds {
			err := workflow.RequestCancelExternalWorkflow(newCtx, relatedWorkflowId, "").Get(ctx, nil)
			if err != nil {
				logger.Error("Failed to cancel related workflow", "relatedWorkflowId", relatedWorkflowId, "error", err)
				// Skip the success log when the request failed
				continue
			}
			logger.Info("Cancellation requested for related workflow", "relatedWorkflowId", relatedWorkflowId)
		}
	}()

	// Completion handler activity
	ctx = workflow.WithActivityOptions(ctx, completionHandlerOptions)
	var result string
	err = workflow.ExecuteActivity(ctx, CompletionHanlder, currentTime, expireTime).Get(ctx, &result)
	if err != nil {
		logger.Error("MyWorkflow Completion Activity failed", "Error", err)
		return err
	}
	logger.Info("MyWorkflow Completion Activity completed", "result", result)
	return nil
}

Then I created a new activity which just does this:

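func CompletionHanlder(ctx context.Context, startTime time.Time, expireTime time.Time) (string, error) {
	// Create the expiry timer once. A time.After inside the loop is restarted
	// on every heartbeat tick, so an expiry longer than one second never fires.
	expiry := time.NewTimer(expireTime.Sub(startTime))
	defer expiry.Stop()
	heartbeat := time.NewTicker(1 * time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
			// Heartbeating is how the activity learns about a cancellation request
			activity.RecordHeartbeat(ctx, "")
		case <-expiry.C:
			log.Println("This Activity is completed!")
			return "I expired", nil
		case <-ctx.Done():
			log.Println("This Activity is canceled!")
			return "I was canceled", nil
		}
	}
}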

@maxim, after some time with this, we’ve found that this approach requires an active worker to keep running the activity until either the expiration time or the cancellation. I would guess that using a selector in the workflow would also do this. Is there any other way to put a workflow to sleep (to free up the worker) and then “catch” a cancel to perform cleanup in Go?

Okay. I figured it out, and I feel pretty dumb. I didn’t realize the defer func() in the example runs at the end of the workflow. Based on the syntax, I thought it was deferred until the end of the activity. So the solution is much simpler than I had originally planned:

func MyWorkflow(ctx workflow.Context, request MyWorkflowRequest) error {
	// Register signal channels
	// ...

	// Related workflows
	relatedWorkflowIds := []string{
		//...
	}

	// Handle cancellation at the end of the workflow
	defer func() {
		// This is executed at the very end of the workflow
		logger.Info("Beginning cleanup")
		cancelled := false
		newCtx := ctx

		if errors.Is(ctx.Err(), workflow.ErrCanceled) {
			cancelled = true
			// get a new disconnected context
			newCtx, _ = workflow.NewDisconnectedContext(ctx)
			logger.Debug("Created a new disconnected context")
		}

		// Send signal
		err := workflow.SignalExternalWorkflow(newCtx, SomeWorkflowId, "", SomeWorkflowSignal, data).Get(ctx, nil)
		if err != nil {
			logger.Error("Failed to trigger signal", "error", err)
		}

		// if the workflow was not actually cancelled, we don't need to do anything else
		if !cancelled {
			logger.Debug("Workflow was not cancelled")
			return
		}

		if len(relatedWorkflowIds) == 0 {
			logger.Debug("No related workflows to cancel")
			return
		}

		// Cascade the cancellation to any related workflows
		logger.Debug("Cascading cancellation to related workflows", "relatedWorkflowIds", relatedWorkflowIds)
		for _, relatedWorkflowId := range relatedWorkflowIds {
			err := workflow.RequestCancelExternalWorkflow(newCtx, relatedWorkflowId, "").Get(ctx, nil)
			if err != nil {
				logger.Error("Failed to cancel related workflow", "relatedWorkflowId", relatedWorkflowId, "error", err)
				// Skip the success log when the request failed
				continue
			}
			logger.Info("Cancellation requested for related workflow", "relatedWorkflowId", relatedWorkflowId)
		}
	}()

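	// Go to sleep until it's time to close the workflow.
	// Sleep returns an error if the workflow is cancelled while waiting;
	// returning it lets the run be recorded as cancelled instead of completed.
	return workflow.Sleep(ctx, crawler.ArchiveTime.Sub(currentTime))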
}