Preventing single workflow failure from blocking the task queue

Hello,
I’d like to understand the pattern for implementing workflow isolation so that a single workflow task failure does not block all other workflow executions.

For reference, I am asking because I have observed a single unhandled error

java.lang.RuntimeException: Failure processing workflow task

blocking the task queue for all other workflows.

I understand that this is the desired behavior in certain cases, where it makes sense to suspend execution until such an unhandled error (a bug) is fixed.

In our case the workflows are fairly short-lived (1-30 minutes) and only relevant during that window; after that they should not be executed at all, and it is not desirable to have a random error in a single workflow bring all other workflows to a halt.

I am unsure how to handle this efficiently, and have two questions:

  1. Can I somehow terminate a workflow execution on such “fatal” exceptions raised by activities, from the workflow code itself? That is, without keeping extra workflow state and wrapping every single activity invocation (in both the WorkflowMethod and the SignalMethods) with checks that prevent any further execution until the workflow method returns?

  2. Is there a way to prevent a failed task from staying at the head of the queue, so that other workflows can continue executing? Or do I have to create a new worker/task queue for each workflow in order to isolate them (and then somehow clean them up afterwards)?

The behavior you observed is not expected. I don’t believe that a single or even multiple workflows failing their workflow tasks would block other workflows from execution.

Thank you for the fast response.
Maybe I am describing the issue wrong or something else is going on then.

Unfortunately I cannot access the terminated workflow’s history in the web UI (possibly we have some restriction configured in our deployment, as I only see the last two days now).

I can now see that there was some activity other than the error, but it was very limited.

This is the log from the failure we observed, and it was almost the only thing happening on the service for some time:

07 Sep 2020 18:00:43.3842020-09-07T18:00:43.381  [Workflow Executor taskQueue="SearchWorkerTaskQueue", namespace="default": 924] i.t.i.w.PollerOptions [traceId=,spanId=] [][]  exception
java.lang.RuntimeException: Failure processing workflow task. WorkflowId=workflow-search-3c3f6cf4-c173-44a2-a7ae-352608da9186, RunId=d8245054-07fb-4a44-ada1-3615e94b541d
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:337)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:275)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:79)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 564 of 'EVENT_TYPE_WORKFLOW_TASK_STARTED' type. IsReplayng=false, PreviousStartedEventId=564, workflowTaskStartedEventId=564, Currently Processing StartedEventId=564
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:202)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:112)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:309)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:275)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
	... 3 common frames omitted
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
	at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:140)
	at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:91)
	at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:63)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:210)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
	... 11 common frames omitted
Caused by: io.temporal.failure.ActivityFailure: scheduledEventId=553, startedEventId=554, activityType='OccupyAcceptee', activityId='7610bba0-66e1-3067-95f1-05b647be01eb', identity='1@driver-search-7db89b9d69-9xbj6', retryState=RETRY_STATE_TIMEOUT
	at java.base/java.lang.Thread.getStackTrace(Unknown Source)
	at io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:48)
	at io.temporal.internal.sync.ActivityInvocationHandler.lambda$getActivityFunc$0(ActivityInvocationHandler.java:59)
	at io.temporal.internal.sync.ActivityInvocationHandlerBase.invoke(ActivityInvocationHandlerBase.java:65)
	at com.sun.proxy.$Proxy167.occupyAcceptee(Unknown Source)
	at com.l.d.search.SearchWorkflow$Impl.handleInquiryResponse(SearchWorkflow.kt:115)
	at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at io.temporal.internal.sync.WorkflowInternal.lambda$registerListener$155fbe99$1(WorkflowInternal.java:140)
	at io.temporal.internal.sync.SyncWorkflowContext.lambda$registerSignal$3ac69c8f$1(SyncWorkflowContext.java:676)
	at io.temporal.internal.sync.SyncWorkflowContext.signal(SyncWorkflowContext.java:639)
	at io.temporal.internal.sync.WorkflowExecuteRunnable.processSignal(WorkflowExecuteRunnable.java:71)
	at io.temporal.internal.sync.SyncWorkflow.lambda$handleSignal$2(SyncWorkflow.java:129)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	... 3 common frames omitted
Caused by: io.temporal.failure.ApplicationFailure: message='action=occupyDriver status=unexpected-call-failure code=500', type='com.l.d.search.infrastructure.ExternalApiException', nonRetryable=false
	at com.l.d.search.infrastructure.ExternalApi$Impl.occupyDriver(ExternalApi.kt:41)
	at com.l.d.search.SearchActivities$Impl.occupyDriver(SearchActivities.kt:224)
	at com.l.d.search.SearchActivities$Impl.occupyAcceptee(SearchActivities.kt:173)
	at jdk.internal.reflect.GeneratedMethodAccessor172.invoke(Unknown Source:0)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source:0)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source:0)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:279)
	at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:218)
	at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:183)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:194)
	at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source:0)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source:0)
	at java.base/java.lang.Thread.run(Unknown Source:0)

The bug in our service is the application failure thrown from an activity: an HTTP call returned 500, after which we threw an exception.

Caused by: io.temporal.failure.ApplicationFailure: message='action=occupyDriver status=unexpected-call-failure code=500', type='com.l.d.search.infrastructure.ExternalApiException', nonRetryable=false
    	at com.l.d.search.infrastructure.ExternalApi$Impl.occupyDriver(ExternalApi.kt:41)
    	at com.l.d.search.SearchActivities$Impl.occupyDriver(SearchActivities.kt:224)
    	at com.l.d.search.SearchActivities$Impl.occupyAcceptee(SearchActivities.kt:173)

When I noticed this the next day, this single workflow was stuck in this loop, and other workflows had been started but were not progressing.
I terminated the workflow using the red button in the Web UI, which immediately allowed the other workflows to continue (for example, a workflow that had been started almost an hour earlier but had not been able to move past its first task until then).

Unfortunately, due to the missing history, I can only offer a visual representation from our logs, but it shows how the other workflows spiked to completion after the failing workflow was terminated.

workflow-spike

So, you are right in that it was not blocked completely.
Maybe the issue was that every 10 seconds the queue was blocked for a number of seconds handling this failed task, and increasing the number of workers would help execute the other tasks without interruption?

Unfortunately I cannot access the terminated workflow’s history in the web UI (possibly we have some restriction configured in our deployment, as I only see the last two days now).

The retention of workflows after they are closed is controlled by the namespace retention period configuration. You can update it for the default namespace using:

tctl namespace update --retention <number of days> 

Or for a non-default namespace:

tctl -ns <yournamespace> namespace update --retention <number of days> 

Are you 100% sure that this activity was failing just for one workflow? Could it be that all the other workflows had a similar issue?

What is the retry policy on that activity? Could it be that it was retrying too aggressively, starving the workers?

How many parallel activities are allowed per worker? It is defined by WorkerOptions.maxConcurrentActivityExecutionSize.

I will modify the retention soon, thank you for the advice.

Are you 100% sure that this activity was failing just for one workflow? Could it be that all the other workflows had a similar issue?

I have verified from the logs that all those failures, about 8300 of them, were specific to this WorkflowId and RunId. There were a couple of other workflows that did manage to finish in the meantime without issues, just slowly.

What is the retry policy on that activity? Could it be that it was retrying too aggressively, starving the workers?

The activity stub is initialized as follows, with the default RetryOptions:

Workflow.newActivityStub(
    SearchActivities::class.java,
    ActivityOptions.newBuilder()
        .setScheduleToCloseTimeout(Duration.ofSeconds(15))
        .build()
)
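
For clarity, my understanding (an assumption based on the documented server defaults, not something we configure explicitly) is that this is roughly equivalent to spelling the retry policy out; the 15-second ScheduleToClose timeout is what actually stops the retries:

// Assumed-equivalent form with the default retry policy made explicit:
// unlimited attempts with exponential backoff (1s initial interval, 2.0
// coefficient, capped at 100s between attempts), bounded by the 15s
// ScheduleToCloseTimeout.
Workflow.newActivityStub(
    SearchActivities::class.java,
    ActivityOptions.newBuilder()
        .setScheduleToCloseTimeout(Duration.ofSeconds(15))
        .setRetryOptions(
            RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setBackoffCoefficient(2.0)
                .setMaximumInterval(Duration.ofSeconds(100))
                .build()
        )
        .build()
)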

How many parallel activities are allowed per worker? It is defined by WorkerOptions.maxConcurrentActivityExecutionSize.

There is only a single worker, initialized with the default (200, per the docs); the whole initialization is largely unchanged from the Java samples:

// gRPC stubs wrapper that talks to the local docker instance of temporal service.
val serviceStubsOptions = WorkflowServiceStubsOptions.newBuilder()
    .setTarget(target)
    .build()
val service = WorkflowServiceStubs.newInstance(serviceStubsOptions)

val workflowClientOptions = WorkflowClientOptions.newBuilder()
    .setDataConverter(dataConverter)
    .build()

// client that can be used to start and signal workflows
val client = WorkflowClient.newInstance(service, workflowClientOptions)
// worker factory that can be used to create workers for specific task queues
val factory = WorkerFactory.newInstance(client)
// Worker that listens on a task queue and hosts both workflow and activity implementations.
val searchWorker = factory.newWorker(SEARCH_TASK_QUEUE)
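
For completeness, the rest of the setup just registers the implementations and starts the factory, as in the samples. A rough sketch (the Impl class names are inferred from the stack trace, and externalApi is a placeholder for the activity’s dependencies):

// The per-worker activity concurrency could be changed via WorkerOptions, e.g.
// factory.newWorker(SEARCH_TASK_QUEUE,
//     WorkerOptions.newBuilder().setMaxConcurrentActivityExecutionSize(200).build()),
// but we currently rely on the default of 200.

// Register implementations and start polling.
searchWorker.registerWorkflowImplementationTypes(SearchWorkflow.Impl::class.java)
searchWorker.registerActivitiesImplementations(SearchActivities.Impl(externalApi))
factory.start()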

I think I understand what was going on. Still don’t understand why this caused delays of other workflows.

From the stack trace, it looks like the occupyAcceptee activity was invoked from the signal handler method SearchWorkflow$Impl.handleInquiryResponse. The activity has a short 15-second ScheduleToClose timeout, which defines the maximum time the activity will be retried under the default retry policy. So after 15 seconds the activity fails, and the ActivityFailure exception is thrown from the signal handler method. By default, an exception thrown from a signal handler doesn’t fail the workflow but blocks its execution (through retries of the workflow task) until the problem is fixed.

So your options are:

  • Increase the activity’s ScheduleToClose timeout to a much larger value. This way the activity is going to be retried until the backend service is fixed. This assumes that you monitor activity failure rates.

  • Do not let exceptions escape the signal handler method.

  • Add the ActivityFailure exception to the WorkflowImplementationOptions.failWorkflowExceptionTypes list and pass these options to the Worker.registerWorkflowImplementationTypes method. This tells the SDK to fail the workflow immediately on the specified exception instead of blocking (see the sketch after this list).
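
A minimal sketch of the third option, assuming a recent Java SDK and a worker set up like the one shown earlier (the SearchWorkflow.Impl class name is inferred from the stack trace):

// Fail the workflow immediately whenever an ActivityFailure propagates out of
// workflow code (including signal handlers), instead of retrying the workflow task.
val implementationOptions = WorkflowImplementationOptions.newBuilder()
    .setFailWorkflowExceptionTypes(ActivityFailure::class.java)
    .build()
searchWorker.registerWorkflowImplementationTypes(
    implementationOptions,
    SearchWorkflow.Impl::class.java
)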

You are right about what was going on; sorry, I did not mention that the origin was a signal handler.

I also understand the ActivityFailure case; I have since modified the code to handle the intermittent failure properly.

For fatal failures, I will throw an ApplicationFailure, which will fail the workflow without having to register a specific type via Worker.registerWorkflowImplementationTypes (see the sketch below).
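
Roughly what I have in mind for the signal handler, as a sketch (only handleInquiryResponse and occupyAcceptee come from the real code/stack trace; the parameter type, the isFatal check, and the activities field are made-up placeholders):

// Keep ActivityFailure from escaping the signal handler: handle intermittent
// failures in place, and convert fatal ones into an ApplicationFailure, which
// fails the workflow instead of blocking its workflow task.
override fun handleInquiryResponse(response: InquiryResponse) {
    try {
        activities.occupyAcceptee(response)
    } catch (e: ActivityFailure) {
        if (isFatal(e)) {
            // Fails the workflow execution immediately.
            throw ApplicationFailure.newNonRetryableFailure(
                "fatal failure while occupying acceptee", "FatalSearchError"
            )
        }
        // Intermittent failure: log it and let the signal handler return normally.
        Workflow.getLogger(SearchWorkflow::class.java)
            .warn("occupyAcceptee failed, ignoring", e)
    }
}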

Still don’t understand why this caused delays of other workflows.

I don’t either and I cannot produce a minimal working example right now.

I tried reproducing this both in a fresh project based on the Java samples and in our actual application in a test environment, and I see the behavior you describe as expected: parallel workflows on the same task queue continue executing without any noticeable delay even if one workflow fails (with both an application failure and an activity failure).

I will try to mitigate the possible worker-starvation scenario by adding more resources.

Thank you for the help. If we encounter the issue again in a reproducible manner, I will try to provide more helpful information.
