Workflow - Notification After First Activity

Hello,

I wonder what the best practice for the following situation is: I have a user registration workflow with two activities.

  1. It inserts the user into the database
  2. It sends an email to the user

The workflow is executed from a REST API endpoint. I would like the REST API to wait until the user has been inserted into the database or a timeout is reached before responding. Otherwise, the user would receive a success message even though only the workflow has been created and not the actual user.

I have the following ideas and wonder what the best practice is:

  1. After the user has been inserted into the database, create an abandoned child workflow. That way I can await the parent workflow’s result.
  2. Implement a separate activity that notifies the REST API.
  3. Send an update immediately after the workflow has been created and ensure that the update handler in the workflow only responds after the first activity has completed or a timeout, cancellation or failure occurs. Though, I am not completely sure what an implementation would look like:
    3.1. Can an update handler block the workflow execution? Is the update handler executed in a separate goroutine?
    3.2. Is the update guaranteed to be sent to the same worker?
    3.3. How do I have to handle workflow timeouts, cancellations and failures inside the update handler that is waiting for the database activity to complete?

I would go with 3.

3.1. Can an update handler block the workflow execution? Is the update handler executed in a separate goroutine?

The update handler is executed in a different goroutine. The workflow execution can wait on a Future resolved from an update handler. Another approach is to use Await to block on a variable set from an update handler.

3.2. Is the update guaranteed to be sent to the same worker?

This question doesn’t make sense, as workflow is not guaranteed to run at any specific worker. The update is guaranteed to be sent to a particular workflow execution identified by its id.

How do I have to handle workflow timeouts, cancellations and failures inside the update handler that is waiting for the database activity to complete?

Workflow timeouts cannot be handled by any workflow code. Don’t use them for business logic. Use timers instead. In the majority of cases it is better to call activities from the main workflow goroutine and communicate to updates using shared variables.

Thank you. I started implementing a solution based on your answers but encountered an error I can’t seem to find a solution for. Would you mind taking a look?

Example on GitHub: GitHub - christoph-pflueger/temporal-test

The call to UpdateWorkflow does not stop blocking even though the update handler in the workflow returned. It waits until the entire workflow completes multiple seconds later and then throws an error “workflow execution already completed”.

As you can see in the logs, even though “Update handler called” is logged, the starter app executing the update does not stop blocking and continues to wait until the workflow.Sleep in the workflow concludes.

Am I missing something fairly obvious?

Worker Logs

2024/08/16 23:52:07 INFO  No logger configured for temporal client. Created default one.
2024/08/16 23:52:07 INFO  Started Worker Namespace default TaskQueue register WorkerID 8833@PC@
2024/08/16 23:52:10 INFO  Starting to sleep Namespace default TaskQueue register WorkerID 8833@PC@ WorkflowType Register WorkflowID register RunID bc2a1c5d-73b3-4fe8-ad0f-61b9cc64c638 Attempt 1
2024/08/16 23:52:10 DEBUG NewTimer Namespace default TaskQueue register WorkerID 8833@PC@ WorkflowType Register WorkflowID register RunID bc2a1c5d-73b3-4fe8-ad0f-61b9cc64c638 Attempt 1 TimerID 5 Duration 5s
2024/08/16 23:52:10 INFO  Update handler called Namespace default TaskQueue register WorkerID 8833@PC@ WorkflowType Register WorkflowID register RunID bc2a1c5d-73b3-4fe8-ad0f-61b9cc64c638 Attempt 1
2024/08/16 23:52:15 INFO  Finished sleeping Namespace default TaskQueue register WorkerID 8833@PC@ WorkflowType Register WorkflowID register RunID bc2a1c5d-73b3-4fe8-ad0f-61b9cc64c638 Attempt 1

Starter Logs

2024/08/16 23:52:10 INFO  No logger configured for temporal client. Created default one.
2024/08/16 23:52:10 INFO Workflow started
2024/08/16 23:52:15 ERROR failed to issue update error="workflow execution already completed"
exit status 1

Are you using the latest versions of the SDK and the service?

I installed everything for the first time today. I’m starting the server with temporal server start-dev --ui-port 8888 --dynamic-config-value frontend.enableUpdateWorkflowExecution=true --dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true as specified in the README.md

CLI

temporal version 0.11.0 (server 1.22.4) (ui 2.21.3)

go.mod

require go.temporal.io/sdk v1.28.1

go.sum

go.temporal.io/api v1.36.0 h1:WdntOw9m38lFvMdMXuOO+3BQ0R8HpVLgtk9+f+FwiDk=
go.temporal.io/api v1.36.0/go.mod h1:0nWIrFRVPlcrkopXqxir/UWOtz/NZCo+EE9IX4UwVxw=
go.temporal.io/sdk v1.28.1 h1:PsexsNDWXyWdJp4KWTOD+DfSZD1z0k5U/dIJF05akT4=
go.temporal.io/sdk v1.28.1/go.mod h1:zHcmZNXPaKXQJ6Hn98Ebcii7VlHL1mI4RJW8R6GQa1k=

I’m not able to reproduce this:

2024/08/16 15:21:18 INFO  No logger configured for temporal client. Created default one.
2024/08/16 15:21:18 INFO Workflow started
2024/08/16 15:21:18 INFO Issued update
2024/08/16 15:21:18 INFO Received update result
2024/08/16 15:21:23 INFO Workflow completed

temporal version 1.0.0 (Server 1.24.2, UI 2.28.0)
go.temporal.io/sdk v1.28.1 // indirect

1 Like

I found the culprit. The links to download of the CLI in this quick start are outdated: Set up a local development environment for Temporal and Go | Learn Temporal

Thank you for your rapid assistance. What do you think of the following implementation based on your answers?

func SignUp(ctx context.Context, c client.Client, signUpInput signup.SignUpInput) error {
	workflowRun, err := c.ExecuteWorkflow(
		ctx,
		client.StartWorkflowOptions{TaskQueue: signup.TaskQueueName},
		signup.SignUp,
		signUpInput,
	)

	if err != nil {
		return fmt.Errorf("failed to execute workflow: %w", err)
	}

	if _, err := c.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
		WorkflowID:   workflowRun.GetID(),
		RunID:        workflowRun.GetRunID(),
		UpdateName:   signup.UpdateName,
		WaitForStage: client.WorkflowUpdateStageCompleted,
	}); err != nil {
		response, describeWorkflowExecutionErr := c.DescribeWorkflowExecution(
			ctx,
			workflowRun.GetID(),
			workflowRun.GetRunID(),
		)

		if describeWorkflowExecutionErr != nil ||
			response.WorkflowExecutionInfo.Status == enums.WORKFLOW_EXECUTION_STATUS_RUNNING {
			return ErrSignUpPending
		}

		return nil
	}

	return nil
}
func SignUp(
	ctx workflow.Context,
	signUpInput SignUpInput,
) (*SignUpOutput, error) {
	channel := workflow.NewChannel(ctx)
	defer workflow.Await(ctx, func() bool {
		return workflow.AllHandlersFinished(ctx)
	})

	defer channel.Close()
	if err := workflow.SetUpdateHandler(ctx, UpdateName, func(ctx workflow.Context) error {
		selector := workflow.NewSelector(ctx)
		selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, nil)
		})

		selector.Select(ctx)
		return nil
	}); err != nil {
		return nil, fmt.Errorf("failed to set update handler: %w", err)
	}

	if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
	}), Insert, InsertInput(signUpInput)).Get(ctx, nil); err != nil {
		return nil, fmt.Errorf("failed to insert: %w", err)
	}

	channel.Close()
	if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	}), SendEmail, SendEmailInput(signUpInput)).Get(ctx, nil); err != nil {
		return nil, fmt.Errorf("failed to send email: %w", err)
	}

	return &SignUpOutput{}, ctx.Err()
}

I don’t understand why you call DescribeWorkflowExecution in case of the UpdateFailure.

You don’t need to use Selector to listen on a single channel. Channel.Receive blocks until a message is available or the channel is closed.

True, the Selector makes little sense. Thank you.

I am using DescribeWorkflowExecution because I noticed that UpdateWorkflow will throw an error when the Workflow already completed. Hence, when handling the error, I have no idea whether the Workflow succeeded and the update was merely too slow or the update failed (e.g. failed to be sent). Is there a better way to handle this case?

Look at the error. It will contain a “NOT FOUND” failure if workflow is already completed.

1 Like

I wasn’t aware of the serviceerror package in the api module. I had previously only searched in the sdk module itself. Thank you.

func SignUp(ctx context.Context, c client.Client, signUpInput signup.SignUpInput) error {
	workflowRun, err := c.ExecuteWorkflow(
		ctx,
		client.StartWorkflowOptions{TaskQueue: signup.TaskQueueName},
		signup.SignUp,
		signUpInput,
	)

	if err != nil {
		return fmt.Errorf("failed to execute workflow: %w", err)
	}

	if _, err := c.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
		WorkflowID:   workflowRun.GetID(),
		RunID:        workflowRun.GetRunID(),
		UpdateName:   signup.UpdateName,
		WaitForStage: client.WorkflowUpdateStageCompleted,
	}); err != nil {
		_, ok := err.(*serviceerror.NotFound)
		if ok {
			return nil
		}

		return ErrSignUpPending
	}

	return nil
}
func SignUp(
	ctx workflow.Context,
	signUpInput SignUpInput,
) (*SignUpOutput, error) {
	channel := workflow.NewChannel(ctx)
	defer workflow.Await(ctx, func() bool {
		return workflow.AllHandlersFinished(ctx)
	})

	defer channel.Close()
	if err := workflow.SetUpdateHandler(ctx, UpdateName, func(ctx workflow.Context) error {
		channel.Receive(ctx, nil)
		return nil
	}); err != nil {
		return nil, fmt.Errorf("failed to set update handler: %w", err)
	}

	if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
	}), Insert, InsertInput(signUpInput)).Get(ctx, nil); err != nil {
		return nil, fmt.Errorf("failed to insert: %w", err)
	}

	channel.Close()
	if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	}), SendEmail, SendEmailInput(signUpInput)).Get(ctx, nil); err != nil {
		return nil, fmt.Errorf("failed to send email: %w", err)
	}

	return &SignUpOutput{}, ctx.Err()
}
1 Like