What’s the best practice for integrating with external system invocations without polling?

We’ve been evaluating Temporal recently, and many of our scenarios include activities that submit computationally intensive jobs to an HPC cluster via a scheduler such as Slurm (usually the jobs are simply shell scripts invoking programs stored on shared NAS storage accessible from the HPC nodes).

As I understand it, we can implement the “HPC job” as two activities: one for job submission and one for polling the job status from Slurm.

But I’m wondering if we can implement it without polling - for example, is it possible that, as the last step in the job script submitted to Slurm, we include a Temporal client program that sends a Signal to notify the workflow of job completion?

Thanks in advance!


Sure! There are multiple ways to do this depending on the shape of the jobs.

The most obvious way to do this is async activity completion. Depending on your preferred language, you include the task token from the activity info in the Slurm job and then return from the activity, notifying the system that it will be completed asynchronously (this looks different in different SDKs). Then have the Slurm job complete the activity with its result using a Temporal client and the task token you gave the job. Bonus points if you can also have the Slurm job do regular heartbeating with the client and set the activity's heartbeat timeout option to confirm to the system that the job is still alive.
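A minimal sketch of what this could look like with the Java SDK (the sbatch helper, the way the token is handed to the job, the workflowClient variable, and the result values are illustrative assumptions, not code from a real setup):

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionClient;

// Worker side: capture the task token, hand it to the Slurm job (e.g. as an
// environment variable in the submitted script), and mark the activity as
// completing asynchronously so it stays open until the job finishes.
public class SubmitJobActivityImpl implements SubmitJobActivity {
    @Override
    public String submitHPCJob(String jobScript) {
        ActivityExecutionContext ctx = Activity.getExecutionContext();
        byte[] taskToken = ctx.getTaskToken();
        slurm.submit(jobScript, taskToken);   // hypothetical helper that runs sbatch
        ctx.doNotCompleteOnReturn();          // result will be provided later by the job
        return null;                          // ignored because of doNotCompleteOnReturn()
    }
}

// Slurm job side: the last step of the job script runs a small Temporal client
// program that completes (and optionally heartbeats) the still-open activity.
// workflowClient is an already-constructed io.temporal.client.WorkflowClient.
ActivityCompletionClient completion = workflowClient.newActivityCompletionClient();
completion.heartbeat(taskToken, "still running");            // optional, repeated while the job runs
completion.complete(taskToken, "job finished successfully"); // or completeExceptionally(taskToken, e)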

I would recommend an activity to submit a request and a signal to reply over async activity completion.

The main reason is timeouts and retries. You want the request submission to have a short timeout and aggressive retry options. An HPC job can take a long time, so modeling it as an async activity would require a long timeout, which means that in case of a worker process crash it would wait for that whole timeout before resubmitting the request.
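A rough sketch of that shape with the Java SDK (the timeouts, the workflow interface, the signal method name, and the workflowClient variable are illustrative assumptions):

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

// Workflow side: the submission activity alone gets a short timeout and aggressive retries.
ActivityOptions submitOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(30))
        .setRetryOptions(RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setBackoffCoefficient(2.0)
                .setMaximumInterval(Duration.ofMinutes(1))
                .build())
        .build();

// Slurm job side: the final step of the job script signals the workflow with the outcome.
HpcJobWorkflow workflow = workflowClient.newWorkflowStub(HpcJobWorkflow.class, workflowId);
workflow.jobCompleted(jobName, exitCode);   // jobCompleted would be a @SignalMethod on the workflow interface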


Thanks Chad_Retz and maxim for your help! We will try it out.

We have implemented the scenario with the signal approach as suggested and it works great - now we can aggressively retry the submission activity if the HPC cluster temporarily rejects jobs due to workload throttling.

But sometimes admitted jobs fail on the HPC cluster for various reasons, and we would like to automatically resubmit a job if its execution fails there. With the signal, we can send the job exit code back to the workflow, but we are not sure what the best approach is to implementing a retry policy for jobs that were admitted but then failed.

Could you please shed some light on it? Thanks.

You can retry the whole part of the workflow that failed. See the fileprocessing sample that does exactly that.
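A minimal Java sketch of that idea (the retry options and the submitAndAwaitCompletion helper are placeholders, not code from the sample):

import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import java.util.Optional;

// Retry a whole section of workflow logic (submit the job, wait for the completion
// signal, throw if it reports failure), not just the submission activity.
Workflow.retry(
        RetryOptions.newBuilder().setMaximumAttempts(3).build(),
        Optional.empty(),                        // no overall expiration
        () -> submitAndAwaitCompletion(task));   // throws on a FAILED signal, triggering another attempt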

UPDATE: I figured out that WorkflowImplementationOptions.setFailWorkflowExceptionTypes does the trick by failing the workflow upon certain exceptions, which fits our scenario, as we do want the workflow to fail if any DAG job fails after sufficient retries.
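For reference, a sketch of wiring that option in when registering the workflow implementation with a worker (the worker variable and the exception type are assumptions based on the code below):

import io.temporal.worker.WorkflowImplementationOptions;

// Exceptions of the listed types thrown from workflow code fail the workflow execution
// instead of failing (and endlessly retrying) the workflow task.
WorkflowImplementationOptions implementationOptions = WorkflowImplementationOptions.newBuilder()
        .setFailWorkflowExceptionTypes(IllegalStateException.class)
        .build();
worker.registerWorkflowImplementationTypes(implementationOptions, DagWorkflowImpl.class);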

====================================
Thanks maxim.

Sorry to bother again, but I still couldn’t get it working properly…

Let me describe my scenario in a bit more detail: basically, our workflow is a DAG of HPC jobs. We have a simple DSL for DAG configuration and an in-house service that submits jobs to HPC and keeps track of the DAG state - this is what we want to replace with Temporal.

My current Workflow implementation is as follows:

  • An activity implementation that submits jobs to HPC and throws an exception if a job is not accepted by HPC due to temporary problems (e.g. workload throttling):
public enum TaskState {
    PENDING, RUNNING, SUCCEEDED, FAILED
}
public class Task {
    private String name;
    private String cmd;
    private List<String> dependencyNames;
    private TaskState state;
    // getters and setters omitted for brevity; getDependencies() returns dependencyNames
}
@ActivityMethod
public void submitHPCJob(Task task);
  • A workflow implementation that
    • holds the DAG data (a map from task name to Task)
    • implements a SignalMethod to receive the HPC job execution status (SUCCEEDED or FAILED) and update the status in the DAG data
    • WorkflowMethod: processes tasks asynchronously (retry options set for the whole processing logic of a single task); for each task it
      • waits for all dependencies to succeed
      • submits the job to HPC via the activity (with aggressive retry upon failure)
      • waits for the status of the task to be updated to either SUCCEEDED or FAILED
        • if FAILED, throws an exception so that the retry can start over from job submission
// DAG data
private Map<String, Task> taskMap;

// WorkflowMethod
public void startWorkflow(Map<String, Task> tasks) {
	this.taskMap = tasks;
	
	// Process tasks
	List<Promise<Void>> promises = taskMap.values().stream().map(task -> {
		return Async.retry(
			RetryOptions.newBuilder()
				.setMaximumAttempts(2)
				.build(),
			Optional.empty(),
			() -> Async.procedure(this::executeTask, task));
	}).collect(Collectors.toList());

	Promise.allOf(promises).get();
}

private void executeTask(Task task) {
	String taskName = task.getName();
	// Await for dependencies to succeed
	Workflow.await(() -> task.getDependencies().stream().allMatch(d -> taskMap.get(d).getState() == TaskState.SUCCEEDED));

	// Execute task
	this.activities.submitHPCJob(task);
	
	// update task state in the map
	task.setState(TaskState.RUNNING);

	// Await for task completion (from signal)
	Workflow.await(() -> task.getState() == TaskState.SUCCEEDED || task.getState() == TaskState.FAILED);

	if (task.getState() == TaskState.FAILED) {
		throw new IllegalStateException(String.format("[%s] execution failed.", taskName));
	}
}

// SignalMethod
public void updateHPCJobStatus(String name, TaskState state) {
	if (taskMap.containsKey(name)) {
		taskMap.get(name).setState(state);
	}
}

// QueryMethod
public Map<String, Task> getTasks() {
	return taskMap;
}

Now, if any task execution fails at HPC (resulting in a FAILED state set by the SignalMethod), the workflow retries the job submission. However, once max attempts are exceeded, the whole workflow seems to get stuck, with the following exception stack trace repeating (the log messages above the stack trace are printed by the workflow code); it appears to me that the workflow is being replayed repeatedly.

The DAG used is as follows, and task1 failed twice on HPC (retry max attempts set to 2):
task1: no dependencies
task2: depends on task1
task3: depends on task1
task4: depends on task2
task5: depends on task3

I also noticed that the QueryMethod did not work while the workflow was stuck (the web UI showed “No queries registered” and the loading spinner kept spinning).

[task5] Awaiting for dependencies ([task3])
[task1] Awaiting for dependencies ([])
[task1] About to execute task.
[task2] Awaiting for dependencies ([task1])
[task3] Awaiting for dependencies ([task1])
[task4] Awaiting for dependencies ([task2])
[task1] Status changing to RUNNING.
[task1] Awaiting for status change from external execution system.
[task1] Received signal - FAILED
[task1] Task status changed to FAILED
[task1] Task failed, throwing Exception.
[task1] Awaiting for dependencies ([])
[task1] About to execute task.
[task1] Status changing to RUNNING.
[task1] Awaiting for status change from external execution system.
[task1] Received signal - FAILED
[task1] Task status changed to FAILED
[task1] Task failed, throwing Exception.
02:05:11.915 [Workflow Executor taskQueue="DagWorkflowQueue", namespace="default": 297] WARN  i.t.i.r.ReplayWorkflowTaskHandler - Workflow task processing failure. startedEventId=36, WorkflowId=DagWorkflow, RunId=11c23f77-12d9-4791-baa0-ab57859a8c43. If seen continuously the workflow might be stuck.
io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 36 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {PreviousStartedEventId=36, workflowTaskStartedEventId=36, Currently Processing StartedEventId=36}
	at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:222)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:201)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:175)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
	at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:151)
	at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101)
	at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:235)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199)
	... 11 common frames omitted
Caused by: java.lang.IllegalStateException: [task1] execution failed.
	at poc.dag.DagWorkflowImpl6.executeTask(DagWorkflowImpl6.java:97)
	at io.temporal.internal.sync.AsyncInternal.lambda$function$8f645b3f$1(AsyncInternal.java:66)
	at io.temporal.internal.sync.AsyncInternal.lambda$execute$0(AsyncInternal.java:302)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:111)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 common frames omitted
02:05:11.915 [Workflow Executor taskQueue="DagWorkflowQueue", namespace="default": 297] ERROR i.t.internal.worker.PollerOptions - uncaught exception
java.lang.RuntimeException: Failure processing workflow task. WorkflowId=DagWorkflow, RunId=11c23f77-12d9-4791-baa0-ab57859a8c43, Attempt=295
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:301)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:199)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:98)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.temporal.internal.replay.InternalWorkflowTaskException: Failure handling event 36 of type 'EVENT_TYPE_WORKFLOW_TASK_STARTED' during execution. {PreviousStartedEventId=36, workflowTaskStartedEventId=36, Currently Processing StartedEventId=36}
	at io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:222)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:201)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:175)
	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122)
	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241)
	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199)
	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
	... 3 common frames omitted
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORKFLOW_TASK_SCHEDULED]
	at io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:151)
	at io.temporal.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101)
	at io.temporal.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:235)
	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199)
	... 11 common frames omitted
Caused by: java.lang.IllegalStateException: [task1] execution failed.
	at poc.dag.DagWorkflowImpl6.executeTask(DagWorkflowImpl6.java:97)
	at io.temporal.internal.sync.AsyncInternal.lambda$function$8f645b3f$1(AsyncInternal.java:66)
	at io.temporal.internal.sync.AsyncInternal.lambda$execute$0(AsyncInternal.java:302)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:111)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 common frames omitted

Try changing

	if (task.getState() == TaskState.FAILED) {
		throw new IllegalStateException(String.format("[%s] execution failed.", taskName));
	}

to

	if (task.getState() == TaskState.FAILED) {
		throw ApplicationFailure.newFailure(String.format("[%s] execution failed.", taskName), "IllegalState");
	}

By default, the Java SDK treats an exception that is not a TemporalFailure (such as ApplicationFailure) and is not listed in setFailWorkflowExceptionTypes as a bug in the workflow code: the workflow task fails and is retried indefinitely, which is why the workflow appears stuck and keeps replaying instead of failing.

Thanks maxim, it works like a charm!

@maxim related to this question: is there any general duration above which the async completion approach works better than a synchronous activity? My use case is a numerical simulation which may take up to 10 minutes to execute. For now I’ll be running that on AWS Lambda, but the idea would be to move to some other HPC-optimised environment in the near future.

Can the HPC job heartbeat? Is submission to HPC an operation that can fail and should be retried on a short interval?

Thanks for responding. Yes, at various stages along the data preparation and computation it could send a heartbeat back. The job could definitely fail and should be retried within a short interval.

Then you can use async completion with heartbeating even for HPC.
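A rough sketch of that combination (the timeout values, the workflowClient variable, the token hand-off, and the result names are assumptions):

import io.temporal.activity.ActivityOptions;
import io.temporal.client.ActivityCompletionClient;
import java.time.Duration;

// Workflow side: the heartbeat timeout lets the server detect a dead job quickly,
// even though the start-to-close timeout has to cover the whole simulation.
ActivityOptions simulationOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofMinutes(30))
        .setHeartbeatTimeout(Duration.ofMinutes(1))
        .build();

// Job side: heartbeat periodically via the completion client while computing,
// then complete the activity with the simulation result.
ActivityCompletionClient completion = workflowClient.newActivityCompletionClient();
completion.heartbeat(taskToken, progressPercent);
completion.complete(taskToken, simulationResult);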