I’d like to design a Workflow A with a Finite State Machine (FSM) embedded within. This workflow is a long execution process that will be transitioned to multiple different stages based on the state received from inside 2 different Kafka topics. This state will be signaled by external workflows, which will be initiated and called from within the Kafka consumer code. When Workflow A receives a new state signal from an external workflow, it will perform several activities, including database operations and third-party calls, before transitioning to the new state. My questions are:
Should the FSM transition logic be implemented in the WorkflowMethod instead of the SignalMethod, and why? Could you please provide a sample code for this FSM transition?
what’s the best practice to implement the signal workflow within the Kafka consumer? how to ensure the reliability of the signaling action?
how to guarantee the idempotency of the DB operations, which will be inserting/updating records in 3 different MySQL tables? any recommendations for this?
Are you sure that FSM is the best option? Do you have situations when the same event causes execution of a different set of activities depending on the current state?
Signal workflow from a Kafka consumer and ack the message only after signal returns OK.
Write DB transactions in a way that duplicated inserts/updates are rejected/ignored.
In this business logic, we have a long-running object, such as an Order. When a user creates an order, it is submitted to a downstream order processing service, which then sends the latest status of the order back to us through Kafka. The possible statuses of this order would be something like PROCESSING, FAILED, or FINISHED. Meanwhile, the user can cancel the order by calling a CANCEL RESTful endpoint or pause it by calling a PAUSE endpoint, putting the order on hold. As a result, the order can transition through various stages, including SUBMIT, PAUSE, and CANCEL.
I design it in this way using a long-running workflow, say one Workflow called order-{$order-unique-id} presenting one Order to manage the different stages and execution of this ORDER object. My service needs to react accordingly depending on what action(CANCEL, PAUSE etc…) the user made(through RESTful endpoint) and what the latest status is received from the downstream order processing service through Kakfa Topic.
Please let me know does it make sense to use the FSM and defined it like this in this scenario.
Do you have situations when the same event causes the execution of a different set of activities depending on the current state?
In this scenario, events can be user actions (e.g., PAUSE, CANCEL) or the actual order status received in Kafka from the downstream order processing service. The order workflow has an internal attribute called ‘PHASE’ (with values like SUBMIT, PAUSE, CANCEL) to indicate the current user action, such as order SUBMIT, PAUSE, or CANCEL. Within each phase, the service reacts differently to various statuses. Consequently, the same event does not trigger different sets of activities, as the service’s response is determined by both the event and the current phase.
overall, what I needed here is to implement an event-driven workflow. This workflow needs to react differently based on the external signals which are originally coming from Kafka consumers. Any good patterns for this?
thanks for the response @maxim I might not clarify my business logic clearly and it was kind of misleading.
Actually, the ORDER workflow needs to maintain the status of the specific Order object in the system and react according to some external signals. but this Order could be at different stages depending on either the action made by the user or the latest status of this order received from the Kafka topic.
These ORDER stages include: CREATE, PREEMPT, RESUME, and TERMINATE stage
This stage transition could happen in different orders, for example:
1. CREATE → PREEMPT → TERMINATE
OR 2. CREATE → PREEMPT → RESUME → PREEMPT → RESUME → TERMINATE
OR 3. CREATED → TERMINATE
OR any other orders
Depending on which ORDER stage it is now, the order status transition operations might be different. Those order statuses could be: running, queued, validating etc.
public class OrderWorkflowImpl implements OrderWorkflow {
private Order order;
private OrderStatus currentStatus;
private CompletablePromise<OrderEventInfo> eventPromise;
public OrderWorkflowImpl() {
eventPromise = Workflow.newPromise();
}
@Override
public void orderProcessor(Order order) {
this.order = order;
this.currentStatus = OrderStatus.CREATED;
// Main loop to process order events
while (currentStatus != OrderStatus.FINISHED && currentStatus != OrderStatus.FAILED && currentStatus != OrderStatus.CANCELED) {
OrderEventInfo eventInfo = eventPromise.get();
processOrderEvent(eventInfo);
eventPromise = Workflow.newPromise();
}
}
@Override
public OrderStatus getOrderStatus() {
return currentStatus;
}
@Override
public void processOrderEvent(OrderStage orderStage, OrderStatus status) {
eventPromise.complete(new OrderEventInfo(orderStage, status));
}
private void processOrderEvent(OrderEventInfo eventInfo) {
switch (eventInfo.getStage()) {
case CREATE:
currentStatus = eventInfo.getOrderStatus();
//activity operations such as DB persist
break;
case PREEMPT:
currentStatus = eventInfo.getOrderStatus();
//activity operations such as DB persist
break;
case RESUME:
currentStatus = eventInfo.getOrderStatus();
//activity operations such as DB persist
break;
case TERMINATE:
currentStatus = eventInfo.getStatus();
//activity operations such as DB persist
break;
}
}
}
the signal processOrderEvent would be coming from either kafka consumers or restful endpoints called by the user.
TLDR; I leveraged Workflow.newPromise() here to wait for some external event (event = order_stage + order_status) to arrive in random order and react accordingly.
Is this the right pattern being used here?
Don’t think you need to use Workflow.newPromise, in signal method you can write events to list and then iterate until empty, see sample here for reference.
Could consider passing in single input (OrderEventInfo) here.
this.currentStatus = OrderStatus.CREATED;
Would not initialize this inside workflow method. Note that signal handler can be invoked before the workflow method (for example if you started exec from client using signalWithStart).
// Main loop to process order events
while (currentStatus != OrderStatus.FINISHED && currentStatus != OrderStatus.FAILED && currentStatus != OrderStatus.CANCELED) { ... }
How many updates (max) to single order are you expecting? Note that single exec has limits on event history size and count so you might need to look into using continueAsNew here to clear history, also see batch samples here that might help.
Then the state machine might be OK. I still think that you should consider setting state variables and using await to implement specific flows. But in some cases, FSM is fine. You can implement it as workflow code directly and Temporal will take care of persistence and executing it reliably.
@maxim do you meant that we should do something like:
@Override
public void processOrder() {//Workflow Method
while (!finalTerminatedStatusAction && !isCancelled) {
switch (orderStage) {
case CREATE:
// Wait for a status change or a user action
Workflow.await(() -> statusChanged || finalTerminatedStatusAction);
handleStageFlowInCreation(orderStatusInfo);
break;
case PREEMPT:
// Wait for a status change or a user action
Workflow.await(() -> statusChanged || finalTerminatedStatusAction);
handleStageFlowInPremption(orderStatusInfo);
break;
case RESUME:
// Wait for a status change or a user action
Workflow.await(() -> statusChanged || finalTerminatedStatusAction);
handleStageFlowInResume(orderStatusInfo);
break;
case TERMINATE:
// Wait for a status change or a user action
Workflow.await(() -> statusChanged || finalTerminatedStatusAction);
handleStageFlowInTerminate(orderStatusInfo);
break;
}
}
// Perform cleanup activities
@tihomir
Within each stage, there’re some external OrderStatusInfo events coming from 2 different resources(aka 2 different Kafka consumers). The order workflow needs to react differently based on the OrderStatusInfo it got signaled. Meaning instead of having one single message queue, may we need multiple message queues. one message queue for each stage. Do I understand your guidance correctly?
In other words, in order to integrate what you proposed, do you think it would be better to design some internal message queue for each stage storing the external signals coming from that 2 different resources at each stage and processing those signals stored in each message queue one by one depending on what current stage it is now? both of the two Kafka consumers would signal OrderStatusInfo to that order workflow at any time and the order workflow needs to react accordingly based on the OrderStatus received.
If you are doing real state machine, you want to have a map of states + event to action. And if you want to buffer events, using a queue is a reasonable approach.