Hi Team,
We have an issue in production. We have workflows that run forever and occasionally continue as new. Periodically, we signal them to add a set of items to their state and pass the new list to the activity. The activity runs forever and heartbeats periodically. When a signal arrives, we cancel the activity and start it again; we call this an iteration. After 10 iterations, we call continueAsNew.
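To pin down what "iteration" means here, below is a dependency-free toy model of the loop just described. It is not Temporal code, and `SubscriptionLoopModel`, `onSignal` and `onIterationFinished` are hypothetical names. A signal that adds new items requests one cancellation of the running activity. Further signals in the same burst are merged without a second cancellation, which mirrors the `isCancelRequested` guard in the workflow code later in this post. After 10 finished iterations, the run hands its full item set to continue-as-new.

```kotlin
// Toy, dependency-free model of the iteration loop described above.
// It does NOT use the Temporal SDK; all names are hypothetical.
class SubscriptionLoopModel(initial: Set<String>, private val maxIterations: Int = 10) {
    val managed: MutableSet<String> = initial.toMutableSet()
    var iterationCount = 0
        private set
    var cancellationRequests = 0
        private set
    private var cancelRequested = false

    /** Signal handler: merges new items and requests at most one cancellation per iteration. */
    fun onSignal(items: Set<String>) {
        val newItems = items - managed
        if (newItems.isEmpty()) return
        managed += newItems
        if (!cancelRequested) {
            cancelRequested = true
            cancellationRequests++ // stands in for currentScope.cancel()
        }
    }

    /**
     * Called when the activity returns (e.g. after cancellation).
     * Returns the set to continue as new with once the cap is reached, otherwise null.
     */
    fun onIterationFinished(): Set<String>? {
        iterationCount++
        cancelRequested = false // the next iteration runs in a fresh scope
        return if (iterationCount >= maxIterations) managed.toSet() else null
    }
}
```

For example, three signals adding `B`, `C` and `C` again in one burst produce a single cancellation request, and the tenth `onIterationFinished()` call returns `{A, B, C}`.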
This is the exception we received for the stuck workflow (“Signal received after workflow is closed”):
io.temporal.internal.statemachines.InternalWorkflowTaskException: Failure handling event 2520 of type 'EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED' during execution. {PreviousStartedEventId=2513, WorkflowTaskStartedEventId=2521, CurrentStartedEventId=2513}
at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:263)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:242)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:216)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:190)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:137)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:129)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:98)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:293)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:237)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:178)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: Signal received after workflow is closed.
at io.temporal.internal.replay.ReplayWorkflowExecutor.handleWorkflowExecutionSignaled(ReplayWorkflowExecutor.java:129)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler$StatesMachinesCallbackImpl.signal(ReplayWorkflowRunTaskHandler.java:323)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleNonStatefulEvent(WorkflowStateMachines.java:529)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:281)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:240)
... 12 common frames omitted
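One way to read this trace is that the worker was applying a batch of history events (here event 2520, a signal) to a run whose workflow code had already finished, for example by calling `continueAsNew`, and the SDK refused to deliver the signal. The toy model below reproduces only that ordering hazard. `ToyRun` and its methods are hypothetical, and this is an assumption about the symptom, not a description of how the SDK is implemented.

```kotlin
// Toy model of the ordering hazard behind "Signal received after workflow is closed".
// Hypothetical: it only mimics the symptom, not Temporal's internals.
class ToyRun {
    private var closed = false
    val received = mutableListOf<String>()

    /** Stands in for the workflow code calling continueAsNew: the run's code is now finished. */
    fun continueAsNew() {
        closed = true
    }

    fun signal(payload: String) {
        // Mimics the check that produced the IllegalStateException in the trace above.
        check(!closed) { "Signal received after workflow is closed." }
        received += payload
    }

    /** Applies a batch of signal events in order, roughly as a worker applies one workflow task. */
    fun applyBatch(signals: List<String>): Result<Unit> = runCatching {
        signals.forEach { signal(it) }
    }
}
```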
This is the “CanceledFailure” exception we received just before the stuck workflow exception.
io.temporal.failure.ActivityFailure: scheduledEventId=2509, startedEventId=0, activityType='EnsureSubscription', activityId='961d272f-393f-30f7-819c-ffe8187a38b0', identity='workflow', retryState=RETRY_STATE_UNSPECIFIED
at java.base/java.lang.Thread.getStackTrace(Thread.java:1610)
at io.temporal.internal.sync.CompletablePromiseImpl.throwFailure(CompletablePromiseImpl.java:137)
at io.temporal.internal.sync.CompletablePromiseImpl.getImpl(CompletablePromiseImpl.java:96)
at io.temporal.internal.sync.CompletablePromiseImpl.get(CompletablePromiseImpl.java:75)
at com.sofi.invest.marketdata.dxfeed.services.impl.WorkflowServiceImpl.startWorkFlow(WorkflowServiceImpl.kt:92)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:310)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:285)
at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:68)
at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:135)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
at io.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:392)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.temporal.failure.CanceledFailure:
at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:123)
at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:73)
at io.temporal.failure.FailureConverter.failureToExceptionImpl(FailureConverter.java:87)
at io.temporal.failure.FailureConverter.failureToException(FailureConverter.java:73)
at io.temporal.internal.sync.SyncWorkflowContext$ActivityCallback.lambda$invoke$0(SyncWorkflowContext.java:288)
... 8 common frames omitted
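The `CanceledFailure` arrives wrapped. `activityPromise!!.get()` throws an `ActivityFailure` whose cause is the `CanceledFailure`, so the `catch (ex: Exception)` block in the workflow code below logs an expected cancellation at ERROR level. A small cause-chain helper is one way to tell the two cases apart. `hasCauseMatching` is a hypothetical extension, shown here with standard-library exceptions so it runs on its own; in the workflow, the predicate would be something like `{ it is CanceledFailure }`.

```kotlin
// Hypothetical helper: checks a Throwable and then each exception in its cause chain.
// In the workflow, the predicate would test for io.temporal.failure.CanceledFailure.
fun Throwable.hasCauseMatching(predicate: (Throwable) -> Boolean): Boolean {
    var current: Throwable? = this
    val seen = HashSet<Throwable>() // guards against cyclic cause chains
    while (current != null && seen.add(current)) {
        if (predicate(current)) return true
        current = current.cause
    }
    return false
}
```

For example: `catch (ex: ActivityFailure) { if (ex.hasCauseMatching { it is CanceledFailure }) log.info(...) else log.error(...) }`.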
This is the workflow code:
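import io.temporal.activity.ActivityCancellationType
import io.temporal.activity.ActivityOptions
import io.temporal.common.RetryOptions
import io.temporal.workflow.Async
import io.temporal.workflow.CancellationScope
import io.temporal.workflow.Promise
import io.temporal.workflow.Workflow
import java.time.Duration

class WorkflowServiceImpl : MyWorkFlow {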
/**
* Temporal Activity to manage the lifetime of the subscription.
* This activity is intended to be long-running, and as a result has an aggressive heartbeat timeout.
* Further, the activity does not use exponential backoff: if a worker fails to heartbeat,
* we want another worker to pick up and retry the activity as quickly as possible.
*/
private val subscriptionActivity: SymbolSubscriberService = Workflow.newActivityStub(
SymbolSubscriberService::class.java,
ActivityOptions.newBuilder()
.setHeartbeatTimeout(Duration.ofSeconds(20))
.setStartToCloseTimeout(Duration.ofDays(1))
.setRetryOptions(
RetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(100))
.setBackoffCoefficient(1.0)
.setMaximumInterval(Duration.ofMillis(100))
.build()
)
// This is important - instruct the workflow to wait for the activity to report/handle its own cancellation,
// and not try to pre-emptively cancel when we trigger the cancellation scope
// This allows the activity to own its own cancellation/cleanup behavior and return to the workflow
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
.build()
)
/**
* The list of managedSymbols is an important piece of workflow state, as it allows us to keep track
* of what complete list of symbols this particular workflow is in charge of, regardless of the identifier.
* Storing this as workflow state means that if the workflow is suddenly run on a new worker
* (e.g. this worker dies or is too slow to heartbeat), that worker will be able to invoke the
* [subscriptionActivity] with the correct and complete list of symbols for this workflow.
*/
private val managedSymbols: MutableSet<String> = mutableSetOf()
/**
* Scope the current activity runs in; this allows us to cancel the activity when a new symbol comes in.
* Cancelling the activity allows us to change the subscriptions with the vendor. On restart, the activity may end up on a
* different worker. Restarting the activity cleans up the existing activity (unsubscribes) and starts a new activity
* that subscribes to all the managed symbols.
* @see [CancellationScope]
*/
private var currentScope: CancellationScope? = null
/**
* Promise for the asynchronously running ensureSubscription activity.
*/
private var activityPromise: Promise<Set<String>>? = null
/**
* Count of how many iterations this workflow has run (how many times it has restarted the activity).
* The workflow cannot restart the activity _forever_, as there's a cap on workflow history size.
* We keep track of the number of iterations, so we can continue as new once we reach a threshold.
*/
private var iterationCount: Int = 0
/**
* Assigns symbols to worker/subscription
*/
override fun startWorkFlow(symbolsToManage: List<String>) {
// Set symbols
this.managedSymbols.addAll(symbolsToManage)
do {
currentScope = Workflow.newCancellationScope(
Runnable {
activityPromise = Async.function {
log.debug("Invoking ensure subscription activity with symbols: $managedSymbols")
subscriptionActivity.ensureSubscription(managedSymbols)
}
}
)
try {
currentScope?.run()
// Wait for the activity to return a result.
// This should only happen when it is canceled from the signal (e.g. due to adding/removing symbols),
// so in practice, we may be waiting here for a long time.
val result = activityPromise!!.get()
log.info("At iteration $iterationCount, received result from workflow: ${Workflow.getInfo().workflowId}")
log.debug("Managed symbols: $result")
} catch (ex: Exception) {
log.error("Unable to subscribe to symbols for $symbolsToManage", ex)
}
iterationCount++
} while (iterationCount < 10)
// After enough iterations, we should invoke Workflow.continueAsNew simply to start
// a new workflow with the current set of symbols
Workflow.continueAsNew(managedSymbols)
}
/**
* Adds symbols to this workflow's subscription.
*/
override fun addSymbols(symbols: Set<String>) {
val symbolsToAdd = symbols.minus(managedSymbols)
if (symbolsToAdd.isNotEmpty()) {
log.info("Adding symbols: $symbolsToAdd")
managedSymbols.addAll(symbols)
log.debug("New list of managed symbols: $managedSymbols")
if (currentScope?.isCancelRequested != true) {
currentScope?.cancel()
log.debug(ACTIVITY_CANCELLATION_MESSAGE)
}
}
}
companion object {
private val log = Workflow.getLogger(WorkflowServiceImpl::class.java)
private const val ACTIVITY_CANCELLATION_MESSAGE = "Requested activity cancellation"
}
}
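The activity stub's `RetryOptions` can be checked by hand. Assuming Temporal's documented retry behavior, the wait before retry n is `min(initialInterval * backoffCoefficient^(n - 1), maximumInterval)`. With the settings above (100 ms, 1.0, 100 ms), every retry therefore waits a constant 100 ms. `retryInterval` is a hypothetical standalone re-implementation of that formula, not an SDK call.

```kotlin
import java.time.Duration
import kotlin.math.pow

// Hypothetical re-implementation of the retry-interval formula, for checking settings by hand.
// retryNumber is 1-based: retry 1 is the first retry after the initial failure.
fun retryInterval(
    retryNumber: Int,
    initial: Duration,
    backoffCoefficient: Double,
    maximum: Duration,
): Duration {
    require(retryNumber >= 1) { "retryNumber must be >= 1" }
    val rawMillis = initial.toMillis() * backoffCoefficient.pow(retryNumber - 1)
    // Cap at the maximum interval, as RetryOptions.setMaximumInterval does.
    return Duration.ofMillis(minOf(rawMillis, maximum.toMillis().toDouble()).toLong())
}
```

For contrast, a coefficient of 2.0 with a 10 s cap would give 100 ms, 200 ms, 400 ms and so on, reaching the 10 s cap by the 10th retry.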
Screenshot of logs:
As you can see, we sent the signal multiple times at 07:53:21.201, the activity was correctly canceled at 07:53:21.215, and this resulted in the expected heartbeat failure. We signaled the workflow several more times between 07:53:21.248 and 07:53:21.463. An iteration completed (activity canceled) at 07:53:21.316. The “Cancel Failed” error (the CanceledFailure above) occurred at 07:53:21.465. Finally, the stuck-workflow error starts at 07:53:21.490 and repeats until we manually terminate the workflow.
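Putting those timestamps side by side shows how tight the window is. Everything from the first signal to the first stuck-workflow error spans 289 ms. The sketch below only does the arithmetic. The timestamps are copied from the description above, and the labels are descriptive names, not log messages.

```kotlin
import java.time.Duration
import java.time.LocalTime

// Timestamps copied from the log description above; labels are descriptive only.
val timeline: List<Pair<String, LocalTime>> = listOf(
    "first signal burst" to LocalTime.parse("07:53:21.201"),
    "activity cancellation" to LocalTime.parse("07:53:21.215"),
    "second signal burst starts" to LocalTime.parse("07:53:21.248"),
    "iteration completed" to LocalTime.parse("07:53:21.316"),
    "last signal" to LocalTime.parse("07:53:21.463"),
    "CanceledFailure" to LocalTime.parse("07:53:21.465"),
    "first stuck-workflow error" to LocalTime.parse("07:53:21.490"),
)

/** Milliseconds elapsed between two labelled events in [timeline]. */
fun elapsedMillis(from: String, to: String): Long {
    val times = timeline.toMap()
    return Duration.between(times.getValue(from), times.getValue(to)).toMillis()
}
```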
We are wondering whether we are doing something incorrectly and inadvertently creating a race condition. Why are we getting a CanceledFailure? Is it expected that signals received after continueAsNew do not end up on the new workflow run? Finally, we did not observe continueAsNew completing on its own: it appeared to fail and left the workflow “stuck” until we manually terminated and recreated it.