Workflow.await(): does it release the thread?

The proposed approach is fine. Just some nits:

  • WorkflowQueue is a blocking queue.
  • WorkflowQueue.take blocks until a message is available.
  • To support graceful cancellation, consider using WorkflowQueue.cancellableTake.
  • Don’t use offer without checking its result: it returns false and the message is lost if the queue is full.
  • I would use an unbounded queue and handle load shedding in the consumer code if necessary.
  • Running such a workflow indefinitely grows its history without bound. To keep the history size bounded, the workflow has to call continue-as-new periodically.
  • To avoid losing messages when calling continue-as-new, the buffered messages have to be passed from one run to the next as a workflow argument.
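The point about offer can be illustrated with a plain JDK queue. This is only an analogy: it uses java.util.concurrent.ArrayBlockingQueue as a stand-in rather than Temporal's WorkflowQueue, and the class and method names (OfferDemo, countDropped) are made up for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Stand-in demo built on a JDK queue, not on Temporal's WorkflowQueue.
class OfferDemo {

    // Enqueues every message with offer() and returns how many were rejected.
    static int countDropped(int capacity, String... messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        for (String message : messages) {
            // offer() never blocks: it returns false when the queue is full,
            // so ignoring the result silently loses the message.
            if (!queue.offer(message)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Three messages into a queue of capacity two: one is rejected.
        System.out.println(countDropped(2, "a", "b", "c"));
    }
}
```

With a blocking put, or with a capacity large enough that the queue never fills, no message is rejected, which is why the unbounded queue is the simpler choice here.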

So I would rewrite your code to:

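import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowQueue;
import java.util.ArrayList;
import java.util.List;

public class SampleWorkflowImpl implements SampleWorkflow {

    // A very large capacity makes the queue effectively unbounded, so put() never blocks.
    private final WorkflowQueue<String> queue = Workflow.newQueue(Integer.MAX_VALUE);
    private final ChildWorkflowOptions childWorkflowOptions = ChildWorkflowOptions.newBuilder().build();

    // The argument is a List because a WorkflowQueue cannot be passed as a workflow argument.
    @Override
    public void runProcess(List<String> buffered) {
        // Re-enqueue the messages carried over from the previous run.
        for (String request : buffered) {
            queue.put(request);
        }
        // CONTINUE_AS_NEW_FREQUENCY is assumed to be defined elsewhere.
        for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCY; i++) {
            String workflowRequest = queue.take(); // blocks until a message is available
            SampleChildWorkflow child = Workflow.newChildWorkflowStub(SampleChildWorkflow.class, childWorkflowOptions);
            child.runChildProcess(workflowRequest);
        }
        // Drain the unprocessed messages so they can be handed to the next run.
        List<String> remaining = new ArrayList<>();
        String pending;
        while ((pending = queue.poll()) != null) {
            remaining.add(pending);
        }
        // Continue as new
        SampleWorkflow next = Workflow.newContinueAsNewStub(SampleWorkflow.class);
        next.runProcess(remaining);
    }

    @Override
    public void addEntry(String workflowRequest) {
        queue.put(workflowRequest);
    }
}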
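The carry-over idea from the last two bullets (process a fixed number of messages per run, then hand the unprocessed ones to the next run) can also be checked in isolation. The following is a rough plain-Java simulation, not Temporal code: CarryOverDemo, runOnce, processAll and BATCH are hypothetical names, and a JDK ArrayDeque stands in for the workflow queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of the carry-over logic; no Temporal APIs are involved.
class CarryOverDemo {

    // Hypothetical stand-in for CONTINUE_AS_NEW_FREQUENCY.
    static final int BATCH = 2;

    // Simulates one run: re-enqueue the carried-over messages, process up to
    // BATCH of them, and return whatever is left for the next run.
    static List<String> runOnce(List<String> buffered, List<String> processed) {
        Deque<String> queue = new ArrayDeque<>(buffered);
        for (int i = 0; i < BATCH && !queue.isEmpty(); i++) {
            processed.add(queue.poll());
        }
        return new ArrayList<>(queue);
    }

    // Chains runs until nothing is pending, the way repeated continue-as-new calls would.
    static List<String> processAll(List<String> initial) {
        List<String> processed = new ArrayList<>();
        List<String> pending = initial;
        while (!pending.isEmpty()) {
            pending = runOnce(pending, processed);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Every message is processed exactly once, in order, across three simulated runs.
        System.out.println(processAll(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

Unlike this simulation, a real workflow run blocks in take() when the queue is empty instead of stopping, and new messages can arrive through the signal handler at any point during the run.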

This is a pretty frequently used pattern. I’ll try to write a full sample that demonstrates it at some point.
