Hi everyone,
I am trying to find the root cause of the exception below, which I am seeing on workflow executions.
io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 12 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' type. IsReplaying=true, PreviousStartedEventId=3, workflowTaskStartedEventId=62, Currently Processing StartedEventId=3
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper.run(Wrapper.java:25)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused By: java.lang.IllegalStateException: No command scheduled that corresponds to event_id: 12
event_time {
seconds: 1634054148
nanos: 546791891
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
task_id: 2198863888
activity_task_scheduled_event_attributes {
activity_id: "2cd0d605-2228-33e9-b82c-6b65da2b8c42"
activity_type {
name: "InsertTradeReconRecord"
}
task_queue {
name: "INVEST_CRYPTO_RECON_WORKFLOW_TASK_QUEUE"
kind: TASK_QUEUE_KIND_NORMAL
}
header {
}
input {
payloads {
metadata {
key: "encoding"
value: "json/plain"
}
data: "\"024d906c-c2a0-4211-84c3-dfacdeebfd21\""
}
payloads {
metadata {
key: "encoding"
value: "json/plain"
}
data: "\"Crypto_Trade_Recon_Order-12601936\""
}
payloads {
metadata {
key: "encoding"
value: "json/plain"
}
data: "\"IN_PROGRESS\""
}
payloads {
metadata {
key: "encoding"
value: "json/plain"
}
data: "12601936"
}
}
schedule_to_close_timeout {
}
schedule_to_start_timeout {
}
start_to_close_timeout {
seconds: 300
}
heartbeat_timeout {
}
workflow_task_completed_event_id: 4
retry_policy {
initial_interval {
seconds: 60
}
backoff_coefficient: 1.0
maximum_interval {
seconds: 60
}
maximum_attempts: 10
}
}
io.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:244)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:199)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper.run(Wrapper.java:25)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
I suspect that one of the following two changes caused the exception above, and I would like the community's help in understanding which one it was and why.
- Added a new activity
While the above workflow was being executed, we recently made a change where we added an activity which is executed at the end, just before the workflow terminates. For example: initially the workflow had 3 activities a, b, c. We added a 4th activity d, which is executed after c. However, we did not manage workflow versions using Workflow.getVersion(), simply because we weren't aware of it.
Would adding an activity keep the workflow execution backward compatible, as opposed to updating or removing an activity?
- Use of LocalDateTime
Activity b uses the util method below, which is also part of the workflow code. We are adhering to these constraints for workflow implementation, but we want to double-check whether the code below makes the workflow non-deterministic.
private LocalDateTime getCurrentLocalDateTime() {
    return LocalDateTime.ofInstant(
        Instant.ofEpochMilli(Workflow.currentTimeMillis()),
        TimeZone.getDefault().toZoneId());
}
I would appreciate any help in understanding this issue better.