Cleanest way to wait for signal and do 'backup' long polling

Hello,

I am trying to move a process from a long-running polling activity (which can take multiple days to complete) to a “push”/signaling process. The external process we’re polling is gaining the ability to signal us when the task is complete.
They cannot fully guarantee delivery of the signal, although in most cases it should arrive. I would therefore like a “backup” mechanism that falls back to very slow polling in case the signal is never delivered.

I was thinking of taking this approach:

  • create selector
  • add signal to selector
  • add timer to selector (10 min timer)
  • if timer fires:
    • start slow polling activity
      • the slow polling activity does a single poll and, if the process is not complete, returns an error so that it gets retried later
    • add polling activity future to selector
    • wait in selector again
  • when Select() returns again, either we received a signal with the response, or the polling activity completed (or exhausted its retries and failed)

Partial code:

func executeAction(ctx workflow.Context) ([...], error) {
	s := workflow.NewSelector(ctx)
	var exec [...]
	respChannel := workflow.GetSignalChannel(ctx, ...)

	// In most cases, this is where the response will be sent, usually in under a second.
	s.AddReceive(respChannel, func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &exec)
	})

	resp, systemErr := ExecuteActivityLaunchAction(ctx, &steprunnerpb.LaunchActionRequest{RunActionRequest: activityReq})
	// [handle errors and such]

	startPolling := false
	s.AddFuture(workflow.NewTimer(ctx, 10*time.Minute), func(f workflow.Future) {
		// No signal after 10 minutes: start the fallback polling activity.
		startPolling = true
	})

	for {
		s.Select(ctx)
		if !startPolling {
			// Either the signal arrived or the polling activity finished.
			break
		}
		startPolling = false
		// The polling activity makes one quick poll request and returns an error if the
		// action isn't finished, so that its retry policy retries it later (e.g. 10 min later).
		f := ExecuteActivityPollActionAsync(ctx, pollReq)
		s.AddFuture(f, func(f workflow.Future) {
			err := f.Get(ctx, &exec)
			// handle errors and such
		})
	}

	return exec, nil
}

The advantages are that we only start polling if signal delivery somehow failed or the action is very long-running (98% of actions typically complete in under a second). In the normal case of a short action, there is almost no overhead beyond the 10-minute timer.
Getting rid of the previous long-running “active” polling activity also removes the risk of accumulating many such activities and running out of available activity slots (we had a few close calls), and the “push” approach makes our system more responsive.

Is this expected to work? I am currently getting the error “cannot chain Future that wasn’t created with workflow.NewFuture”, and I can’t tell whether I’m doing something fundamentally wrong or whether it’s an issue with some wrappers we have on top of the SDK.
Are there other better approaches that still get us the advantage of having low overhead for most executions?

I don’t see any code that calls Future.Chain in the snippet. Is it called inside the ExecuteActivityPollActionAsync?

Thanks for the reply

There’s no call to Chain(); I believe I was hitting this error: sdk-go/internal/internal_workflow.go at commit 3671c99d88439dfe8f0588f03a7e26403f12379d in temporalio/sdk-go on GitHub.

ExecuteActivityPollActionAsync() goes through some wrappers and other layers, but under the hood it ultimately calls workflow.ExecuteActivity().

However, I found that the error was caused by some of those wrappers, and I think I now have this solution working. Does this approach look feasible? Is it reasonable?

It is feasible, but I think the code is simpler if you use two goroutines:

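func executeAction(ctx workflow.Context) *Exec {
	var exec *Exec
	// Launch the action first; the external process signals us when it completes.
	resp, systemErr := ExecuteActivityLaunchAction(ctx, &steprunnerpb.LaunchActionRequest{RunActionRequest: activityReq})
	// [handle errors and such]

	cancelCtx, cancelHandle := workflow.WithCancel(ctx)
	workflow.Go(cancelCtx, func(ctx workflow.Context) {
		// Give the signal 10 minutes to arrive before falling back to polling.
		workflow.AwaitWithTimeout(ctx, 10*time.Minute, func() bool {
			return exec != nil
		})
		if exec == nil {
			// The poll activity is retried until the action finishes; it is
			// cancelled through cancelHandle if the signal arrives first.
			err := ExecuteActivityPollActionAsync(ctx, pollReq).Get(ctx, &exec)
			if err != nil {
				// handle
			}
		}
	})
	respChannel := workflow.GetSignalChannel(ctx, ...)
	workflow.Await(ctx, func() bool {
		return respChannel.Len() > 0 || exec != nil
	})
	if exec == nil {
		respChannel.Receive(ctx, &exec)
		cancelHandle() // stop the fallback poller
	}
	return exec
}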

OK, great. Thank you for the reply, Maxim.