workflow.NewFuture vs ExecuteActivity with selector.AddFuture

Hello,

I’m building a workflow that runs a batch of around 15 activities in parallel and waits for all of them to complete before proceeding.

I have been using the following code to run these activities:

selector := workflow.NewSelector(ctx)
links := []string{...links} // insert links here
results := []string{}
childCtx, cancelHandler := workflow.WithCancel(ctx)
var batchErr error

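for _, v := range links {
  v := v // capture the loop variable for the callback (needed before Go 1.22)

  future := workflow.ExecuteActivity(childCtx, activities.ProcessLinks, v)
  selector.AddFuture(future, func(f workflow.Future) {
    var result string
    if err := f.Get(ctx, &result); err != nil {
      cancelHandler() // cancel the activities that are still running
      batchErr = fmt.Errorf("ProcessLinks(%s): %w", v, err)
      return // do not record a result for a failed activity
    }
    results = append(results, result)
  })
}
// Each Select call handles one completed future.
for i := 0; i < len(links); i++ {
  selector.Select(ctx)
  if batchErr != nil {
    return "", batchErr
  }
}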
// continue from here with the results

But I also looked at the ‘DSL’ example here: https://github.com/temporalio/samples-go/blob/main/dsl/workflow.go

and I noticed it uses workflow.Go instead:

selector := workflow.NewSelector(ctx)
links := []string{...links} // insert links here
results := []string{}
childCtx, cancelHandler := workflow.WithCancel(ctx)
var batchErr error

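for _, v := range links {
  v := v // capture the loop variable for the closures (needed before Go 1.22)
  future, settable := workflow.NewFuture(childCtx)

  // Run the activity in a workflow goroutine and publish its outcome through the settable.
  workflow.Go(childCtx, func(futCtx workflow.Context) {
    var result string
    err := workflow.ExecuteActivity(futCtx, activities.ProcessLinks, v).Get(futCtx, &result)
    settable.Set(result, err)
  })

  selector.AddFuture(future, func(f workflow.Future) {
    var result string
    if err := f.Get(ctx, &result); err != nil {
      cancelHandler() // cancel the goroutines and activities that are still running
      batchErr = fmt.Errorf("ProcessLinks(%s): %w", v, err)
      return // do not record a result for a failed activity
    }
    results = append(results, result)
  })
}
// Each Select call handles one completed future.
for i := 0; i < len(links); i++ {
  selector.Select(ctx)
  if batchErr != nil {
    return "", batchErr
  }
}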
// continue from here with the results

Which is the correct and canonical way to do this? Since AddFuture accepts a future and, I’m assuming, puts this event into the history anyway, what is the benefit of using a goroutine?

Also, as a separate question: do I need to defer cancelHandler(), as I would in regular Go?

I don’t think you need a goroutine to run a single activity, since ExecuteActivity already returns a future. The DSL example is overkill for this case. Goroutines are easier when you need to run multiple sequences of activities: one goroutine per sequence.


Great, I was wondering! I’m still not entirely sure why the DSL example is written using goroutines, though.

The DSL example doesn’t execute a single activity in a goroutine. It calls an abstract function. The implementations of this function can contain arbitrary code.

func executeAsync(exe executable, ctx workflow.Context, bindings map[string]string) workflow.Future {
	future, settable := workflow.NewFuture(ctx)
	workflow.Go(ctx, func(ctx workflow.Context) {
		err := exe.execute(ctx, bindings)
		settable.Set(nil, err)
	})
	return future
}