Entity Workflow Pattern / Continue As New and Signal order

Hey!

I am using the entity workflow pattern to control the order of operations on an entity. It works pretty well, but I am curious about the following:

If the workflow run continues as new and the new run is created, but a signal is received just before the main workflow method executes, then that item will be enqueued ahead of the ones provided as input to the continue-as-new. This is an issue for my use case, since the items provided by the continue-as-new should be processed first, and then the new ones received through signals.

So the question is: am I missing something here, or should I implement a waiting strategy so that the signal handler only enqueues items after I initialize the queue in the main method?

Note:
Also, the course that shows the entity workflow pattern example uses both the .offer and .add methods, but the .add method does not exist on WorkflowQueue. Is this a typo?

Doc reason:

The message handler runs first in several scenarios, such as:

  • When messages arrive immediately after a Workflow continues as new but before it resumes.

class OrderUpdate {
  enum UpdateType {
    PURCHASE, CANCEL, MODIFY
  }

  String orderID;
  UpdateType updateType;
}

@WorkflowInterface
interface OrderManagementWorkflow {

  @WorkflowMethod
  String orderManagementWorkflow(LinkedList<OrderUpdate> updates);

  @SignalMethod
  void newOrderSignal(OrderUpdate update);

}

class OrderManagementWorkflowImpl implements OrderManagementWorkflow {

  // instantiate your WorkflowQueue to prevent a race and add a capacity based
  // on your business logic
  private WorkflowQueue<OrderUpdate> signals = Workflow.newWorkflowQueue(Integer.MAX_VALUE);

  private WorkflowInfo info;

  @Override
  public void newOrderSignal(OrderUpdate update) {
    // Add new order updates to the queue
    signals.add(update);
  }

  // Entity Workflow pattern with serialization of request
  // (ie. only one request is processed at a time)
  public String orderManagementWorkflow(LinkedList<OrderUpdate> updates) {

    for (OrderUpdate element: updates) {
      signals.offer(element);
    }

    info = Workflow.getInfo();

    while(!info.isContinueAsNewSuggested()){
      long timeSinceStarted = Workflow.currentTimeMillis() - info.getRunStartedTimestampMillis();

      Duration duration = Duration.ofMillis(timeSinceStarted);
      Duration twentyFourHours = Duration.ofHours(24);

      boolean shouldProcessMore = Workflow.await(twentyFourHours.minus(duration), () -> !signals.isEmpty());

      if (!shouldProcessMore){
        break;
      }

      OrderUpdate update = signals.poll();

      // Call the activity to process the update
      // Activity implementation omitted for brevity
      activity.processUpdate(update);

    }

    Workflow.continueAsNew(signals);
   
  }
}

Hi

If the workflow run continues as new and the new run is created, but a signal is received just before the main workflow method executes, then that item will be enqueued ahead of the ones provided as input to the continue-as-new. This is an issue for my use case, since the items provided by the continue-as-new should be processed first, and then the new ones received through signals.

You can annotate the constructor with @WorkflowInit; see this example.

Also, the course that shows the entity workflow pattern example uses both the .offer and .add methods, but the .add method does not exist on WorkflowQueue. Is this a typo?

Thanks, I will pass this on to the team.

Hey!

You can annotate the constructor with @WorkflowInit; see this example.

Should I add this in the init? What I am looking to do is seed the queue with the pending “updates” provided when calling continue-as-new:

for (OrderUpdate element: updates) {
  signals.offer(element);
}

continues as new and the new run is created, but a signal is received just before the main workflow method executes, then that item will be enqueued ahead of the ones provided as input to the continue-as-new

You should drain signals before calling continue-as-new. However, if you have a high rate of signals flowing in, you might hit the limits before the end of the drain.
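To make the drain step concrete, here is a minimal sketch in plain Java. It uses a java.util.Queue as a stand-in for the WorkflowQueue, and the DrainSketch class and drain helper are hypothetical names for illustration, not part of the Temporal SDK. The idea is only to show pending items being moved, in FIFO order, into a plain list that could be handed to the next run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical helper for illustration; not part of the Temporal SDK.
class DrainSketch {

  // Removes every pending item from the queue, preserving FIFO order,
  // and returns the items as a plain list.
  static <T> List<T> drain(Queue<T> pending) {
    List<T> carriedOver = new ArrayList<>();
    T next;
    // poll() returns null once the queue is empty
    while ((next = pending.poll()) != null) {
      carriedOver.add(next);
    }
    return carriedOver;
  }

  public static void main(String[] args) {
    Queue<String> pending =
        new ArrayDeque<>(List.of("PURCHASE-1", "MODIFY-1", "CANCEL-2"));
    List<String> carriedOver = drain(pending);
    System.out.println(carriedOver);       // [PURCHASE-1, MODIFY-1, CANCEL-2]
    System.out.println(pending.isEmpty()); // true
  }
}
```

In a workflow, the resulting list (rather than the queue object itself) would presumably be what gets passed as the argument to Workflow.continueAsNew, so that the next run receives the leftover updates in their original order.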

Yes, in the code example at the end, before executing the continue-as-new, I am draining the signals to pass them as a seed parameter for the next workflow. But the case I’m trying to check is when a single signal arrives exactly after the new workflow starts but before the main method has been executed, because if that happens then I will have a signal being enqueued in the wrong order.

Doc ref: The message handler runs first in several scenarios, such as:

  • When messages arrive immediately after a Workflow continues as new but before it resumes

The WorkflowInit method is executed before any signal handlers.

@maxim Got it, so just to double check, is it valid to have this at the init level?

@WorkflowInit
public OrderManagementWorkflowImpl(LinkedList<OrderUpdate> updates) {
    signals = Workflow.newWorkflowQueue(Integer.MAX_VALUE);
    // Seed the signals queue with the initial updates before handling any new signal
    for (OrderUpdate element : updates) {
        signals.offer(element);
    }
}

Hi @Andres_Felipe_Benavi

Yes, this code runs before the signal handler.
