UPDATE: I figured out that WorkflowImplementationOptions.setFailWorkflowExceptionTypes
does the trick: it fails the workflow when one of the listed exception types is thrown, which fits our scenario, since we do want the workflow to fail if any DAG job still fails after sufficient retries.
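
For anyone landing here later, a minimal sketch of what that registration can look like. By default the Java SDK treats an exception thrown from workflow code that is not a Temporal failure type (such as IllegalStateException) as a workflow task failure and keeps retrying the task, which matches the "Attempt=295" in the log further down. The names DagWorkflow, DagWorkflowImpl and register are hypothetical stand-ins, and the snippet assumes the Temporal Java SDK is on the classpath and that a Worker has been created elsewhere.

```java
import io.temporal.worker.Worker;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

final class FailOnTaskFailure {

    // Hypothetical stand-in for the DAG workflow described in this post.
    @WorkflowInterface
    public interface DagWorkflow {
        @WorkflowMethod
        void startWorkflow();
    }

    // Hypothetical implementation that always throws, to keep the sketch short.
    public static class DagWorkflowImpl implements DagWorkflow {
        @Override
        public void startWorkflow() {
            throw new IllegalStateException("[task1] execution failed.");
        }
    }

    // Registers the workflow so that an IllegalStateException escaping the
    // workflow code fails the workflow execution instead of the workflow task.
    static void register(Worker worker) {
        WorkflowImplementationOptions options =
            WorkflowImplementationOptions.newBuilder()
                .setFailWorkflowExceptionTypes(IllegalStateException.class)
                .build();
        worker.registerWorkflowImplementationTypes(options, DagWorkflowImpl.class);
    }
}
```

Listing a narrow exception type here, rather than Throwable, keeps genuine coding errors from silently failing workflows.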
====================================
Thanks maxim.
Sorry to bother you again, but I still couldn’t get it working properly…
Let me describe my scenario in a bit more detail. Our workflow is a DAG of HPC jobs. We have a simple DSL for DAG configuration and an in-house service that submits jobs to HPC and keeps track of the DAG state; we want to replace that service with Temporal.
My current Workflow implementation is as follows:
- An activity implementation that submits jobs to HPC and throws an exception if the job is not accepted by HPC due to a temporary problem (e.g. workload throttling)
    public enum TaskState {
        PENDING, RUNNING, SUCCEEDED, FAILED
    }

    public class Task {
        private String name;
        private String cmd;
        private List<String> dependencyNames;
        TaskState state;
        // Getters and setters omitted
    }

    // Declared on the activity interface
    @ActivityMethod
    public void submitHPCJob(Task task);
- A workflow implementation that:
  - holds the DAG data (a map from task name to Task)
  - implements a SignalMethod that receives the HPC job execution status (SUCCEEDED or FAILED) and updates it in the DAG data
  - has a WorkflowMethod that processes tasks asynchronously (retry options cover the whole processing logic of a single task). For each task, it:
    - waits for all dependencies to succeed
    - submits the job to HPC via the activity (with aggressive retry upon failure)
    - waits for the task status to be updated to either SUCCEEDED or FAILED
    - if FAILED, throws an exception so that the retry starts again from job submission
    // DAG data
    private Map<String, Task> taskMap;

    // WorkflowMethod
    public void startWorkflow(Map<String, Task> tasks) {
        this.taskMap = tasks;
        // Process every task asynchronously; each task gets at most 2 attempts
        List<Promise<Void>> promises = taskMap.values().stream()
            .map(task -> Async.retry(
                RetryOptions.newBuilder()
                    .setMaximumAttempts(2)
                    .build(),
                Optional.empty(),
                () -> Async.procedure(this::executeTask, task)))
            .collect(Collectors.toList());
        // Block until all tasks have completed (or one of them has failed)
        Promise.allOf(promises).get();
    }
    private void executeTask(Task task) {
        String taskName = task.getName();
        // Await for dependencies to succeed
        Workflow.await(() -> task.getDependencies().stream()
            .allMatch(d -> taskMap.get(d).getState() == TaskState.SUCCEEDED));
        // Execute task
        this.activities.submitHPCJob(task);
        // update task state in the map
        task.setState(TaskState.RUNNING);
        // Await for task completion (from signal)
        Workflow.await(() -> task.getState() == TaskState.SUCCEEDED
            || task.getState() == TaskState.FAILED);
        if (task.getState() == TaskState.FAILED) {
            throw new IllegalStateException(String.format("[%s] execution failed.", taskName));
        }
    }
    // SignalMethod
    public void updateHPCJobStatus(String name, TaskState state) {
        if (taskMap.containsKey(name)) {
            taskMap.get(name).setState(state);
        }
    }

    // QueryMethod
    public Map<String, Task> getTasks() {
        return taskMap;
    }
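
The dependency wait in executeTask relies on allMatch, which is true for an empty list, so a task without dependencies (task1 in the log further down) proceeds immediately. A plain-Java sketch of just that predicate, with no Temporal involved; DependencyCheck and ready are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DependencyCheck {
    enum TaskState { PENDING, RUNNING, SUCCEEDED, FAILED }

    // Same condition as the first Workflow.await: every dependency is SUCCEEDED.
    // A dependency that is missing from the map counts as not ready.
    static boolean ready(List<String> dependencies, Map<String, TaskState> states) {
        return dependencies.stream().allMatch(d -> states.get(d) == TaskState.SUCCEEDED);
    }

    public static void main(String[] args) {
        Map<String, TaskState> states = new HashMap<>();
        states.put("task1", TaskState.FAILED);
        System.out.println(ready(Collections.emptyList(), states)); // true
        System.out.println(ready(Arrays.asList("task1"), states));  // false
        states.put("task1", TaskState.SUCCEEDED);
        System.out.println(ready(Arrays.asList("task1"), states));  // true
    }
}
```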
Now, if a task execution fails on HPC (resulting in a FAILED state set by the SignalMethod), the workflow retries the job submission. However, once the maximum number of attempts is exceeded, the whole workflow seems to get stuck, with the exception stack trace below repeating (the log messages printed by the workflow code appear above the stack trace). It appears to me that the workflow is being replayed repeatedly.
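
To make the retry behaviour concrete, here is a plain-Java model of "run up to N attempts, then let the last exception escape". It is only a sketch of what the log shows (two submissions of task1, then the IllegalStateException surfacing), not the SDK's actual Async.retry implementation; RetryModel and retry are hypothetical names.

```java
final class RetryModel {

    // Runs body up to maxAttempts times and rethrows the last failure
    // once all attempts are used up.
    static void retry(int maxAttempts, Runnable body) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                body.run();
                return; // success, no further attempts
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] submissions = {0};
        try {
            retry(2, () -> {
                submissions[0]++;
                throw new IllegalStateException("[task1] execution failed.");
            });
        } catch (IllegalStateException e) {
            // prints "2 attempts, then: [task1] execution failed."
            System.out.println(submissions[0] + " attempts, then: " + e.getMessage());
        }
    }
}
```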
The DAG used is as follows; task1 failed twice on HPC (retry maximum attempts set to 2):
task1: no dependencies
task2: depends on task1
task3: depends on task1
task4: depends on task2
task5: depends on task3
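
In this DAG every other task depends on task1 directly or transitively, so once task1 has failed for good, none of the remaining tasks can ever get past its dependency wait. A small plain-Java sketch that computes the blocked set for the DAG above; BlockedTasks, blockedBy and exampleDag are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class BlockedTasks {

    // Returns every task that depends, directly or transitively, on failedTask.
    static Set<String> blockedBy(String failedTask, Map<String, List<String>> deps) {
        Set<String> blocked = new TreeSet<>();
        boolean changed = true;
        while (changed) { // repeat until no new task gets marked
            changed = false;
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                if (blocked.contains(e.getKey())) {
                    continue;
                }
                for (String d : e.getValue()) {
                    if (d.equals(failedTask) || blocked.contains(d)) {
                        blocked.add(e.getKey());
                        changed = true;
                        break;
                    }
                }
            }
        }
        return blocked;
    }

    // The five-task DAG from this post: task name -> dependency names.
    static Map<String, List<String>> exampleDag() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("task1", Collections.<String>emptyList());
        deps.put("task2", Arrays.asList("task1"));
        deps.put("task3", Arrays.asList("task1"));
        deps.put("task4", Arrays.asList("task2"));
        deps.put("task5", Arrays.asList("task3"));
        return deps;
    }

    public static void main(String[] args) {
        // prints [task2, task3, task4, task5]
        System.out.println(blockedBy("task1", exampleDag()));
    }
}
```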
I also noticed that the QueryMethod did not work while the workflow was stuck (the web UI showed “No queries registered” and the loading circle kept spinning).
[task5] Awaiting for dependencies ([task3])
[task1] Awaiting for dependencies ([])
[task1] About to execute task.
[task2] Awaiting for dependencies ([task1])
[task3] Awaiting for dependencies ([task1])
[task4] Awaiting for dependencies ([task2])
[task1] Status changing to RUNNING.
[task1] Awaiting for status change from external execution system.
[task1] Received signal - FAILED
[task1] Task status changed to FAILED
[task1] Task failed, throwing Exception.
[task1] Awaiting for dependencies ([])
[task1] About to execute task.
[task1] Status changing to RUNNING.
[task1] Awaiting for status change from external execution system.
[task1] Received signal - FAILED
[task1] Task status changed to FAILED
[task1] Task failed, throwing Exception.
02:05:11.915 [Workflow Executor taskQueue="DagWorkflowQueue", namespace="default": 297] WARN i.t.i.r.ReplayWorkflowTaskHandler - Workflow task processing failure. startedEventId=36, WorkflowId=DagWorkflow, RunId=11c23f77-12d9-4791-baa0-ab57859a8c43. If seen continuously the workflow might be stuck.
io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 36 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {PreviousStartedEventId=36, workflowTaskStartedEventId=36, Currently Processing StartedEventId=36}
at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:222)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:201)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:175)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:151)
at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101)
at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:235)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199)
... 11 common frames omitted
Caused by: java.lang.IllegalStateException: [task1] execution failed.
at poc.dag.DagWorkflowImpl6.executeTask(DagWorkflowImpl6.java:97)
at io.temporal.internal.sync.AsyncInternal.lambda$function$8f645b3f$1(AsyncInternal.java:66)
at io.temporal.internal.sync.AsyncInternal.lambda$execute$0(AsyncInternal.java:302)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 common frames omitted
02:05:11.915 [Workflow Executor taskQueue="DagWorkflowQueue", namespace="default": 297] ERROR i.t.internal.worker.PollerOptions - uncaught exception
java.lang.RuntimeException: Failure processing workflow task. WorkflowId=DagWorkflow, RunId=11c23f77-12d9-4791-baa0-ab57859a8c43, Attempt=295
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:301)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:199)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 36 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {PreviousStartedEventId=36, workflowTaskStartedEventId=36, Currently Processing StartedEventId=36}
at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:222)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:201)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:175)
at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
... 3 common frames omitted
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:151)
at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101)
at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:235)
at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199)
... 11 common frames omitted
Caused by: java.lang.IllegalStateException: [task1] execution failed.
at poc.dag.DagWorkflowImpl6.executeTask(DagWorkflowImpl6.java:97)
at io.temporal.internal.sync.AsyncInternal.lambda$function$8f645b3f$1(AsyncInternal.java:66)
at io.temporal.internal.sync.AsyncInternal.lambda$execute$0(AsyncInternal.java:302)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 common frames omitted