Signalling Workflows

Hi all, I’m really new to Temporal and I have an issue with signalling.
The application I’m working on accepts an HTTP request from an HTTP client and triggers a workflow that talks to a few downstream systems through activities A, B and C. The call to activity A looks like this:

err = workflow.ExecuteActivity(options.WithTaskCompletionRetryPolicy(ctx), activityA, ActivityAParams).Get(ctx, &activityAResponse)
for err != nil {
	var signalValue string
	signal.WaitAndReceiveSignal(ctx, signal.ResumeSignalChannel, &signalValue)
	ActivityAParams.value = signalValue
	err = workflow.ExecuteActivity(options.WithTaskCompletionRetryPolicy(ctx), activityA, ActivityAParams).Get(ctx, &activityAResponse)
}

When the endpoint is hit, I call the following:

return c.SignalWithStartWorkflow(ctx, workflowId, signal.ResumeSignalChannel, "new-signal-value", workflowOptions, myWorkflowABC, workflowParams)

If activity A fails, the workflow stays in the Running state as expected and the HTTP client times out (which is OK). When I retry the request, I expect a signal to be sent to the existing workflow run; since downstream system A is now working, activity A should succeed and the workflow should continue.

However, the running workflow never receives the signal (I added debug logs to check). Instead, the call returns a "context deadline exceeded" error almost instantly.

Please advise. Thank you.

I may be misunderstanding, but if activity A is failing, it is likely being retried (activities retry by default) and never reaching the code that waits for your signal. Otherwise, can you show what WithTaskCompletionRetryPolicy and WaitAndReceiveSignal do?

Hi Chad,

WithTaskCompletionRetryPolicy disables retries by setting MaximumAttempts to 1:

var NoRetryPolicy = temporal.RetryPolicy{
	InitialInterval:        time.Second * 1,
	BackoffCoefficient:     2.0,
	MaximumInterval:        time.Second * 60,
	MaximumAttempts:        1,
	NonRetryableErrorTypes: []string{},
}

WaitAndReceiveSignal is just a wrapper function

func WaitAndReceiveSignal(ctx workflow.Context, channel string, valuePtr interface{}) {
	s := workflow.NewSelector(ctx)
	s.AddReceive(workflow.GetSignalChannel(ctx, channel), func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, valuePtr)
	})
	s.Select(ctx)
}

I can see that execution enters the for loop and just waits there. I added a debug log, and it is printed only once.

Hmm, nothing stands out. As long as you use the same workflow ID, the signal should be delivered. Could you provide a full standalone test case? I’ll also write a quick test to confirm.

I have this logic in place:

var value interface{}
workflowRun := c.GetWorkflow(ctx, workflowId, "")
err := workflowRun.Get(ctx, &value)
workflowExists := true
switch err.(type) {
case *serviceerror.NotFound: // this means that there is no existing workflow with this workflow id
	workflowExists = false
}

if workflowExists {
	logging.Info(ctx).Msg("SENDING SIGNAL >>>>>>> " + workflowId)
	return c.SignalWithStartWorkflow(ctx, workflowId, signal.ResumeSignalChannel, "new-signal-value", workflowOptions, myWorkflowABC, workflowParams)
}

When I retry, workflowExists is true and I can see the debug log as well.

This is what I want to accomplish

I threw this together quickly. It works as expected: it calls SignalWithStartWorkflow three times, and the activity fails the first two times:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

func run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create client
	c, err := client.NewClient(client.Options{})
	if err != nil {
		return fmt.Errorf("failed starting client: %w", err)
	}
	defer c.Close()

	// Start worker
	taskQueue := "test-task-queue-" + uuid.NewString()
	w := worker.New(c, taskQueue, worker.Options{})
	w.RegisterWorkflow(Workflow)
	w.RegisterActivity(MaybeFail)
	if err := w.Start(); err != nil {
		return fmt.Errorf("failed starting worker: %w", err)
	}
	defer w.Stop()

	// Signal with start telling it to fail the first activity run
	workflowID := "my-workflow-" + uuid.NewString()
	startOpts := client.StartWorkflowOptions{TaskQueue: taskQueue}
	_, err = c.SignalWithStartWorkflow(ctx, workflowID, "should-fail", true, startOpts, Workflow)
	if err != nil {
		return fmt.Errorf("failed starting workflow: %w", err)
	}

	// Signal with start again, telling it to fail again
	_, err = c.SignalWithStartWorkflow(ctx, workflowID, "should-fail", true, startOpts, Workflow)
	if err != nil {
		return fmt.Errorf("failed resuming workflow: %w", err)
	}

	// Signal with start again, this time asking to succeed
	run, err := c.SignalWithStartWorkflow(ctx, workflowID, "should-fail", false, startOpts, Workflow)
	if err != nil {
		return fmt.Errorf("failed resuming workflow: %w", err)
	}

	// Confirm success
	if err := run.Get(ctx, nil); err != nil {
		return fmt.Errorf("workflow failed: %w", err)
	}
	return nil
}

func Workflow(ctx workflow.Context) error {
	// No retries
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToCloseTimeout: 1 * time.Minute,
		RetryPolicy:            &temporal.RetryPolicy{MaximumAttempts: 1},
	})

	// Continually run and wait for signal to tell us whether to error
	shouldFailSignal := workflow.GetSignalChannel(ctx, "should-fail")
	for attempt := 1; ; attempt++ {
		// Wait for signal
		workflow.GetLogger(ctx).Info("Waiting for signal before executing activity")
		var shouldFail bool
		if !shouldFailSignal.Receive(ctx, &shouldFail) {
			return fmt.Errorf("channel closed")
		}

		// Execute activity and return on success
		if err := workflow.ExecuteActivity(ctx, MaybeFail, shouldFail).Get(ctx, nil); err != nil {
			workflow.GetLogger(ctx).Info("Activity failed, waiting before retry", "Attempt", attempt, "Error", err)
		} else {
			workflow.GetLogger(ctx).Info("Activity succeeded, exiting workflow", "Attempt", attempt)
			return nil
		}
	}
}

func MaybeFail(ctx context.Context, shouldFail bool) error {
	if shouldFail {
		return fmt.Errorf("intentional error")
	}
	return nil
}

I think I’m doing almost the same thing, except for how I receive the signal.
Thanks Chad, I’ll go through your code again in the morning and try to identify what I have done wrong.


You don’t need a Selector to listen on a single channel, so the Selector in WaitAndReceiveSignal can be replaced with a direct receive on the signal channel:

more := c.Receive(ctx, valuePtr)

I tried sending the signal with tctl and it is received fine, so the problem must be on the sending side.

I call SignalWithStartWorkflow directly if the workflow is found:

return c.SignalWithStartWorkflow(ctx, workflowId, signal.ResumeSignalChannel, "new-signal-value", workflowOptions, myWorkflowABC, workflowParams)

I logged the returned error:

{"level":"error","error":"ServerError(Kind=Internal Server Error, Message=Unexpected panic, Cause=runtime error: invalid memory address or nil pointer dereference)","datetime":"2021-11-12T12:18:29.782221+11:00","appId":"stravinsky:frontdoor","appVersion":"1.0","traceid":"13eb184b-b64b-4809-9e28-d788a0e9a544","message":"Error"}

This logic doesn’t make sense to me:

var value interface{}
workflowRun := c.GetWorkflow(ctx, workflowId, "")
err := workflowRun.Get(ctx, &value)
workflowExists := true
switch err.(type) {
case *serviceerror.NotFound: // this means that there is no existing workflow with this workflow id
	workflowExists = false
}

if workflowExists {
	logging.Info(ctx).Msg("SENDING SIGNAL >>>>>>> " + workflowId)
	return c.SignalWithStartWorkflow(ctx, workflowId, signal.ResumeSignalChannel, "new-signal-value", workflowOptions, myWorkflowABC, workflowParams)
}

It waits for the workflow to complete (workflowRun.Get blocks until the run finishes) and then calls SignalWithStart to start it again. Is that the intention?