Hello,
I am trying to move a process from a long-running polling activity (one that can take multiple days to complete) to a “push”/signaling process. The external process we’re polling is gaining the ability to signal us when the task is complete.
However, they cannot fully guarantee delivery of the signal. In most cases it should get through, but I would like a “backup” mechanism that falls back to very slow polling in case delivering the signal failed.
I was thinking of taking this approach:
- create a selector
- add the signal channel to the selector
- add a 10-minute timer to the selector
- if the timer fires:
  - start the slow polling activity (it does a single poll and, if the process is not complete, returns an error so that it gets retried later)
  - add the polling activity future to the selector
  - wait in the selector again
- when Select() returns again, either we received the signal with the response, or the polling activity has completed or exhausted its retries and failed
Partial code:
```go
func executeAction() (exec) {
	s := workflow.NewSelector(ctx)
	var exec [...]
	respChannel := workflow.GetSignalChannel(ctx, ...)
	// in most cases, this is where the response will be sent, and within less than a second
	s.AddReceive(respChannel, func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &exec)
	})
	resp, systemErr := ExecuteActivityLaunchAction(ctx, &steprunnerpb.LaunchActionRequest{RunActionRequest: activityReq})
	// [handle errors and such]
	startPolling := false
	s.AddFuture(workflow.NewTimer(ctx, 10*time.Minute), func(f workflow.Future) {
		// start polling action after 10 minutes
		startPolling = true
	})
waitSelect:
	s.Select(ctx)
	if startPolling {
		startPolling = false
		// the polling activity will do a quick polling req, and return an err if the action isn't finished so that it gets retried later (like 10 min later)
		f := ExecuteActivityPollActionAsync(ctx, pollReq)
		s.AddFuture(f, func(f workflow.Future) {
			err := f.Get(ctx, &exec)
			// handle errors and such
		})
		goto waitSelect
	}
	return exec, nil
}
```
The advantage is that we only start polling if signal delivery somehow failed or an action is very long-running (98% of actions typically complete in under a second). For a normal short action run, there is almost no overhead other than the startPolling timer.
Getting rid of the previous long-running “active” polling activity also removes the risk of accumulating these long-running polling activities and running out of available activity slots (we had a few close calls). On top of that, the “push” notification approach makes our system more responsive.
Is this expected to work? I am currently getting the error “cannot chain Future that wasn’t created with workflow.NewFuture”, and I can’t tell whether I’m doing something fundamentally wrong or whether it’s an issue with some wrappers we have on top of the SDK.
Are there better approaches that still give us low overhead for most executions?