Is workflow.Go safe for concurrency?

Hi all. I want to use Temporal to upload large files, using a map-reduce approach to parse them in parallel. In pseudo-code it looks like this:

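var uploads int
var uploadErr error

// ctx is assumed to already carry ActivityOptions (e.g. a StartToCloseTimeout).
for _, offset := range offsets {
    offset := offset // per-iteration copy (needed before Go 1.22)
    workflow.Go(ctx, func(gCtx workflow.Context) {
        // UploadChunk is the activity that uploads one chunk.
        err := workflow.ExecuteActivity(gCtx, UploadChunk, offset).Get(gCtx, nil)
        if err != nil {
            uploadErr = err
        }
        uploads += 1
    })
}

_ = workflow.Await(ctx, func() bool {
    return uploadErr != nil || len(offsets) == uploads
})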

Is it safe to increment the uploads counter inside workflow.Go, or should I use the atomic package?

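Yes. Only one coroutine runs at a time in a workflow, and the SDK switches between coroutines only at blocking calls (such as Future.Get, workflow.Await, or Selector.Select), so there are no data-race concerns. It is safe to increment that variable.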

Thank you. In that case, does it make sense to use workflow.Go instead of calling workflow.ExecuteActivity in the loop?

It’s basically the same thing. A common pattern is to call ExecuteActivity in one loop, collecting the returned Futures, and then call Get on each Future in a second loop. This lets the activities run in parallel. Of course, workflow.Go does too.

I recommend ExecuteActivity if each parallel branch is a single action. But if you need to run multiple sequences of actions in parallel, workflow.Go is much simpler.

See the splitmerge-future and splitmerge-selector samples for Future-based parallelism.

See the goroutine sample for running sequences in parallel.

Thanks. I started the thread with the goroutine example because I was worried about data races, but in reply #2 Chad_Retz explained that this is not an issue in Temporal.

I am trying to decide whether to use workflow goroutines or simply collect Futures. My use case requires that we never block, i.e. we don’t wait for goroutines to finish or for Futures to be ready. We also want to run some code (update a map) at the end of each goroutine or, with AddFuture, when a Future is ready.

It seems that the AddFuture callback is called only when Select runs? I haven’t tested this, but the documentation says: “The callback is called when Select(ctx) is called.”

That means running code on completion with Futures requires calling Select at some point, and therefore blocking. The workaround would be to run Select in a separate goroutine, but if we are going to do that, we might as well use workflow goroutines instead of Futures in the first place.

It would be nice to have a comparison between goroutine-based and Future-based designs somewhere. It looks like both can be used for parallelism, but each has its own pros and cons.

If you don’t need to wait for the requests to complete, then goroutines are simpler.

Why does the following problem occur after I increase the number of workflow.Go coroutines to 10000?

This screenshot shows only a bit of the panic, not the whole thing. Can you provide a full, runnable, standalone reproduction? Also, workflow history warns at 10k events and terminates the workflow at 50k (not to mention other limits, such as 2k child workflows), so there shouldn’t be a reason to run 10k coroutines in a single workflow.
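
A rough event budget makes the limits concrete. Assuming every activity succeeds on its first attempt, each one adds three history events (ActivityTaskScheduled, ActivityTaskStarted, ActivityTaskCompleted), and workflow-task events come on top of that. With N activities, the history therefore holds at least E events:

```latex
\begin{aligned}
E &\gtrsim 3N && \text{(Scheduled, Started, Completed per activity)}\\
N = 10{,}000 &\;\Rightarrow\; E \gtrsim 30{,}000 > 10{,}000 && \text{(warning threshold)}\\
N = 2 \times 10{,}000 &\;\Rightarrow\; E \gtrsim 60{,}000 > 50{,}000 && \text{(termination limit)}
\end{aligned}
```

The last line assumes each of 10,000 coroutines runs two activities in sequence.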

The full error output is too long to post. I am using the test case Test_GoroutineWorkflow in this repository (GitHub - temporalio/samples-go: Temporal Go SDK samples). I changed the parallelism from 2 to 10000. My intention was to verify whether there are concurrency-safety issues, but the error above appears and the test case fails. Is it because there are too many parallel coroutines?