Polling for external event and maintaining subscription

Hi there,

Apologies in advance for the slightly convoluted problem - I’ve tried to distill this down to just the basics, so hopefully this will be useful for others too.

I’ve got a problem that is similar to "Polling in workflow vs. Activity?". I have refactored the code that was using Workflow.await in the workflow so that the polling now happens in an activity.

However, I have an additional complication, that the external service also supports a webhook method of receiving status updates. I have decided I want to both have a long poll with a suitably high polling period (more as a backup in case the webhook system is down) AND also consume webhook notifications of status changes.

In my workflow - I now have code very similar to:

public void raiseTicket(Changes changes) {
    // Workflow Code
    var ticketId = activity.raiseTicket(changes);
    var approvalStatus = activity.waitForApproval(ticketId);
    ...
}

Then inside the activity I use the async completion client to allow me to poll the external service for status updates, like so:

public Status waitForApproval(String ticketId) {
    // Activity Code
    var subscriptionId = makeSubscription(ticketId); // *** IMPORTANT LINE HERE (1) ***
    var ctx = Activity.getExecutionContext();
    var taskToken = ctx.getTaskToken();
    Runnable task = () -> checkStatus(taskToken, subscriptionId, ticketId);
    // Omitted async code that invokes runnable above
    ...
    ctx.doNotCompleteOnReturn();
    return null;
}

public void checkStatus(byte[] taskToken, String subscriptionId, String ticketId) {
    var status = getStatus(ticketId);
    if (status == Status.PENDING) { return; }
    // If in terminal state, i.e. not PENDING - close out the original call
    completionClient.complete(taskToken, status);
    closeSubscription(ticketId); // Remove webhook notifications
}

The design question here is: if I register for notifications from the external service inside the activity itself (see line (1) in the snippet above), I assume the best way to consume those notifications is via a SignalMethod in my workflow, like so?

// @SignalMethod in corresponding interface
public void handleNotification(String someTaskToken) {
     activity.handleExternalNotification(someTaskToken);
}

Am I allowed to do something similar to the above where the Signal forwards the notification with some sort of context information onto the activity method? This activity method would then use this context information to invoke the Runnable which is associated with this request in order to close out the async call - i.e using the same handler that the polling version does.

Is there some other way I’m missing to handle this? Should I instead fire up some kind of HTTP notification endpoint in the activity itself to avoid the hop from SignalMethod back to activity?

I feel like the logic was a lot more obvious when I had more of it in my workflow - but I actually managed to get several similar async calls to deadlock the workflow and stop it making progress (despite not using any Java synchronisation primitives) - so I thought I would first try following the advice to do repeated polling only from the activity.


Original code:

Here is my original code - where I was heavily relying on Workflow.await. It feels a bit cleaner because the subscription to the external service is managed within the context of one method, which also makes error handling (not shown) easier. To handle the fact I need to track multiple tickets from one workflow - I was looking to launch these as child workflows in an asynchronous way - which hopefully was going to get me out of the deadlock I was running into.

(Just posting this for completeness - I understand the problematic nature of running activity calls in a loop which can exhaust the history limit for a workflow)

// Workflow Code
public void raiseTicket(Changes changes) {
    this.ticketId = activity.raiseTicket(changes);
    var subId = activity.makeSubscription(ticketId);
    var status = Status.PENDING;
    while (status == Status.PENDING) {
        var lastTs = this.ticketUpdateTs;
        Workflow.await(Duration.ofHours(6), () -> ticketUpdateTs != lastTs);
        status = activity.getStatus(ticketId);
    }
    activity.closeSubscription(subId);
    // Do some more stuff now the ticket is approved / cancelled
}

// @SignalMethod in corresponding interface
public void handleNotification(String someTaskToken) {
     this.ticketUpdateTs = Workflow.currentTimeMillis();
}

I think the simplest approach would be to cancel the activity when the notification arrives via the signal. This assumes that the activity heartbeats periodically, because the cancellation is delivered through the result of the heartbeat call.

Here is an example that demonstrates explicit cancellation of an activity.

Hi @maxim,

Thanks for your quick response. I forgot to mention that the notification from the webhook simply means that I need to call getStatus() again because something changed, it doesn’t imply that the ticket was approved or cancelled, etc. Thus, if I cancel the activity in the notification, then how will I get my “approvalStatus” (first code snippet)? Will the activity be retried in case of a cancellation?

// Workflow Code
public void raiseTicket(Changes changes) {
    this.ticketId = activity.raiseTicket(changes);
    this.approvalCancelScope = Workflow.newCancellationScope(() -> {
        try {
            this.subscriptionId = activity.makeSubscription(ticketId);
            // could be renamed pollForApproval now that subscription is managed in the workflow itself
            var approvalStatus = activity.waitForApproval(ticketId);
            // I need approvalStatus to proceed beyond this point
        } catch (ActivityException e) {
            // Don't leave lingering subscriptions around
            activity.closeSubscription(ticketId);
            this.subscriptionId = null;
        }
    });
    // run() returns void, so store the scope first and then run it
    this.approvalCancelScope.run();
    ...
}

// Signal
public void handleNotification(String someTaskToken) {
    if (this.subscriptionId != null) {
        activity.closeSubscription(ticketId);
    }
    this.approvalCancelScope.cancel();
}

This could work provided the code in the cancellation scope could be rerun in the case it is cancelled - because I could make .waitForApproval trivially check the status before setting up for its long poll.

I had assumed that I would not need to heartbeat in my case, as I’m using async to do the work, then calling the CompletionClient once done and I don’t have any tight loops anywhere where I would need to report progress?

Thanks!

I would cancel the scope and then run an activity getApprovalStatus which calls the API and returns the status without waiting. Or you can use waitForApproval for the same purpose.

I had assumed that I would not need to heartbeat in my case, as I’m using async to do the work, then calling the CompletionClient once done and I don’t have any tight loops anywhere where I would need to report progress?

Without heartbeating, the activity's StartToClose timeout has to be pretty large. This means that if the worker process executing the activity is restarted, the workflow has to wait for that whole timeout before the activity is retried according to the retry policy. Calling heartbeat periodically lets you set a much shorter Heartbeat timeout, which allows much faster restarts and also supports activity cancellation.
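
To make the heartbeating idea concrete, here is a minimal plain-Java sketch of a blocking poll loop that heartbeats more often than it polls. It is not SDK code: the Heartbeat interface and the ActivityCancelled exception are hypothetical stand-ins (in the Temporal Java SDK the real call would be Activity.getExecutionContext().heartbeat(details), which throws once the activity has been cancelled), and beatsPerPoll and heartbeatIntervalMillis are made-up parameter names.

```java
import java.util.function.Supplier;

class HeartbeatingPoll {
    enum Status { PENDING, APPROVED, CANCELLED }

    /** Hypothetical stand-in for the SDK heartbeat call; it throws once the activity is cancelled. */
    interface Heartbeat {
        void beat(long beatNumber);
    }

    /** Hypothetical marker for "the workflow cancelled this activity". */
    static class ActivityCancelled extends RuntimeException {}

    /**
     * Heartbeats on every tick and polls the external service on every
     * beatsPerPoll-th tick, returning as soon as the status is terminal.
     * A cancellation surfaces as an exception thrown by the heartbeat.
     */
    static Status waitForApproval(Supplier<Status> getStatus, Heartbeat heartbeat,
                                  int beatsPerPoll, long heartbeatIntervalMillis) {
        for (long beat = 0; ; beat++) {
            heartbeat.beat(beat); // may throw ActivityCancelled
            if (beat % beatsPerPoll == 0) {
                Status status = getStatus.get();
                if (status != Status.PENDING) {
                    return status;
                }
            }
            sleep(heartbeatIntervalMillis);
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

With this shape the Heartbeat timeout only needs to be somewhat longer than heartbeatIntervalMillis, while the poll period (beatsPerPoll times the heartbeat interval) can stay as long as the backup poll requires.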

Hi @maxim,

Thanks again - the problem here is that a notification from the webhook doesn’t necessarily mean that the ticket has gone into the Approved or Cancelled state - the webhook might fire because someone commented on a ticket (think something like a JIRA ticket). This means we might be lucky and getApprovalStatus will return APPROVED or CANCELLED, but we may just need to wait again. I think the cancel pattern can only work for me if I can make it go into a loop somehow. Is there some pattern I could use to make that work?

Thanks also for the pointers on heartbeating!

You absolutely can make it into a loop. Move the cancellation scope creation into a separate method and call this method from a loop.
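
The control flow of that loop can be modelled roughly as below. This is a plain-Java model only, not workflow code: a CompletableFuture stands in for the cancellation scope wrapping waitForApproval, cancelling it stands in for scope.cancel() in the signal handler, and the ApprovalLoop class plus the getStatus and startWait suppliers are hypothetical names. Real workflow code would use Workflow.newCancellationScope and activity stubs instead, and should not use CompletableFuture or threads directly.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Plain-Java model of "cancel, re-check, wait again"; not workflow code. */
class ApprovalLoop {
    enum Status { PENDING, APPROVED, CANCELLED }

    // Models a quick status activity that returns without waiting.
    private final Supplier<Status> getStatus;
    // Models starting the long-running waitForApproval inside a fresh scope.
    private final Supplier<CompletableFuture<Status>> startWait;
    // Models the cancellation scope of the wait currently in flight.
    private volatile CompletableFuture<Status> currentWait;

    ApprovalLoop(Supplier<Status> getStatus, Supplier<CompletableFuture<Status>> startWait) {
        this.getStatus = getStatus;
        this.startWait = startWait;
    }

    /** One round, kept in its own method so every iteration gets a fresh "scope". */
    private Status waitOnce() {
        currentWait = startWait.get();
        try {
            return currentWait.join();   // the long wait finished on its own
        } catch (CancellationException e) {
            return getStatus.get();      // a notification arrived: re-check right away
        }
    }

    /** Repeats rounds until the status is no longer PENDING. */
    Status waitForTerminalStatus() {
        Status status = Status.PENDING;
        while (status == Status.PENDING) {
            status = waitOnce();
        }
        return status;
    }

    /** Models the signal handler: cancel whatever wait is currently in flight. */
    void handleNotification() {
        CompletableFuture<Status> wait = currentWait;
        if (wait != null) {
            wait.cancel(true);
        }
    }
}
```

A notification that only reflects a comment on the ticket costs one extra status check and then the loop goes back to waiting, which is the behaviour asked for above.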