The proposed approach is fine. Just some nits:
- WorkflowQueue is a blocking queue.
- WorkflowQueue.take blocks until a message is available.
- To support graceful cancellation consider using WorkflowQueue.cancellableTake.
- Don’t use offer without checking the result, as this is going to lose a message if the queue is full.
- I would use an unbounded queue and deal with the message shedding in the consumer code if necessary.
- Running such a workflow indefinitely is going to grow its history to an unbounded size. To keep the history size bounded, the workflow has to call continue-as-new periodically.
- To not lose messages when calling continue-as-new, the buffered messages have to be passed from one run to the next as a workflow argument.
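The point about offer can be illustrated without the Temporal SDK. The sketch below uses java.util.concurrent.ArrayBlockingQueue purely as a stand-in, on the assumption that it follows the same offer contract (return false instead of blocking when the queue is full). OfferDemo and offerAll are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferDemo {
    // Offers every message to a bounded queue of the given capacity and
    // returns the messages that were rejected because the queue was full.
    public static List<String> offerAll(int capacity, List<String> messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> rejected = new ArrayList<>();
        for (String message : messages) {
            // offer never blocks; a false result means the message was NOT enqueued.
            if (!queue.offer(message)) {
                rejected.add(message);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Capacity 2 with three messages: the third one is rejected.
        System.out.println(offerAll(2, List.of("a", "b", "c")));
    }
}
```

If the return value were ignored here, the rejected message would simply disappear, which is the failure mode described in the list above.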
So I would rewrite your code to:
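    import io.temporal.workflow.ChildWorkflowOptions;
    import io.temporal.workflow.Workflow;
    import io.temporal.workflow.WorkflowQueue;
    import java.util.ArrayList;
    import java.util.List;

    public class SampleWorkflowImpl implements SampleWorkflow {

        // Effectively unbounded; shed messages in the consumer if necessary.
        private final WorkflowQueue<String> queue = Workflow.newQueue(Integer.MAX_VALUE);
        private final ChildWorkflowOptions childWorkflowOptions =
            ChildWorkflowOptions.newBuilder().build();

        @Override
        public void runProcess(List<String> buffered) {
            // Re-enqueue the messages carried over from the previous run.
            for (String request : buffered) {
                queue.put(request);
            }
            // CONTINUE_AS_NEW_FREQUENCY is an application-defined constant.
            for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCY; i++) {
                // Blocks until a message is available.
                String workflowRequest = queue.take();
                SampleChildWorkflow child =
                    Workflow.newChildWorkflowStub(SampleChildWorkflow.class, childWorkflowOptions);
                child.runChildProcess(workflowRequest);
            }
            // Drain what is still buffered; poll() returns null once the queue is empty.
            // A plain list is passed on because workflow arguments have to be serializable.
            List<String> leftover = new ArrayList<>();
            String request;
            while ((request = queue.poll()) != null) {
                leftover.add(request);
            }
            // Continue as new, handing the leftovers to the next run.
            SampleWorkflow next = Workflow.newContinueAsNewStub(SampleWorkflow.class);
            next.runProcess(leftover);
        }

        @Override
        public void addEntry(String workflowRequest) {
            queue.put(workflowRequest);
        }
    }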
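The carry-over logic can also be checked in isolation. What follows is only a plain-Java simulation, not Temporal code: an ArrayDeque stands in for WorkflowQueue, a second call to runOnce stands in for continue-as-new, and ContinueAsNewSketch, runOnce and BATCH are hypothetical names (BATCH plays the role of CONTINUE_AS_NEW_FREQUENCY). Unlike a real take, the loop here stops when the queue is empty instead of blocking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ContinueAsNewSketch {
    // Hypothetical stand-in for CONTINUE_AS_NEW_FREQUENCY.
    static final int BATCH = 2;

    // One simulated run: re-enqueue carried-over messages, accept new ones,
    // process up to BATCH of them, and return whatever is still buffered.
    public static List<String> runOnce(
            List<String> buffered, List<String> incoming, List<String> processed) {
        Deque<String> queue = new ArrayDeque<>(buffered);
        queue.addAll(incoming); // plays the role of signals arriving during the run
        for (int i = 0; i < BATCH && !queue.isEmpty(); i++) {
            processed.add(queue.poll()); // plays the role of the child workflow call
        }
        return new ArrayList<>(queue); // becomes the argument of the next run
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> leftover = runOnce(List.of(), List.of("a", "b", "c"), processed);
        leftover = runOnce(leftover, List.of("d"), processed);
        System.out.println(processed + " " + leftover);
    }
}
```

In this simulation the message left over from the first run is processed first in the second run, so ordering is preserved and nothing is dropped across the simulated continue-as-new boundary.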
This is a pretty frequently used pattern. I’ll try to write a full sample that demonstrates it at some point.