I was wondering what the recommended way is to pass values back with a Go WaitGroup. Normally you would use channels to send the result back, but I don’t think that is an option.
wg := workflow.NewWaitGroup(ctx)
var results []string
wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
	defer wg.Done()
	// do some method and get a result
	// store result back
	results = append(results, result)
})
wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
	defer wg.Done()
	// do some method and get a result
	// store result back
	results = append(results, result)
})
wg.Wait(ctx)
What I did isn’t ideal and is probably not the right way to do this.
In normal Go, I would do something like this:
package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup, resultChan chan<- int) {
	defer wg.Done() // decrement the counter when the goroutine completes
	// Simulate some work
	result := id * 2
	// Send the result to the result channel
	resultChan <- result
}

func main() {
	var wg sync.WaitGroup
	// Number of workers
	numWorkers := 5
	// Buffered so every worker can send before anyone receives;
	// with an unbuffered channel, wg.Wait() below would deadlock
	resultChan := make(chan int, numWorkers)
	// Add the number of workers to the wait group
	wg.Add(numWorkers)
	// Start the workers
	for i := 0; i < numWorkers; i++ {
		go worker(i, &wg, resultChan)
	}
	// Wait for all workers to finish
	wg.Wait()
	// Close the result channel since we're done sending values
	close(resultChan)
	// Collect results from the result channel
	var results []int
	for result := range resultChan {
		results = append(results, result)
	}
	// Print results
	fmt.Println("Results:", results)
}
How would you do something like the above in Temporal? I looked at workflow.NewChannel(), but the receiver blocks, so I’m not sure it is the right fit. I could pass the value through context propagation, but I feel like there should be an easier solution.
You absolutely can implement exactly the same logic using Temporal WaitGroup and Channel primitives:
func worker(ctx workflow.Context, id int, wg workflow.WaitGroup, resultChan workflow.SendChannel) {
	defer wg.Done() // decrement the counter when the coroutine completes
	// Simulate some work
	result := id * 2
	// Send the result to the result channel
	resultChan.Send(ctx, result)
}

func workflow_main(ctx workflow.Context) {
	// Number of workers
	numWorkers := 5
	wg := workflow.NewWaitGroup(ctx)
	// Buffered so that Send does not block before anyone receives
	resultChan := workflow.NewBufferedChannel(ctx, numWorkers)
	// Add the number of workers to the wait group
	wg.Add(numWorkers)
	// Start the workers
	for i := 0; i < numWorkers; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		workflow.Go(ctx, func(ctx workflow.Context) {
			worker(ctx, i, wg, resultChan)
		})
	}
	// Wait for all workers to finish
	wg.Wait(ctx)
	// Close the result channel since we're done sending values
	resultChan.Close()
	// Collect results from the result channel
	var results []int
	var result int
	for {
		ok := resultChan.ReceiveAsync(&result)
		if !ok {
			break
		}
		results = append(results, result)
	}
	// Print results
	fmt.Println("Results:", results)
}
Alternatively, the same thing can be done with Futures:
func worker(ctx workflow.Context, id int, result workflow.Settable) {
	// Simulate some work
	r := id * 2
	result.Set(r, nil)
}

func workflow_main(ctx workflow.Context) {
	// Number of workers
	numWorkers := 5
	var resultsFutures []workflow.Future
	// Start the workers
	for i := 0; i < numWorkers; i++ {
		workflow.Go(ctx, func(ctx workflow.Context) {
			f, s := workflow.NewFuture(ctx)
			worker(ctx, i, s)
			resultsFutures = append(resultsFutures, f)
		})
	}
	var results []int
	for _, resultFuture := range resultsFutures {
		var result int
		err := resultFuture.Get(ctx, &result)
		if err != nil {
			panic(err)
		}
		results = append(results, result)
	}
	fmt.Println("Results:", results)
}
Thanks @maxim. Let me give it a try. Somehow when I tried the channel something went wrong, but I probably implemented it incorrectly.
In the example below, resultsFutures is declared outside the goroutine.
workflow.Go(ctx, func(ctx workflow.Context) {
	f, s := workflow.NewFuture(ctx)
	worker(ctx, i, s)
	resultsFutures = append(resultsFutures, f)
})
Based on this:
Goroutines run in parallel. But only one of them can be active at a time, others are blocked.
Does this mean that I don’t need to worry about using a mutex or making it atomic, since it’s never possible for two concurrent threads to write to it at the same time?
If this is true, would it always be easier to just declare a variable outside and set its value in the goroutine, vs. using mechanisms like channels? I just wasn’t sure how variables declared outside goroutines work, given that Temporal workflows are distributed.
Also, is there a difference between using workflow.WaitGroup with a slice vs. workflow.Future? They seem to be able to do the same things.
Does this mean that I don’t need to worry about using a mutex or making it atomic, since it’s never possible for two concurrent threads to write to it at the same time?
Yes. Using mutexes and similar synchronization primitives is explicitly prohibited in workflow code. Only workflow APIs can be used for blocking.
If this is true, would it always be easier to just declare a variable outside and set its value in the goroutine, vs. using mechanisms like channels? I just wasn’t sure how variables declared outside goroutines work, given that Temporal workflows are distributed.
Also, is there a difference between using workflow.WaitGroup with a slice vs. workflow.Future? They seem to be able to do the same things.
You can achieve the same thing either way. You can even implement a WaitGroup using Futures, or a Future using a WaitGroup :). IMHO, a Future is more straightforward when it is appropriate, especially when you need to think about error handling. A Future can return either a result or an error. Returning a partial error in the WaitGroup-with-a-slice case is more complicated.
for _, resultFuture := range resultsFutures {
	var result int
	err := resultFuture.Get(ctx, &result)
	if err != nil {
		panic(err)
	}
	results = append(results, result)
}
From this code, it looks like it loops through each future sequentially and blocks until it succeeds or errors out, so it is not really running in parallel. If I wanted to support something like Promise.all, it sounds like I would have to use a workflow.Selector. Is that correct?
The operation that returns a future executes independently of Future.Get calls. Your question assumes that the operation only starts executing when Future.Get is called, which is not correct.
In the example, workflow.Go already starts executing it. Is that what you are referring to?
for i := 0; i < numWorkers; i++ {
	workflow.Go(ctx, func(ctx workflow.Context) {
		f, s := workflow.NewFuture(ctx)
		worker(ctx, i, s)
		resultsFutures = append(resultsFutures, f)
	})
}
var results []int
for _, resultFuture := range resultsFutures {
	var result int
	err := resultFuture.Get(ctx, &result)
	if err != nil {
		panic(err)
	}
	results = append(results, result)
}
fmt.Println("Results:", results)
What is blocking in this code then? In the prior example, we used a WaitGroup with Wait. Here it relies on the futures slice being populated. Each goroutine gets started asynchronously, so if the code takes a while before it appends to the futures slice, isn’t there a race condition where resultsFutures could be empty?