What's the right way to pause/unpause a workflow with interceptor and signal?

I am trying to find a way to pause/unpause a running workflow whenever a user wants. I am using an interceptor and a signal to implement this, but I ran into some problems and am looking for help.

I did three things:

  1. Define a struct to receive the signal. It carries the “pause/unpause/skipActivityErr” flags:
type FlowControl struct {
	SkipErrActivityForOneTime bool `json:"skipErrActivityForOneTime" default:"false"`
	PauseActivity             bool `json:"pauseActivity" default:"false"`
}
  2. I want to receive the signal at any time while a workflow runs, so I use “workflow.Go” to start a function that loops forever fetching signals, which keeps the workflow itself from blocking. I start it in “func (w *workflowInboundInterceptor) ExecuteWorkflow”:
func (w *workflowInboundInterceptor) ExecuteWorkflow(
	ctx workflow.Context,
	in *interceptor.ExecuteWorkflowInput,
) (interface{}, error) {
	flowControl := workflows.FlowControl{
		SkipErrActivityForOneTime: false,
		PauseActivity:             false,
	}
	ctx = workflow.WithValue(ctx, "flowControl", &flowControl)
	// run the signal handler in its own coroutine so ExecuteWorkflow does not block
	workflow.Go(ctx, flowSignalHandler)

	return w.Next.ExecuteWorkflow(ctx, in)
}
func flowSignalHandler(ctx workflow.Context) {
	flowControl := ctx.Value("flowControl").(*workflows.FlowControl)
	signalChan := workflow.GetSignalChannel(ctx, "flowControl")
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(signalChan, func(receiveChannel workflow.ReceiveChannel, more bool) {
		receiveChannel.ReceiveAsync(&flowControl)
	})
	for {
		selector.Select(ctx)
	}
}
  3. I added the pause/unpause check to “func (a *activityInboundInterceptor) ExecuteActivity”:
func (a *activityInboundInterceptor) ExecuteActivity(
	ctx context.Context,
	in *interceptor.ExecuteActivityInput,
) (interface{}, error) {
	// wait while the flow is paused
	flowControl := ctx.Value("flowControl").(workflows.FlowControl)
	if flowControl.PauseActivity {
		for flowControl.PauseActivity {
			time.Sleep(time.Second * 5)
		}
	}

	result, err := a.Next.ExecuteActivity(ctx, in)

	// swallow the error once if a skip was requested
	if err != nil && flowControl.SkipErrActivityForOneTime {
		flowControl.SkipErrActivityForOneTime = false
		return result, nil
	}

	return result, err
}

My questions are:

  1. I want to receive the signal many times while the workflow is running, without blocking it. Should I use “workflow.Go()” with an endless loop? Is this the expected way?

  2. I want the signal to control one specific workflow run, so I want to store the signal value in the workflow context. But I cannot update it from inside “workflow.Go()”, nor read it from “func (a *activityInboundInterceptor) ExecuteActivity”, because they have different contexts. How can I do it?

  1. The signal handler is OK. There is no need to use a selector when you are receiving from a single channel. Receiving in a loop is enough.

Edit:
This is an incorrect answer, as I missed that the activity is paused from an activity interceptor that runs in a separate process. See @Chad_Retz's answer below for details:

  2. The flowControl value is initialized before workflow.Go and ExecuteWorkflow are called, so it should be shared within the workflow without a problem, as all contexts are forked from the root one.

For 1, do you mean like this?

func flowSignalHandler(ctx workflow.Context) {
	flowControl := ctx.Value("flowControl").(*workflows.FlowControl)
	signalChan := workflow.GetSignalChannel(ctx, "flowControl")
	for {
		signalChan.Receive(ctx, &flowControl)
	}
}
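A side note on the “&flowControl” argument in the loop above: flowControl is already a pointer, so Receive is handed a pointer to a pointer. The sketch below is plain Go, not Temporal code, and it assumes the signal payload ends up being decoded with encoding/json (which is my understanding of the default converter for plain structs, but treat it as an assumption). Under that assumption the decoder follows the existing non-nil pointer and writes into the shared struct, so the update is visible to other holders of the pointer. The helper name decodeIntoShared is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlowControl mirrors the struct from the question.
type FlowControl struct {
	SkipErrActivityForOneTime bool `json:"skipErrActivityForOneTime"`
	PauseActivity             bool `json:"pauseActivity"`
}

// decodeIntoShared (hypothetical helper) decodes payload through a
// pointer-to-pointer, the same shape as Receive(ctx, &flowControl), and
// reports whether the local pointer still refers to the shared struct.
func decodeIntoShared(shared *FlowControl, payload []byte) (bool, error) {
	local := shared // local copy of the pointer, like the handler's variable
	if err := json.Unmarshal(payload, &local); err != nil {
		return false, err
	}
	return local == shared, nil
}

func main() {
	shared := &FlowControl{}
	same, err := decodeIntoShared(shared, []byte(`{"pauseActivity":true}`))
	if err != nil {
		panic(err)
	}
	// The pointer was reused and the shared struct was updated in place.
	fmt.Println(same, shared.PauseActivity) // prints "true true"
}
```

Passing the pointer itself (flowControl rather than &flowControl) expresses the same intent more directly and does not rely on how the decoder treats nested pointers.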

For 2, there is still a problem:

  1. I initialized “flowControl” in ExecuteWorkflow and saved it to the workflow context with “workflow.WithValue”. It really is saved, so that part is fine. But I receive the flowControl from the signal inside the function started by “workflow.Go(ctx, flowSignalHandler)”. Inside “flowSignalHandler”, any workflow.WithValue() update is useless, because it only updates a copy of ctx. The workflow ctx itself is not updated.

  2. I can’t read the value from the workflow ctx in “func (a *activityInboundInterceptor) ExecuteActivity”, because the ExecuteActivity ctx is a “context.Context”, not a “workflow.Context”. They are different types.

A workflow (interceptor) context is not shared with an activity (interceptor) context by default. Those can be executed in completely different processes/workers. If you want to pass context values between the two, you can look into creating an implementation of ContextPropagator and setting it as a client option.

And that information is serialized. So you can’t set a PauseActivity field on FlowControl in a workflow and have it automatically appear in an activity (not to mention that in Go you aren’t using a pointer when retrieving the value from your activity context anyway, which means no field update would happen). So the “for flowControl.PauseActivity {” loop would be an infinite loop.
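To make the serialization point concrete, here is a minimal plain-Go sketch (no Temporal involved). The function propagate is a made-up stand-in for whatever carries a value from the workflow side to the activity side, and JSON is assumed as the encoding purely for illustration. It shows that the receiving side gets a snapshot, so a later change on the workflow side never reaches it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlowControl mirrors the struct from the question.
type FlowControl struct {
	SkipErrActivityForOneTime bool `json:"skipErrActivityForOneTime"`
	PauseActivity             bool `json:"pauseActivity"`
}

// propagate (hypothetical) models crossing the workflow/activity boundary:
// the value is serialized on one side and rebuilt as a new value on the other.
func propagate(fc *FlowControl) (FlowControl, error) {
	data, err := json.Marshal(fc)
	if err != nil {
		return FlowControl{}, err
	}
	var received FlowControl
	err = json.Unmarshal(data, &received)
	return received, err
}

func main() {
	workflowSide := &FlowControl{PauseActivity: true}

	activitySide, err := propagate(workflowSide)
	if err != nil {
		panic(err)
	}

	// The "unpause" signal arrives later and only touches the workflow side.
	workflowSide.PauseActivity = false

	// The activity side still sees the old snapshot, so a loop of the form
	// `for activitySide.PauseActivity { ... }` would never exit.
	fmt.Println(workflowSide.PauseActivity, activitySide.PauseActivity) // prints "false true"
}
```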

Assuming you are trying to prevent activities from starting, maybe you want to do this on execute activity in a workflow outbound interceptor instead. This is called with the same context that can have a pointer to a mutable value if you must. But it won’t affect running activities. You’d have to do the same for outbound child workflows, maybe timers depending on the use case, etc.

Thank you ~ LoL

Now I have switched to a workflow outbound interceptor: I receive the signal in workflow.Go() and save the parameters to the workflow.Context through a pointer. It works as I expected, thank you. Using a timer also makes the flow more concise.

But I have one more question. I found a “blocked on chan…” error in the Temporal UI “Stack Trace” view. It is blocked because I use “selector.Select(ctx)” in a coroutine started by “workflow.Go”. Does it matter? How can I avoid it without changing my requirements?

We need to rename “Stack Trace” to thread dump. This window shows all the goroutines of the workflow. It doesn’t indicate any problem.
