How to implement a Selector/Future on non-activities?

Hi,

I want to run a function in parallel with listening for a signal, and I’m trying to figure out how that would look with selectors.

Let’s say I have

  • a listener initialized to listen for a skip signal. A workflow variable gets updated when the signal is received, and I’ll use it to check whether I can skip an operation.
  • a function foo() that does a set of operations.

The goal: while foo() is executing, if I receive the skip signal at any time, I want to exit out of the foo() function.

The problems I’m struggling with are:

  • How to create the Future needed for the selector when it’s not tied to an activity. In this case, it’s tied to a variable that is updated by another listener or by the execution of a function.
  • This is probably not ideal, but can a workflow have two listeners for the same signal? I think I tried this and it did not work.

type signalInfo struct {
	skip bool
}

func (s *signalInfo) Listen(ctx workflow.Context) {
	for {
		selector := workflow.NewSelector(ctx)
		selector.AddReceive(workflow.GetSignalChannel(ctx, "skip"), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, nil) // consume the signal so the next Select blocks again
			s.skip = true
		})
		selector.Select(ctx)
	}
}

func SampleWorkflow(ctx....) {
	var signal signalInfo
	workflow.Go(ctx, signal.Listen)

	selector := workflow.NewSelector(ctx)
	selector.AddFuture(..., func(f workflow.Future) { // Don't know the proper syntax for this
		// basically wait until signal.skip == true
		workflow.Await(ctx, func() bool {
			return signal.skip
		})
	})
	selector.AddFuture(..., func(f workflow.Future) { // Don't know the proper syntax for this either
		// wait until foo() finishes executing
	})
	selector.Select(ctx)
}

func foo() {}

Because I don’t need to worry about concurrency, one option could be:

func SampleWorkflow(ctx....) {
	var signal signalInfo
	workflow.Go(ctx, signal.Listen)

	fooDone := false
	workflow.Go(ctx, func(ctx workflow.Context) {
		foo()
		fooDone = true
	})

	// Note: this loop never blocks, so it never yields to the two goroutines above.
	for {
		if signal.skip || fooDone {
			break
		}
	}
}

I wonder if there is a cleaner way.

A Selector is needed only if you listen to more than one Channel or Future.
So the Listen function can be implemented as:

func (s *signalInfo) Listen(ctx workflow.Context) {
    for {
        workflow.GetSignalChannel(ctx, "skip").Receive(ctx, nil)
        s.skip = true
    }
}

How to create the Future needed for the selector when it’s not tied to an activity. In this case, it’s tied to a variable that is updated by another listener or by the execution of a function.

You are thinking about this backwards. You need a Selector only if you have more than one Channel or Future to interact with from a single goroutine.

Here is how you use a Future:

        completed, settable := workflow.NewFuture(ctx)
        // then somewhere, possibly from a different goroutine
        settable.Set("value", nil)
        // then somewhere, possibly in a different goroutine
        var v string
        err := completed.Get(ctx, &v)
        // or you can add this future to the Selector

The goal: while foo() is executing, if I receive the skip signal at any time, I want to exit out of the foo() function.

You need to define what “exit out of the foo() function” means. Is it just checking the value of the skip variable like:

if skip {
  return nil
}

before every next step?

Or do you want to interrupt already-running activities or child workflows? In this case, I recommend using Context cancellation. When a context passed to an activity or child workflow is canceled, the corresponding activity or child workflow receives a cancellation request. See workflow.WithCancel.

Would this work? In this case, I’m using cancelListener and cancelFoo to cancel the goroutines.

	listenerCtx, cancelListener := workflow.WithCancel(ctx)
	workflow.Go(listenerCtx, signal.Listen)

	fooComplete := false
	var fooError error

	fooCtx, cancelFoo := workflow.WithCancel(ctx)
	workflow.Go(fooCtx, func(ctx workflow.Context) {
		foo(ctx) // does some business logic
		fooComplete = true
	})

	// block until the skip signal arrives or foo() finishes
	err := workflow.Await(ctx, func() bool {
		return signal.skip || fooComplete
	})

	// clean up goroutines
	cancelFoo()
	cancelListener()

I’m trying to unit test this but I’m running into setup issues.

This sounds reasonable. By the way, there is no need to clean up the goroutines after the workflow method returns.