Hi there,
Apologies in advance for the slightly convoluted problem - I’ve tried to distill this down to just the basics, so hopefully this will be useful for others too.
I’ve got a problem that is similar to: Polling in workflow vs. Activity?. I have refactored the code which was making use of Workflow.await in the workflow to move things into an activity.
However, I have an additional complication, that the external service also supports a webhook method of receiving status updates. I have decided I want to both have a long poll with a suitably high polling period (more as a backup in case the webhook system is down) AND also consume webhook notifications of status changes.
In my workflow - I now have code very similar to:
public void raiseTicket(Changes changes) {
// Workflow Code
var ticketId = activity.raiseTicket(changes);
var approvalStatus = activity.waitForApproval(ticketId);
...
}
Then inside the activity I use the async completion client to allow me to poll the external service for status updates, like so:
public void waitForApproval(String ticketId) {
// Activity Code
var subscriptionId = makeSubscription(ticketId); // *** IMPORTANT LINE HERE (1) ***
var ctx = Activity.getExecutionContext();
var taskToken = ctx.getTaskToken();
Runnable task = () -> checkStatus(taskToken, subscriptionId, ticketId);
// Omitted async code that invokes runnable above
...
ctx.doNotCompleteOnReturn();
return null;
}
public void checkStatus(byte[] taskToken, String subscriptionId, long ticketId) {
var status = getStatus(ticketId);
if (status == Status.PENDING) { return; }
// If in terminal state, i.e. not PENDING - close out the original call
completionClient.complete(taskToken, status);
closeSubscription(ticketId); // Remove webhook notifications
}
The design question here is - if I register for notifications from an external service from my activity itself (see line (1) in snippet above) - I assume my best way to consume the notifications from this service are via a SignalMethod in my workflow, like so?
// @SignalMethod in corresponding interface
public void handleNotification(String someTaskToken) {
activity.handleExternalNotification(someTaskToken);
}
Am I allowed to do something similar to the above where the Signal forwards the notification with some sort of context information onto the activity method? This activity method would then use this context information to invoke the Runnable which is associated with this request in order to close out the async call - i.e using the same handler that the polling version does.
Is there some other way I’m missing to handle this? Should I instead fire up some kind of HTTP notification endpoint in the activity itself to avoid the hop from SignalMethod back to activity?
I feel like the logic - when I had more in my workflow was a lot obvious - but I actually managed to get several similar async calls to deadlock the workflow and stop it making progress (despite not using any Java synchronisation primitives) - so I thought I would first try following the advice to do repeated polling only from the activity.
Original code:
Here is my original code - where I was heavily relying on Workflow.await. It feels a bit cleaner because the subscription to the external service is managed within the context of one method, which also makes error handling (not shown) easier. To handle the fact I need to track multiple tickets from one workflow - I was looking to launch these as child workflows in an asynchronous way - which hopefully was going to get me out of the deadlock I was running into.
(Just posting this for completeness - I understand the problematic nature of running activity calls in a loop which can exhaust the history limit for a workflow)
// Workflow Code
public void raiseTicket(Changes changes) {
this.ticketId = activity.raiseTicket(changes);
var subId = activity.makeSubscription(ticketId);
var status = Status.PENDING
while (status == Status.PENDING) {
var lastTs = this.ticketUpdateTs;
Workflow.await(Duration.ofHours(6), () -> ticketUpdateTs != lastTs);
status = activity.getStatus();
}
activity.closeSubscription(subId);
// Do some more stuff now the ticket is approved / cancelled
}
// @SignalMethod in corresponding interface
public void handleNotification(String someTaskToken) {
this.ticketUpdateTs = Workflow.currentTimeMillis();
}