Hi, I’ve gone through some of the previous posts around pause/resume, and am trying to figure out how to implement a general pause/resume pattern using the Go SDK. To avoid leaking the logic into my workflow, I’m trying to implement this through interceptors as mentioned in previous posts. Here’s what my first pass at this looks like (see gist):
package pause
import (
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
)
const (
PauseEvent = "pause-event"
ResumeEvent = "resume-event"
)
type PauseState struct {
// TODO: does this need to be protected? Will there be concurrent access?
paused bool
}
type PauseInterceptor struct {
interceptor.WorkerInterceptorBase
state *PauseState
}
func (pi *PauseInterceptor) InterceptWorkflow(ctx workflow.Context, next interceptor.WorkflowInboundInterceptor) interceptor.WorkflowInboundInterceptor {
return &PauseInboundInterceptor{WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next}, state: pi.state}
}
func NewPauseInterceptor() *PauseInterceptor {
return &PauseInterceptor{state: &PauseState{}}
}
type PauseInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
state *PauseState
}
func (pii *PauseInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error {
poi := &PauseOutboundInterceptor{WorkflowOutboundInterceptorBase: interceptor.WorkflowOutboundInterceptorBase{Next: outbound}, state: pii.state}
return pii.Next.Init(poi)
}
func (pii *PauseInboundInterceptor) HandleSignal(ctx workflow.Context, in *interceptor.HandleSignalInput) error {
// TODO: if we match the signal, set the shared state and swallow the signal?
switch in.SignalName {
case PauseEvent:
// TODO: does this need to be protected?
pii.state.paused = true
return nil
case ResumeEvent:
pii.state.paused = false
return nil
}
return pii.Next.HandleSignal(ctx, in)
}
type PauseOutboundInterceptor struct {
interceptor.WorkflowOutboundInterceptorBase
state *PauseState
}
func (poi *PauseOutboundInterceptor) ExecuteActivity(ctx workflow.Context, activityType string, args ...any) workflow.Future {
if poi.state.paused {
// TODO: if we're paused, what do we return here?
// TODO: do we need to queue up the activity for when things are unpaused?
}
return poi.Next.ExecuteActivity(ctx, activityType, args...)
}
I’m intercepting the signals and execute activity calls here, but some things are not clear:
- If we’re paused, how to handle ExecuteActivity? Returning nil causes panics. Returning a Future with an error set on it fails the workflow. So I’m unclear what to return there and what to do with the paused activity I’ve been asked to execute.
- is the interceptor state here going to be concurrently accessed? do I need to protect it?
- this is acting like a global flag for all workflows - to make it more targeted at workflow ID, what would be the recommended practice for doing that inside of an interceptor?
- is swallowing the signal in the interceptor ok? these signals are only targeted at the interceptor, so would like to avoid passing them on.
Thanks for any guidance and help you can provide.
Mark