Triggering Synchronous and Asynchronous Workflows from a workflow

I have a use case where I am using Temporal to replace an existing ingest process. One requirement is that requests for the same ingest ID must run in the order in which they arrive.

To implement this, I used the approach Maxim outlined in the Stack Overflow answer "design patterns - Job queue with job affinity", which uses an internal queue to run the tasks in order. The code is similar to:

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcessorActivity processor = Workflow.newActivityStub(TaskProcessorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

This solution works great.

Currently I am just calling Workflow.newChildWorkflowStub() to create each new task.

If I wanted to extend this use case to trigger both synchronous and asynchronous workflows (based on information in the task added to the queue), would that be possible? The idea behind supporting asynchronous workflows is that, once one is triggered, I don’t care when it finishes or how many I trigger.

Using synchronous child workflows is ok because the parent workflow has to wait for all child workflows to finish before it completes.

My question is: if I started asynchronous child workflows, wouldn’t they be stopped automatically as soon as the parent has triggered them all and then exits? Is there a way to trigger an asynchronous workflow from another workflow without it being a child workflow dependent on the parent?

Does this post answer your question?

Thanks maxim - this is exactly what I was looking for. I’ll try it out.

Hi Maxim,

In the Java version, if my child workflow has a method called execute() that has to be called to trigger the actual workflow, how would that be done in the code below?

Also, is there an option similar to BatchRequest, where you could pass in multiple methods to be triggered on the workflow?

public void parentWorkflow() {
       ChildWorkflowOptions options =
          ChildWorkflowOptions.newBuilder()
              .setCancellationType(ChildWorkflowCancellationType.ABANDON)
              .build();
       MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, options);
       Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(child);
       // Wait for child to start
       childExecution.get();
  }

Good catch. The workflow has to be started before waiting for a child to start.

       MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, options);
       Async.procedure(child::execute);
       Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(child);
       // Wait for child to start
       childExecution.get();

Thanks - will try it out.

Hi Maxim,

I’m not able to trigger the child workflows asynchronously with the above code. I tested the workflow synchronously and it works but asynchronously my execute() method is never getting called. Would there be another step I’m missing to trigger the workflows?

Could you post the workflow execution history for the parent?

Here is the history:

I noticed that in step 6 the parentClosePolicy is PARENT_CLOSE_POLICY_TERMINATE. Could this be the issue?

The forum post I referenced had a correct text description but invalid sample code. Make sure that you set the ParentClosePolicy to ABANDON, not the cancellation type:

       ChildWorkflowOptions options =
          ChildWorkflowOptions.newBuilder()
              .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
              .build();

I’ve fixed the original forum post sample.

Thanks Maxim,

I have applied the update but the workflow still does not trigger.

My code snippet for triggering the workflow:

	System.out.println(String.format("workflow execute START ASYNC"));
	Async.procedure(workflow::execute);
	System.out.println(String.format("workflow execute STARTED"));
	Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(workflow);
	System.out.println(String.format("workflow execute EXECUTION STATE"));
	
	// Wait for child to start
	childExecution.get();
	System.out.println(String.format("workflow execute STARTED"));

I get the workflow execute EXECUTION STATE printed and then it blocks on the childExecution.get()

Something is weird. How is your workflow completing if the workflow method is blocked on childExecution.get()?

Are you using multiple threads?

Here is the code for the workflow and how it’s being called:

public class SerializedIngestWorkflowImpl implements SerializedIngestWorkflow {
	
	private final Queue<IngestTask> taskQueue = new ArrayDeque<>();

	public SerializedIngestWorkflowImpl() {
		ActivityOptions ingestOptions = ActivityOptions.newBuilder()
				.setScheduleToCloseTimeout(Duration.ofHours(1))
				.setTaskQueue(Queues.INGEST_QUEUE)
				.build();
	}
	
	@Override
	public void runQueue() {
		System.out.println(String.format("execute()"));
		while(!taskQueue.isEmpty()) {
			System.out.println(String.format("getting task"));
			IngestTask task = taskQueue.poll();
			System.out.println(String.format("got task...calling executeIngestWorkflow"));
			
			runTask(task, true);
		}
	}
	
	@Override
	public void addTask(IngestTask task) {
		System.out.println(String.format("addTask()"));
		if(task.isTaskSynchronous()) {
			System.out.println(String.format("add synchronous task"));
			taskQueue.add(task);
		}
		else {
			// run the task now
			runTask(task, false);
		}
		
	}
	
	private void runTask(IngestTask task, boolean isSynchronous) {
		IngestWorkflow workflow = task.getWorkflow();
		
		if(isSynchronous) {
			System.out.println(String.format("workflow execute START"));
			workflow.execute();
			System.out.println(String.format("workflow execute STOP"));
		}
		else {
			System.out.println(String.format("workflow execute START ASYNC"));
			Async.procedure(workflow::execute);
			System.out.println(String.format("workflow execute STARTED"));
			Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(workflow);
			System.out.println(String.format("workflow execute EXECUTION STATE"));
			
			// Wait for child to start
			childExecution.get();
			System.out.println(String.format("workflow execute STARTED"));
			
		}
	}
}

and how it’s called

	private static void addIngest(WorkflowClient workflowClient, IngestTask task) {
		// Set workflowId to the task's datasource ID
		WorkflowOptions options = WorkflowOptions.newBuilder()
				.setTaskQueue(Queues.INGEST_QUEUE)
				.setWorkflowId(task.getDatasourceId())
				.setWorkflowTaskTimeout(null)
				.build();
		// Use workflow interface stub to start/signal workflow instance
		SerializedIngestWorkflow workflow = workflowClient.newWorkflowStub(SerializedIngestWorkflow.class, options);
		BatchRequest request = workflowClient.newSignalWithStartRequest();
		request.add(workflow::runQueue);
		request.add(workflow::addTask, task);
		workflowClient.signalWithStart(request);
	}

Based on my understanding, the BatchRequest calls would be blocking, so addTask() should finish triggering the child workflow before runQueue() is called. For an asynchronous task the queue would be empty, so runQueue() would exit immediately after it is called.

A workflow completes as soon as the method annotated with @WorkflowMethod completes. In your case, you call runTask from a signal handler method for asynchronous tasks. Blocking the signal method on childExecution.get() doesn’t prevent the main workflow method from completing, so the workflow completes without waiting for the child workflows to start.

I would refactor your workflow to start all children from the main workflow thread by adding all tasks to the queue, then wait on the result Promises for the synchronous ones and on the WorkflowExecution Promises for the asynchronous ones.
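A minimal sketch of that refactoring, under some assumptions, might look like the following. The signal handler only enqueues, and the main workflow method starts every child. The wrapper class SerializedIngestSketch, the execute(IngestTask) signature, and creating the child stub inside the parent (instead of task.getWorkflow()) are assumptions made for illustration; the code posted above may differ. It needs the Temporal Java SDK on the classpath.

```java
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical wrapper so the sketch fits in one file.
public final class SerializedIngestSketch {

    // Assumed shape of the task; only the flag used below is modelled.
    public static class IngestTask {
        private boolean taskSynchronous;
        public boolean isTaskSynchronous() { return taskSynchronous; }
        public void setTaskSynchronous(boolean value) { taskSynchronous = value; }
    }

    // Assumed child workflow interface; execute(IngestTask) is an assumption.
    @WorkflowInterface
    public interface IngestWorkflow {
        @WorkflowMethod
        void execute(IngestTask task);
    }

    @WorkflowInterface
    public interface SerializedIngestWorkflow {
        @WorkflowMethod
        void runQueue();

        @SignalMethod
        void addTask(IngestTask task);
    }

    public static class SerializedIngestWorkflowImpl implements SerializedIngestWorkflow {

        private final Queue<IngestTask> taskQueue = new ArrayDeque<>();

        @Override
        public void runQueue() {
            // Every child is started here, on the main workflow thread.
            while (!taskQueue.isEmpty()) {
                IngestTask task = taskQueue.poll();
                if (task.isTaskSynchronous()) {
                    IngestWorkflow child = Workflow.newChildWorkflowStub(IngestWorkflow.class);
                    Promise<Void> result = Async.procedure(child::execute, task);
                    // Synchronous task: wait for the child to finish.
                    result.get();
                } else {
                    ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
                            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
                            .build();
                    IngestWorkflow child =
                            Workflow.newChildWorkflowStub(IngestWorkflow.class, options);
                    Async.procedure(child::execute, task);
                    // Asynchronous task: wait only until the child has started.
                    Workflow.getWorkflowExecution(child).get();
                }
            }
        }

        @Override
        public void addTask(IngestTask task) {
            // The signal handler only enqueues; it never starts children.
            taskQueue.add(task);
        }
    }
}
```

In this sketch tasks are started strictly in arrival order, so an asynchronous task is not started until the synchronous tasks queued ahead of it have finished.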

Thanks Maxim for the advice, I’ll try this out. The only issue I can see is that I want all asynchronous tasks to start immediately. If they are added to the same queue as the synchronous ones, they would not be triggered until every synchronous task ahead of them in the queue has completed.