Hey!
I was using entity workflow pattern to control the order of the operations over an entity, it works pretty well but i am curious about the next
If the workflow run continue as new and the workflow get created but a signal is received exactly before executing the main workflow method, then i will have an item enqueued first than the ones provided as an input of the continue as new, this representing and issue for my use case, since it should be proceeded first the ones provided by the continue as new and then the new ones received through the signals.
So the question is, am i missing something here or should I implement a waiting strategy to allow the signal only enqueue items only when i initialize the queue in the main method.
Note:
Also the course that shows the entity workflow pattern example is using .offer and .add methods, but the .add method do not exist for WorkflowQueue, is this a typo error?
Doc reason:
The message handler runs first in several scenarios, such as:
- When messages arrive immediately after a Workflow continues as new but before it resumes.
class OrderUpdate {
enum UpdateType {
PURCHASE, CANCEL, MODIFY
}
String orderID;
UpdateType updateType;
}
@WorkflowInterface
class OrderManagementWorkflow {
@WorkflowMethod
String orderManagementWorkflow(Queue<OrderUpdate> updates);
@SignalMethod
void newOrderSignal(OrderUpdate update);
}
class OrderManagementWorkflowImpl implements OrderManagementWorkflow {
// instantiate your WorkflowQueue to prevent a race and add a capacity based
// on your business logic
private WorkflowQueue<OrderUpdate> signals = new WorkflowQueue<OrderUpdate>(Integer.MAX_VALUE);
private WorkflowInfo info;
@Override
public void newOrderSignal(OrderUpdate update) {
// Add new order updates to the queue
signals.add(update);
}
// Entity Workflow pattern with serialization of request
// (ie. only one request is processed at a time)
public String orderManagementWorkflow(LinkedList<OrderUpdate> updates) {
for (OrderUpdate element: updates) {
signals.offer(element);
}
info = Workflow.getInfo();
while(!info.isContinueAsNewSuggested()){
long timeSinceStarted = Workflow.currentTimeMillis() - info.getRunStartedTimestampMillis();
Duration duration = Duration.ofMillis(timeSinceStarted);
Duration twentyFourHours = Duration.ofHours(24);
boolean shouldProcessMore = Workflow.await(twentyFourHours - duration, () -> !signals.isEmpty());
if (!shouldProcessMore){
break;
}
OrderUpdate update = signals.poll();
// Call the activity to process the Update
// Activity implementation omitted for brevity
activity.processUpdate(update);
}
Workflow.continueAsNew(signals);
}
}