Does Workflow.await() release the thread?

Is there any way in Temporal to resequence messages so that they are processed as a group identified by a functional key in the message?

Here is the proposed approach.

Have a single workflow per unique functional key. It receives a message (possibly via SignalWithStart) and queues it; the workflow processes everything in the queue, then goes into a wait state until more messages arrive.

The actual processing is implemented as a child workflow or directly as part of the parent workflow.

Pseudocode

SampleWorkflow {
    WorkflowQueue<String> queue = Workflow.newQueue(10);
    ChildWorkflowOptions childWorkflowOptions = ChildWorkflowOptions.newBuilder().build();
    @Override
    public void runProcess(String workflowRequest) {
        queue.offer(workflowRequest);
        while (true) {
            workflowRequest = queue.poll();
            SampleChildWorkflow child = Workflow.newChildWorkflowStub(SampleChildWorkflow.class, childWorkflowOptions);
            child.runChildProcess(workflowRequest);
            if (null == queue.peek()) {
                Workflow.await(() -> (null != queue.peek()));
            }
        }
    }
    @Override
    public void addEntry(String workflowRequest) {
        this.queue.offer(workflowRequest);
    }
}

Once I finish processing all the events in the queue, I move the workflow to an await state until it receives a new request via the addEntry signal.

Any new entry wakes the workflow from the wait state. After processing everything in the WorkflowQueue, the workflow again awaits new requests to be added to the queue.

When I checked the logs, I saw that the workflow thread ID is the same every time a new request comes in and the workflow starts running.
Does this mean that the thread is not released when the workflow goes into the await state?

We are planning to run a million instances of such workflows (not always active) and would like to understand the performance and memory impact.


The proposed approach is fine. Just some nits:

  • WorkflowQueue is a blocking queue.
  • WorkflowQueue.take blocks until a message is available.
  • To support graceful cancellation, consider using WorkflowQueue.cancellableTake.
  • Don’t call offer without checking its result, as the message is lost if the queue is full.
  • I would use an unbounded queue and deal with message shedding in the consumer code if necessary.
  • Running such a workflow indefinitely grows its history to an unbounded size. To keep the history size bounded, the workflow has to call continue-as-new periodically.
  • To avoid losing messages when calling continue-as-new, the buffered messages have to be passed from one run to the next as a workflow argument.
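
The point about offer can be illustrated outside Temporal. The sketch below assumes that WorkflowQueue.offer follows the same contract as java.util.concurrent.BlockingQueue.offer (return false instead of blocking when the queue is full); ArrayBlockingQueue is only a stand-in here, and the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferDemo {
    // Offers three messages to a queue of capacity 2 and
    // returns how many of them were actually accepted.
    static int acceptedCount() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        int accepted = 0;
        for (String msg : new String[] {"a", "b", "c"}) {
            // offer never blocks: it returns false when the queue is full,
            // so ignoring the result silently drops the message.
            if (queue.offer(msg)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedCount());
    }
}
```

With put, the caller would block until space is available instead of dropping the message, which is why the rewrite uses put.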

So I would rewrite your code to:

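import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowQueue;
import java.util.ArrayList;
import java.util.List;

// SampleWorkflow and SampleChildWorkflow are the workflow interfaces, assumed to be defined elsewhere.
public class SampleWorkflowImpl implements SampleWorkflow {

    // Number of requests to process before continuing as new (illustrative value).
    private static final int CONTINUE_AS_NEW_FREQUENCY = 100;

    // Workflow.newQueue requires a capacity; Integer.MAX_VALUE makes the queue effectively unbounded.
    private final WorkflowQueue<String> queue = Workflow.newQueue(Integer.MAX_VALUE);
    private final ChildWorkflowOptions childWorkflowOptions = ChildWorkflowOptions.newBuilder().build();

    // The carried-over messages are passed as a List, since a workflow argument
    // must be serializable and a WorkflowQueue is not.
    @Override
    public void runProcess(List<String> buffered) {
        // Re-enqueue the requests carried over from the previous run.
        for (String request : buffered) {
            queue.put(request);
        }
        for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCY; i++) {
            // Blocks until a request is available.
            String workflowRequest = queue.take();
            SampleChildWorkflow child = Workflow.newChildWorkflowStub(SampleChildWorkflow.class, childWorkflowOptions);
            child.runChildProcess(workflowRequest);
        }
        // Drain the requests that are still queued so they are not lost.
        List<String> pending = new ArrayList<>();
        String request;
        while ((request = queue.poll()) != null) {
            pending.add(request);
        }
        // Continue as new
        SampleWorkflow next = Workflow.newContinueAsNewStub(SampleWorkflow.class);
        next.runProcess(pending);
    }

    @Override
    public void addEntry(String workflowRequest) {
        this.queue.put(workflowRequest);
    }
}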

This is a pretty frequently used pattern. I’ll try to write a full sample that demonstrates it at some point.
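
Until such a sample exists, the hand-off of buffered messages between runs can be sketched in plain Java, without the Temporal SDK. This is only a simulation under stated assumptions: ArrayDeque stands in for WorkflowQueue, each call to the hypothetical runOnce method represents one workflow run, and BATCH plays the role of CONTINUE_AS_NEW_FREQUENCY. Unlike a real workflow, the loop stops when the queue is empty instead of blocking on take.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class CarryOverDemo {
    // Hypothetical stand-in for CONTINUE_AS_NEW_FREQUENCY.
    static final int BATCH = 2;

    // Simulates one run: re-enqueue the carried-over messages, accept new
    // arrivals, process at most BATCH messages, and return the leftovers
    // that must be passed to the next run.
    static List<String> runOnce(List<String> buffered, List<String> arrivals, List<String> processed) {
        Queue<String> queue = new ArrayDeque<>(buffered);
        queue.addAll(arrivals);
        for (int i = 0; i < BATCH && !queue.isEmpty(); i++) {
            processed.add(queue.poll());
        }
        // Everything still queued becomes the argument of the next run.
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> leftover = runOnce(List.of(), List.of("m1", "m2", "m3"), processed);
        leftover = runOnce(leftover, List.of("m4"), processed);
        System.out.println("processed=" + processed + " leftover=" + leftover);
    }
}
```

Passing the leftovers explicitly is what keeps message m3 from being dropped at the boundary between the first and second run, and it also preserves arrival order.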


Great inputs @maxim. Thank You.