How to pass values back from a workflow.WaitGroup

Hi,

I was wondering what the recommended way is to pass values back when using a Go WaitGroup. Normally you would use channels to send the results back, but I don’t think that is an option here.

wg := workflow.NewWaitGroup(ctx)
var results []string

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
    defer wg.Done()
    // do some work and get a result
    // store the result back
    results = append(results, result)
})

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
    defer wg.Done()
    // do some work and get a result
    // store the result back
    results = append(results, result)
})

wg.Wait(ctx)

What I did isn’t ideal and is probably not the right way to do this.

In normal Go, I would do something like this:


func worker(id int, wg *sync.WaitGroup, resultChan chan<- int) {
    defer wg.Done() // decrement the counter when the goroutine completes

    // Simulate some work
    result := id * 2

    // Send the result to the result channel
    resultChan <- result
}

func main() {
    var wg sync.WaitGroup

    // Number of workers
    numWorkers := 5

    // Buffer the channel so workers don't block on send before main starts receiving
    resultChan := make(chan int, numWorkers)

    // Add the number of workers to the wait group
    wg.Add(numWorkers)

    // Start the workers
    for i := 0; i < numWorkers; i++ {
        go worker(i, &wg, resultChan)
    }

    // Wait for all workers to finish
    wg.Wait()

    // Close the result channel since we're done sending values
    close(resultChan)

    // Collect results from the result channel
    var results []int
    for result := range resultChan {
        results = append(results, result)
    }

    // Print results
    fmt.Println("Results:", results)
}

How would you do something like the above in Temporal? I looked at workflow.NewChannel(), but the receiver blocks, so I’m not sure if that is the right usage. I could pass the value through context propagation, but I feel like there should be an easier solution to this.

Thanks,
Derek

There is a post here, Is workflow.Go safe for concurrency? - #2 by Chad_Retz, which calls out that workflow goroutines don’t actually run in parallel.

I don’t really understand this. When you run goroutines and each triggers some logic and activities, aren’t they run in parallel?

You absolutely can implement exactly the same logic using Temporal WaitGroup and Channel primitives:

func worker(ctx workflow.Context, id int, wg workflow.WaitGroup, resultChan workflow.SendChannel) {
	defer wg.Done() // decrement the counter when the goroutine completes

	// Simulate some work
	result := id * 2

	// Send the result to the result channel
	resultChan.Send(ctx, result)
}

func workflow_main(ctx workflow.Context) {
	wg := workflow.NewWaitGroup(ctx)

	// Number of workers
	numWorkers := 5

	// Buffer the channel so workers don't block on Send before we start receiving
	resultChan := workflow.NewBufferedChannel(ctx, numWorkers)

	// Add the number of workers to the wait group
	wg.Add(numWorkers)

	// Start the workers
	for i := 0; i < numWorkers; i++ {
		workflow.Go(ctx, func(ctx workflow.Context) {
			worker(ctx, i, wg, resultChan)
		})
	}

	// Wait for all workers to finish
	wg.Wait(ctx)

	// Close the result channel since we're done sending values
	resultChan.Close()

	// Collect results from the result channel
	var results []int
	var result int
	for {
		ok := resultChan.ReceiveAsync(&result)
		if !ok {
			break
		}
		results = append(results, result)
	}
	// Print results
	fmt.Println("Results:", results)
}

Your original code with a slice is simpler and correct, but I’m not sure how adding a channel to the mix makes it simpler.

You can also use a Future to return the result of an async computation.

I don’t really understand this. When you run goroutines and each triggers some logic and activities, aren’t they run in parallel?

Workflow goroutines run concurrently, but only one of them can be active at a time; the others are blocked.
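Because only one coroutine is ever active, mutating a plain shared variable from multiple workflow goroutines is safe. A minimal sketch of that (the function name and the doubling “work” are placeholders, not from your code):

func sharedSliceExample(ctx workflow.Context) []int {
	numWorkers := 5
	var results []int

	wg := workflow.NewWaitGroup(ctx)
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		i := i // capture the loop variable per iteration
		workflow.Go(ctx, func(ctx workflow.Context) {
			defer wg.Done()
			// Appending to the shared slice needs no mutex: workflow
			// coroutines are scheduled cooperatively, one at a time.
			results = append(results, i*2)
		})
	}
	wg.Wait(ctx)
	return results
}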

Your example rewritten using Futures:

func worker(ctx workflow.Context, id int, result workflow.Settable) {
	// Simulate some work
	r := id * 2

	result.Set(r, nil)
}

func workflow_main(ctx workflow.Context) {
	// Number of workers
	numWorkers := 5

	var resultsFutures []workflow.Future

	// Start the workers
	for i := 0; i < numWorkers; i++ {
		workflow.Go(ctx, func(ctx workflow.Context) {
			f, s := workflow.NewFuture(ctx)
			worker(ctx, i, s)
			resultsFutures = append(resultsFutures, f)
		})
	}

	var results []int
	for _, resultFuture := range resultsFutures {
		var result int
		err := resultFuture.Get(ctx, &result)
		if err != nil {
			panic(err)
		}
		results = append(results, result)
	}
	fmt.Println("Results:", results)
}

See the splitmerge-future sample for a more complete example.

Thanks @maxim, let me give it a try. Somehow when I tried the channel something was going wrong, but I probably implemented it wrong.

In the example below, resultsFutures is declared outside the goroutine.

workflow.Go(ctx, func(ctx workflow.Context) {
	f, s := workflow.NewFuture(ctx)
	worker(ctx, i, s)
	resultsFutures = append(resultsFutures, f)
})

Based on this:

Workflow goroutines run concurrently, but only one of them can be active at a time; the others are blocked.

  • Does this mean that I don’t need to worry about using a mutex or making it atomic, since it’s never possible that two concurrent threads would write to it at the same time?
  • If this is true, would it always be easier to just have a variable declared outside the goroutine to set values into, vs. using mechanisms like channels? I just wasn’t sure how variables declared outside goroutines work in terms of Temporal and workflows being distributed.
  • Also, is there a difference between using workflow.WaitGroup with a slice vs. workflow.Future? They seem to be able to do the same things.
  • Does this mean that I don’t need to worry about using a mutex or making it atomic, since it’s never possible that two concurrent threads would write to it at the same time?

Yes. And the use of mutexes and similar synchronization primitives is explicitly prohibited in workflow code. Only workflow APIs can be used for blocking.

  • If this is true, would it always be easier to just have a variable declared outside the goroutine to set values into, vs. using mechanisms like channels? I just wasn’t sure how variables declared outside goroutines work in terms of Temporal and workflows being distributed.

Yes, it is easier in some cases, especially when used in conjunction with workflow.Await and workflow.AwaitWithTimeout. Here is an example of such a simplification: samples-go/await-signals at main · temporalio/samples-go · GitHub
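For instance, a sketch of that shared-variable pattern (not taken from the sample; the worker body is just a placeholder): workers write into a shared slice and the main coroutine blocks on workflow.Await instead of a WaitGroup:

numWorkers := 5
var results []int

for i := 0; i < numWorkers; i++ {
	i := i // capture the loop variable per iteration
	workflow.Go(ctx, func(ctx workflow.Context) {
		// do some work and record the result in the shared slice
		results = append(results, i*2)
	})
}

// Block the main coroutine until every worker has reported a result.
if err := workflow.Await(ctx, func() bool { return len(results) == numWorkers }); err != nil {
	return err // the workflow context was canceled
}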

  • Also, is there a difference between using workflow.WaitGroup with a slice vs. workflow.Future? They seem to be able to do the same things.

You can achieve the same thing either way, and you can even implement WaitGroup using Futures and implement Future using a WaitGroup :). IMHO, the Future is more straightforward when it is appropriate, especially when you need to think about error handling. A Future can return either an error or a result; returning a partial error in the WaitGroup-with-a-slice case is more complicated.
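As a sketch of what that error handling can look like with Futures (the failing condition is invented purely for illustration), a worker sets either a value or an error on its Settable, and the collector sees the failure through Future.Get:

func worker(ctx workflow.Context, id int, result workflow.Settable) {
	if id == 3 {
		// pretend this worker fails
		result.SetError(fmt.Errorf("worker %d failed", id))
		return
	}
	result.Set(id*2, nil)
}

// In the collector loop, Get returns whatever the worker set.
for i, f := range resultsFutures {
	var r int
	if err := f.Get(ctx, &r); err != nil {
		workflow.GetLogger(ctx).Error("worker failed", "worker", i, "error", err)
		continue
	}
	results = append(results, r)
}

Doing the same with a WaitGroup would need a second shared slice (or a result struct) just to carry the per-worker errors.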

for _, resultFuture := range resultsFutures {
	var result int
	err := resultFuture.Get(ctx, &result)
	if err != nil {
		panic(err)
	}
	results = append(results, result)
}

From this code, it looks like it loops through each future sequentially and blocks until it succeeds or errors out; not really running in parallel. If I wanted to support something like a Promise.all, it sounds like I would have to implement a workflow.Selector. Is that correct?

The operation that returns the future executes independently of Future.Get calls. Your question assumes that only Future.Get starts executing the operation, which is not correct.
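That said, if you want completion-order handling, which is the closest thing to Promise.all with a per-result callback, a Selector works. A sketch reusing resultsFutures from above (optional, since a plain Get loop already waits for all of them):

// Process futures in the order they complete rather than in slice order.
selector := workflow.NewSelector(ctx)
for _, f := range resultsFutures {
	f := f
	selector.AddFuture(f, func(f workflow.Future) {
		var r int
		if err := f.Get(ctx, &r); err == nil {
			results = append(results, r)
		}
	})
}
for range resultsFutures {
	selector.Select(ctx) // blocks until the next future is ready
}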

In the example, workflow.Go actually executes it already. Is that what you are referring to?

for i := 0; i < numWorkers; i++ {
	workflow.Go(ctx, func(ctx workflow.Context) {
		f, s := workflow.NewFuture(ctx)
		worker(ctx, i, s)
		resultsFutures = append(resultsFutures, f)
	})
}

var results []int
for _, resultFuture := range resultsFutures {
	var result int
	err := resultFuture.Get(ctx, &result)
	if err != nil {
		panic(err)
	}
	results = append(results, result)
}
fmt.Println("Results:", results)

What is blocking in this code then? In the prior example, we used a WaitGroup with Wait. Here it’s relying on the future slice being populated. Each goroutine gets started asynchronously, so if the code takes a while before it appends to the future slice, isn’t there a race condition where resultsFutures could be empty?

Good catch, this code is not going to work; I blindly copied it from an example that created the Futures in the loop synchronously.
The corrected version creates each Future in the main workflow coroutine before starting the goroutine, so resultsFutures is fully populated before the first Get call:

	for i := 0; i < numWorkers; i++ {
		f, s := workflow.NewFuture(ctx)
		resultsFutures = append(resultsFutures, f)
		workflow.Go(ctx, func(ctx workflow.Context) {
			worker(ctx, i, s)
		})
	}