Triggering Synchronous and Asynchronous Workflows from a workflow

I have a use case where I am using temporal to replace an existing ingest process. There is a requirement where requests that come through for the same ingest id, must be run in the order that they arrive.

To implement this I have used the suggestion outlined by maxim here: design patterns - Job queue with job affinity - Stack Overflow which provides an internal queue to trigger the workflows in order. The code snippet is similar to:

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

This solution works great.

Currently I am just calling Workflow.newChildWorkflowStub() to create each new task.

If I wanted to extend this use-case to allow both synchronous and asynchronous workflows to be triggered, (based on information in the task added to the queue), would this be possible? The idea behind supporting asynchronous workflows is that once it is triggered I don’t care when it finishes or how many I trigger.

Using synchronous child workflows is ok because the parent workflow has to wait for all child workflows to finish before it completes.

My question is, if I started asynchronous child workflows wouldn’t these get automatically stopped as soon as the parent triggers them all and then exists itself? Is there a way to trigger an asynchronous workflow from another workflow without it being a child workflow dependant on the parent?

1 Like

Does this post answer your question?

Thanks maxim - this is exactly what I was looking for. I’ll try it out.

Hi Maxim,

In the java version if my child workflow had a method called execute() that had to be called to trigger the actual workflow how would that be done in the code below?

Also is there any option to do something similar to the BatchRequest where you could pass multiple methods in to be triggered on the workflow?

public void parentWorklfow() {
       ChildWorkflowOptions options =
          ChildWorkflowOptions.newBuilder()
              .setCancellationType(ChildWorkflowCancellationType.ABANDON)
              .build();
       MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, options);
       Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(child);
       // Wait for child to start
       childExecution.get()
  }

Good catch. The workflow has to be started before waiting for a child to start.

       MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, options);
       Async.procedure(child::execute);
       Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(child);
       // Wait for child to start
       childExecution.get()

Thanks - will try it out.

Hi Maxim,

I’m not able to trigger the child workflows asynchronously with the above code. I tested the workflow synchronously and it works but asynchronously my execute() method is never getting called. Would there be another step I’m missing to trigger the workflows?

Could you post the workflow execution history for the parent?

Here is the history:

I noticed that in step 6 the parentClosePolicy is PARENT_CLOSE_POLICY_TERMINATE. Could this be the issue?

The forum post I referenced had correct text description, but invalid sample code. Make sure that you set ParentClosePolicy to ABANDON. Not the cancellation type:

       ChildWorkflowOptions options =
          ChildWorkflowOptions.newBuilder()
              .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
              .build();

I’ve fixed the original forum post sample.

Thanks Maxim,

I have applied the update but the workflow still does not trigger.

My code snippet for triggering the workflow:

	System.out.println(String.format("workflow execute START ASYNC"));
	Async.procedure(workflow::execute);
	System.out.println(String.format("workflow execute STARTED"));
	Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(workflow);
	System.out.println(String.format("workflow execute EXECUTION STATE"));
	
	// Wait for child to start
	childExecution.get();
	System.out.println(String.format("workflow execute STARTED"));

I get the workflow execute EXECUTION STATE printed and then it blocks on the childExecution.get()

Something is weird. How is your workflow completing if the workflow method is blocked on childExecution.get()?

Are you using multiple threads?

Here is the code for the workflow and how it’s being called:

public class SerializedIngestWorkflowImpl implements SerializedIngestWorkflow {
	
	private final Queue<IngestTask> taskQueue = new ArrayDeque<>();

	public SerializedIngestWorkflowImpl() {
		ActivityOptions ingestOptions = ActivityOptions.newBuilder()
				.setScheduleToCloseTimeout(Duration.ofHours(1))
				.setTaskQueue(Queues.INGEST_QUEUE)
				.build();
	}
	
	@Override
	public void runQueue() {
		System.out.println(String.format("execute()"));
		while(!taskQueue.isEmpty()) {
			System.out.println(String.format("getting task"));
			IngestTask task = taskQueue.poll();
			System.out.println(String.format("got task...calling executeIngestWorkflow"));
			
			runTask(task, true);
		}
	}
	
	@Override
	public void addTask(IngestTask task) {
		System.out.println(String.format("addTask()"));
		if(task.isTaskSynchronous()) {
			System.out.println(String.format("add synchronous task"));
			taskQueue.add(task);
		}
		else {
			// run the task now
			runTask(task, false);
		}
		
	}
	
	private void runTask(IngestTask task, boolean isSynchronous) {
		IngestWorkflow workflow = task.getWorkflow();
		
		if(isSynchronous) {
			System.out.println(String.format("workflow execute START"));
			workflow.execute();
			System.out.println(String.format("workflow execute STOP"));
		}
		else {
			System.out.println(String.format("workflow execute START ASYNC"));
			Async.procedure(workflow::execute);
			System.out.println(String.format("workflow execute STARTED"));
			Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(workflow);
			System.out.println(String.format("workflow execute EXECUTION STATE"));
			
			// Wait for child to start
			childExecution.get();
			System.out.println(String.format("workflow execute STARTED"));
			
		}
	}
}

and how it’s called

	private static void addIngest(WorkflowClient workflowClient, IngestTask task) {
		// Set workflowId to userId
		WorkflowOptions options = WorkflowOptions.newBuilder()
				.setTaskQueue(Queues.INGEST_QUEUE)
				.setWorkflowId(task.getDatasourceId())
				.setWorkflowTaskTimeout(null)
				.build();
		// Use workflow interface stub to start/signal workflow instance
		SerializedIngestWorkflow workflow = workflowClient.newWorkflowStub(SerializedIngestWorkflow.class, options);
		BatchRequest request = workflowClient.newSignalWithStartRequest();
		request.add(workflow::runQueue);
		request.add(workflow::addTask, task);
		workflowClient.signalWithStart(request);
	}

Based on my understanding the BatchRequest calls would be blocking so the addTask() should finish triggering the child workflow before the runQueue() is called. For an asynchronous task the queue would be empty so the runQueue() would exit immediately after it is called.

A workflow completes as soon as the method annotated with @WorkflowMethod completes. In your case, you call runTask from a signal handler method for asynchronous tasks. Blocking signal method on childExecution.get() doesn’t preclude the main workflow method from completing. This completes workflow without waiting for child workflows to start.

I would refactor your workflow to start all children from the main workflow thread by adding all tasks to the queue. And then waiting on result Promises for synchronous ones and waiting on WorkflowExecution promises for the asynchronous.

Thanks maxim for the advice, I’ll try this out. The only thing I can see here is that I want all asynchronous tasks to start immediately. If they are added to the same queue as the synchronous ones they would not get triggered until any synchronous tasks before it in the queue have completed.

Following on from the previous discussion on the Job Queue workflow, I’ve found that if too many tasks get added to the queue the Job Queue workflow never completes and the history grows until it reaches the history event limit. I’d like to implement ContinueAsNew, however, I’ve found issues with serializing the data in the Job Queue.

To elaborate more on why this is an issue when a child workflow is registered with the job queue the following occurs:

  1. The child workflow starts by registering itself and sets it state to pending. It then waits for a signal to start processing from the parent Job Queue workflow. At this point a user can see all jobs that are registered but not necessarily actively running.
  2. The job queue processes each child workflow sequentially. To do this it calls a method on the child workflow that signals it to continue processing.

In order for this to work each entry on the job queue contains a reference to the child workflow so it can signal it when to continue processing.

I am finding that when I try and perform a ContinueAsNew and pass through the queue of pending child workflows it fails to serialize.

  1. Is there a way that I can use ContinueAsNew where I can pass through the queue of remaining jobs?
  2. If I don’t keep a reference to the child workflow in the job queue is there any way of referencing the child workflow to call the method on it to tell it to continue processing.
  3. If 2. is possible how do I know when the child has completed so the Job Queue can trigger the next job in the queue
  1. Drain the queue into a List or an array and pass it as the parameter.
  2. You can create a child stub from its WorkflowID. Using newExternalWorkflowStub function.
  3. Make the child send a signal to the parent before completing.

Thanks Maxim,

As a first step I am trying to confirm steps 2 & 3 using the existing Job Queue workflow. When I take a Job from the queue I now call:

MyChildWorkflow childWorkflow = Workflow.newExternalWorkflowStub(MyChildWorkflow.class, childWorkflowId);

I’m getting an exception saying the workflow with the childWorkflowId could not be found. The sequence of events when a task is added to the Job Queue workflow is:

  1. addTask() called on JobQueue workflow
    a. child workflow is created from task
    b. child workflow “register” method is called which basically registers the child workflow and then blocks on a signal to continue processing
    c. task gets added to the queue
  2. At some point later task gets taken off the queue
    a. The signal method on the child workflow called “startProcessing” needs to be called.
    b. Previously a reference to the workflow created is step 1. was kept so I just called startProcessing() and it worked.
    c. Now I’m getting a reference to the workflow via MyChildWorkflow childWorkflow = Workflow.newExternalWorkflowStub(MyChildWorkflow.class, childWorkflowId); and getting an error saying the workflow could not be found.

Is there any reason why the child workflow could not be found. It appears to be created and I can see it in the temporal front-end as being created. Does newExternalWorkflowStub() only work for non-child workflows?

ChildWorkflow childWorkflow =
          Workflow.newExternalWorkflowStub(ChildWorkflow.class, "childWorkflowId");

should work inside your workflow code. Are you sure the child workflow is running when you make this call?

Hi @tihomir,

The child workflows @WorkflowMethod is called register(). The sequence of events is as follows.

  1. Create the new child workflow
  2. Start the child workflow asynchronously by calling Async.procedure(workflow::register)
  3. Add the child workflow details to a queue
  4. At some point later call the ChildWorkflow childWorkflow =
    Workflow.newExternalWorkflowStub(ChildWorkflow.class, “childWorkflowId”);

Step 4. is where it’s failing.

Could the issue be calling the @WorkflowMethod of the child workflow asynchronously? Is there are way I can trigger the childworkflow but wait until it has “registered” so it can be found. I’m not clear at which point the child workflow can be looked up - is it once the workflow is created, or after the @WorkflowMethod has been called?