How to skip a short-lived failing activity without changing workflow or activity code, and without a worker restart

The scenario is this: there is a short-lived activity that may fail because of the quality of an external service. The external service might be down for hours, and during that time the activity won’t succeed. But there is a way to finish the job manually that works around the external service.

So is there a way to skip a failing activity? The solution has to comply with:

  • non-blocking
  • no code change of the running workflow/activity, no worker restart

What I have tried:

  1. tctl activity complete to skip it. This doesn’t work for this scenario because the activity fails immediately, and you can’t complete a failed activity.
  2. An interceptor plus a signal. I thought the best place was “activityInboundInterceptor ExecuteActivity”, but I can’t find a non-blocking way to pass a signal to it.
  1. Here is the issue to get this fixed.

  2. Activity interceptor wraps the activity execution, not its invocation by the workflow. Use WorkflowOutboundInterceptor which runs in the workflow context instead.

Thanks for answering.

  1. It seems the issue won’t be closed anytime soon.

  2. I don’t think WorkflowOutboundInterceptor can handle this scenario. “workflowOutboundInterceptor ExecuteActivity” returns a Future, so I couldn’t handle the activity result there. Any suggestions?

  3. Are there any other ways to skip a short-lived activity when it has failed? Do I have to change the workflow code and restart the worker?

  1. I don’t understand your comment. The outbound interceptor intercepts an activity invocation from the workflow. See this example. You can listen for a signal in the interceptor, then cancel the activity when the signal arrives.

Thanks for your patience.

Maybe there is some misunderstanding, so let me show you the code.

With the code below, when I send the signal “SkipErrActivityForOneTime=true”, the whole workflow is canceled, but what I expect is to cancel/skip only the activity. What is wrong?


the workflow

func GreetingWorkflow(ctx workflow.Context, name string) (string, error) {
	options := workflow.ActivityOptions{
		...
	}
	ctx = workflow.WithActivityOptions(ctx, options)

	var result string
	// failed immediately
	err := workflow.ExecuteActivity(ctx, activities.ComposeGreeting, activities.GreetingParam{"jack"}).Get(ctx, &result)
	if err != nil {
		return "", err
	}

	// other activities ...

	return "", nil
}

the activity

// failed immediately
func ComposeGreeting(param GreetingParam) (string, error) {
	return "", errors.New("bad error greeting")
}

the interceptor

save param pointer to ctx

func (w *workflowInboundInterceptor) ExecuteWorkflow(
	ctx workflow.Context,
	in *interceptor.ExecuteWorkflowInput,
) (interface{}, error) {
	// save the pointer to ctx
	cancelFuncP := cancelFuncStruct{cancelFunc: nil, cancelActivity: nil}
	ctx = workflow.WithValue(ctx, "pauseCancelFunc", &cancelFuncP)
	flowControl := workflows.FlowControl{}
	ctx = workflow.WithValue(ctx, "flowControl", &flowControl)
	
	// fetch signals asynchronously
	workflow.Go(ctx, flowSignalHandler)

	return w.Next.ExecuteWorkflow(ctx, in)
}

expose the cancelHandler to ctx

func (w *workflowOutboundInterceptor) ExecuteActivity(ctx workflow.Context, activityType string, args ...interface{}) workflow.Future {

	childCtx, cancelHandler := workflow.WithCancel(ctx)
	cancelFuncP := ctx.Value("pauseCancelFunc").(*cancelFuncStruct)
	cancelFuncP.cancelActivity = cancelHandler

	wFuture := w.Next.ExecuteActivity(childCtx, activityType, args...)

	return wFuture
}

flowSignalHandler to handle the signal

func flowSignalHandler(ctx workflow.Context) {
	flowControl := ctx.Value("flowControl").(*workflows.FlowControl)
	signalChan := workflow.GetSignalChannel(ctx, "flowControl")
	// keep handling signals in a loop
	for {
		signalChan.Receive(ctx, &flowControl)
		if flowControl.SkipErrActivityForOneTime == true {
			// try to cancel activity execution
			cancelFuncP := ctx.Value("pauseCancelFunc").(*cancelFuncStruct)
			cancelFuncP.cancelActivity()
		}
	}
}

I guess that you canceled the activity, which caused it to return a cancellation error. The workflow handled that error by returning it from the workflow function. This was interpreted as workflow cancellation.

I think you want to retry the activity in the case of a cancellation request.