WorkflowTaskFailed - Failure handling event 12 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED'

Hi everyone,

I am trying to find the root cause of the exception below, which I am seeing on workflow executions.

io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 12 of 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' type. IsReplaying=true, PreviousStartedEventId=3, workflowTaskStartedEventId=62, Currently Processing StartedEventId=3 
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:193)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper.run(Wrapper.java:25)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)

Caused By: java.lang.IllegalStateException: No command scheduled that corresponds to event_id: 12
event_time {
  seconds: 1634054148
  nanos: 546791891
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
task_id: 2198863888
activity_task_scheduled_event_attributes {
  activity_id: "2cd0d605-2228-33e9-b82c-6b65da2b8c42"
  activity_type {
    name: "InsertTradeReconRecord"
  }
  task_queue {
    name: "INVEST_CRYPTO_RECON_WORKFLOW_TASK_QUEUE"
    kind: TASK_QUEUE_KIND_NORMAL
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"024d906c-c2a0-4211-84c3-dfacdeebfd21\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"Crypto_Trade_Recon_Order-12601936\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"IN_PROGRESS\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "12601936"
    }
  }
  schedule_to_close_timeout {
  }
  schedule_to_start_timeout {
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 4
  retry_policy {
    initial_interval {
      seconds: 60
    }
    backoff_coefficient: 1.0
    maximum_interval {
      seconds: 60
    }
    maximum_attempts: 10
  }
}
 
io.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:244)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEventImpl(WorkflowStateMachines.java:199)
io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:178)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleEvent(ReplayWorkflowRunTaskHandler.java:140)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:180)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:150)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper.run(Wrapper.java:25)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)

I suspect that one of the two changes below is the cause, but I would like help from the community to understand what actually caused the exception above and why.

  1. Added a new activity
    While the above workflow was executing, we made a change that added an activity which runs at the end, just before the workflow finishes. For example, the workflow initially had 3 activities: a, b, c. We added a 4th activity, d, which is executed after c. However, we did not manage workflow versions using Workflow.getVersion(), simply because we weren't aware of it.
    Is adding an activity backward compatible for existing workflow executions, as opposed to updating or removing an activity?

  2. Use of LocalDateTime
    Activity b uses the util method below, which is also part of the workflow code. We are adhering to these constraints for workflow implementation, but want to double-check whether the code below makes the workflow non-deterministic.

private LocalDateTime getCurrentLocalDateTime() {
    return LocalDateTime.ofInstant(
            Instant.ofEpochMilli(Workflow.currentTimeMillis()),
            TimeZone.getDefault().toZoneId());
}

Appreciate any help here to understand this issue better.

  1. Activities are stateless, and you can update or change their implementation without breaking workflow determinism (even though keeping the changes backward compatible is the best option). Adding or removing activity invocations in workflow code is expected to break determinism for existing executions that have already passed the point in the code where the change was made. The same applies when, for example, you query already completed (closed) workflows of the changed workflow type that completed before you deployed the change.
    The overall recommendation is to always use versioning when you update your workflow definition while executions of that type are still running.

  2. You are using Workflow.currentTimeMillis(), which is correct when used inside workflow code. Inside activities you should just use the system time.
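
To make point 1 more concrete, here is a toy sketch of why an added activity invocation can fail on replay. It is not the Temporal SDK's actual implementation: the class ReplaySketch, the method firstMismatch, and the string labels a, b, c, d and complete are all hypothetical, and a real history contains many more event types. The sketch only assumes that replay compares, in order, what the current code wants to do against what was recorded.

```java
import java.util.List;

public class ReplaySketch {

    // Toy replay check (hypothetical, not SDK code): every recorded event must
    // match the command the current workflow code issues at the same position.
    // Returns the index of the first mismatch, or -1 if the history replays cleanly.
    static int firstMismatch(List<String> history, List<String> commands) {
        for (int i = 0; i < history.size(); i++) {
            if (i >= commands.size() || !history.get(i).equals(commands.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Commands issued by the workflow code before and after activity d was added.
        List<String> oldCode = List.of("a", "b", "c", "complete");
        List<String> newCode = List.of("a", "b", "c", "d", "complete");

        // An execution that had only reached activity b when the new code was deployed.
        List<String> stillRunning = oldCode.subList(0, 2);
        // An execution that finished entirely under the old code.
        List<String> alreadyClosed = oldCode;

        System.out.println(firstMismatch(stillRunning, newCode));  // prints -1
        System.out.println(firstMismatch(alreadyClosed, newCode)); // prints 3
    }
}
```

In this model the execution that had not yet reached the change replays cleanly, while the one that had already passed it diverges at the position where the new code wants to schedule d. Guarding the new invocation with Workflow.getVersion() is the mechanism mentioned above for letting old histories keep following the old path.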