I am trying to find a way to pause/unpause a workflow running anytime a man want, I was using interceptor and signal to implement this. but i met some problems, seeking for help
I did 3 actions
define a struct to receive signal, it contains “pause/unpause/skipActivityErr”
i want to receive the signal at anytime when a workflow run, so i use “workflow.Go” to start function which contains a unbreak loop to fetch signal to avoid block , I run it in “func (w *workflowInboundInterceptor) ExecuteWorkflow”
I add pause/unpause check logic to “func (a *activityInboundInterceptor) ExecuteActivity”
func (a *activityInboundInterceptor) ExecuteActivity(
ctx context.Context,
in *interceptor.ExecuteActivityInput,
) (interface{}, error) {
// check is pause
flowControl := ctx.Value("flowControl").(workflows.FlowControl)
if flowControl.PauseActivity {
for flowControl.PauseActivity {
time.Sleep(time.Second * 5)
}
}
result, err := a.Next.ExecuteActivity(ctx, in)
// check is skip err
if err != nil && flowControl.SkipErrActivityForOneTime {
flowControl.SkipErrActivityForOneTime = false
return result, nil
}
return result, err
}
my questions are :
I want to receive the signal at the flow running for many times and avoid block , should i use “workflow.go()” and unbreak loop? is this an expect way?
I want the signal to control a specific workflow running, so I want to save the signal at workflow.context. but I can not save it in “workflow.go()” neither read it from “func (a *activityInboundInterceptor) ExecuteActivity” cause they has different context. how can i do it ?
The signal handler is OK. There is no need to use a selector when you are receiving from a single channel. Receive in a loop is enough.
Edit:
This is an incorrect answer as I missed that the activity is paused from an activity interceptor that runs in a separate process. See @Chad_Retz answer below with details:
The flowControl value is initialized before workflow.Go and ExectueWorkflow are called. So it should be shared within workflow without a problem as all contexts are forked from the root one.
I initialized “flowControl” in ExecuteActivity and save it to workflow.context by “workflow.WithValue”, it is truly saved, it is fine. but I receive the flowControl from signal within the func “workflow.Go(ctx, flowSignalHandler)”, in the function “flowSignalHandler” any ctx.WithValue() update would be useless, because it update a copy of ctx. the workflow ctx would not be update
I can’t read value from “workflow.ctx” in “func (a *activityInboundInterceptor) ExecuteActivity”, beacase ExecuteActivity ctx is a type of “context.Context” not “workflow.Context” , it is different
A workflow (interceptor) context is not shared with an activity (interceptor) context by default. Those can be executed in completely different processes/workers. If you want to pass context values between the two, you can look into creating an implementation of of ContextPropagator and setting it as a client option.
And that information is serialized. So you can’t set a PauseActivity field on FlowControl in a workflow and have it automatically appear in an activity (not to mention in Go you aren’t using a pointer in your activity context anyways when retrieving which means no field update would happen). So for flowControl.PauseActivity { would be an infinite loop.
Assuming you are trying to prevent activities from starting, maybe you want to do this on execute activity in a workflow outbound interceptor instead. This is called with the same context that can have a pointer to a mutable value if you must. But it won’t affect running activities. You’d have to do the same for outbound child workflows, maybe timers depending on the use case, etc.
Now I change to workflow outbound interceptor, receive signal in workflow.Go() and save the params to workflow.Context with pointer, it works as i expected, thank u , and timer make the flow more concise ~
but there is one more question, I found “blocked on chan…” err at the temporal ui “stack trace” . it is block cause i use “selector.Select(ctx)” in a coroutine started by “workflow.Go”. does it matter? how can i avoid this without changing my demands?