workflow.Go coroutine blocked

Hello everyone, I’m trying to write a batcher for Temporal’s workflow.Go.

I have a huge CSV I need to process, which involves validating data fields and then going to our database to check the validity of the data in the CSV.

Doing this row by row is very slow for a 10k-row CSV, so I am implementing a batcher that wraps workflow.Go.

For the most part it’s working correctly, but I do get frequent "context closed" and "goroutine blocked" errors.

func Batch[D any](ctx workflow.Context, data []D, batchFunc BatchFunc[D], opts ...BatchOption) ([]D, error) {
	// new options with default values
	options := &batchOptions{
		batchSize: 10, // default batch size
	}

	// apply the options
	for _, opt := range opts {
		opt(options)
	}

	// timestamp now time to track the total time the batcher takes
	startTime := time.Now().UTC()

	// total batches
	totalBatches := (len(data) + options.batchSize - 1) / options.batchSize
	currentBatch := 1 // current batch starting from 1
	workflow.GetLogger(ctx).Info(
		"starting batcher with batch size",
		options.batchSize,
		"total batches required",
		totalBatches,
		"batches",
	)

	// define a new data slice
	var mutatedData []D

	// iterate over the data one batch at a time
	for start, end := 0, 0; start <= len(data)-1; start = end {
		// calculate the end index of the batch
		end = start + options.batchSize

		// if the end index is greater than the length of the data slice
		if end > len(data) {
			end = len(data)
		}

		// get the data slice for the current batch
		batchData := data[start:end]

		// new workflow waitgroup
		wg := workflow.NewWaitGroup(ctx)

		// get the time of the current batch start
		batchStartTime := time.Now().UTC()

		// log the current batch out of the total batches
		workflow.GetLogger(ctx).Info(
			"starting batch",
			currentBatch,
			"of",
			totalBatches,
			"with",
			len(batchData),
			"items",
		)

		// increment the current batch
		currentBatch++

		// iterate through the batch data
		for _, bd := range batchData {
			d := bd // Should be outside lambda to be captured correctly

			// add to the waitgroup
			wg.Add(1)

			// Start a goroutine in a workflow safe way
			workflow.Go(ctx, func(gCtx workflow.Context) {
				// defer the waitgroup done
				defer wg.Done()

				// execute the batch function
				mutated, err := batchFunc(gCtx, d)
				if err != nil {
					workflow.GetLogger(gCtx).Error("error in batch function: ", err)
					return // TODO: handle error
				}

				// append the mutated data to the new data slice
				mutatedData = append(mutatedData, mutated)

			})
		}

		// wait for the waitgroup to finish
		wg.Wait(ctx)

		// log the end time of the current batch
		workflow.GetLogger(ctx).Debug(
			"batch finished in", time.Since(batchStartTime),
		)
	}

	workflow.GetLogger(ctx).Debug(
		"batcher finished successfully in", time.Since(startTime),
	)

	// return the mutated data and nil error
	return mutatedData, nil
}

This is the batcher in full minus the functions for the options.
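
For reference, the batch arithmetic in Batch (the ceiling division for totalBatches and the clamped end index) can be sketched on its own with plain Go. The names batchRange and batchRanges are hypothetical and are not part of the batcher above. One thing the sketch makes explicit is the guard on the batch size: in Batch, a batchSize of 0 would hit an integer division by zero when computing totalBatches, so it may be worth checking whatever value WithBatchSize receives.

```go
package main

import "fmt"

// batchRange is a half-open interval [Start, End) into the input slice.
// The type and function names are hypothetical, for illustration only.
type batchRange struct {
	Start, End int
}

// batchRanges splits n items into consecutive ranges of at most size items.
// It returns nil when n or size is not positive, which avoids the division
// by zero that a zero batch size would otherwise cause.
func batchRanges(n, size int) []batchRange {
	if n <= 0 || size <= 0 {
		return nil
	}
	total := (n + size - 1) / size // ceiling division, as in Batch
	ranges := make([]batchRange, 0, total)
	for start := 0; start < n; start += size {
		end := start + size
		if end > n {
			end = n // the last batch may be shorter
		}
		ranges = append(ranges, batchRange{Start: start, End: end})
	}
	return ranges
}

func main() {
	for i, r := range batchRanges(25, 10) {
		fmt.Printf("batch %d: [%d, %d)\n", i+1, r.Start, r.End)
	}
}
```

Each range can then be used as data[r.Start:r.End], which is the same slice the loop in Batch produces.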

I’m not sure if it’s related, but the goroutine error always happens in a database-heavy activity, and almost always the same one (the expensive one, which goes to many places to validate the CSV fields against database info).

So my question is: what’s wrong with it? (I know it’s not handling errors properly yet; this is still very new, and I’m working on adding those things once this is ironed out.)

Or, at the very least, how would one go about debugging an issue like this? It’s never the same chunk of data that errors; it just usually happens in the same activity.

If you need to see the batchFunc workflow, let me know. Basically, in that workflow I just use the workflow.Context passed in by workflow.Go (gCtx).

NOTE: I am using temporal.testsuite right now; I haven’t actually tested it against a live Temporal environment, but as far as I’m aware the testsuite is true to life, so I don’t think that’s the issue.

Could you provide more information about the errors you are getting?

Yup, sorry, I guess I should have included the stack traces.

Since this post, I’ve removed the workflow.Go and waitgroup code, and the same error still happens, so I don’t think it’s specifically related to workflow.Go anymore.

The error I get from the activity is this:

2024/07/10 18:20:34 ERROR Activity error. WorkflowID default-test-workflow-id RunID default-test-run-id ActivityType ActivityGetAttendanceCodeScheduleInfo Attempt 1 Error error querying schedule info: connection(OBFUSCATED-00-00.btgpo.mongodb.net:27017[-16]) incomplete read of message header: read tcp 10.8.0.19:61834->OBFUSCATED:27017: use of closed network connection

Then it continues with stack traces about a coroutine blocked on chan-whatever.Receive:

FAIL: Test_WorkflowTimeBankImportWithSunbeamPayload (134.09s)
panic: test timeout: 3s, workflow stack: coroutine root [blocked on chan-3126.Receive]:
go.temporal.io/sdk/internal.(*futureImpl).Get(0x14004148050, {0x105789388, 0x140002393e0}, {0x1055754e0, 0x14004150428})
        /Users/shanehughes/workspace/go/pkg/mod/go.temporal.io/sdk@v1.27.0/internal/internal_workflow.go:337 +0x58

The activity is triggered in the workflow like this:

					if err := workflow.ExecuteActivity(

						workflow.WithActivityOptions(
							ctx,
							workflow.ActivityOptions{
								StartToCloseTimeout:    600 * time.Second,
								ScheduleToCloseTimeout: 600 * time.Second,
								ScheduleToStartTimeout: 600 * time.Second,
							},
						),
						a.ActivityGetAttendanceCodeScheduleInfo,
						activity.ActivityGetAttendanceCodeScheduleInfoInput{
							CompanyID:         input.CompanyID,
							UID:               input.Row.UserInfo.ID,
							StartTime:         input.Row.EntryDateTime.UTC(),
							AttendanceCodeIDs: bank.AttendanceCodeIds,
							AccrualData:       bank.AccrualData,
						},
					).Get(ctx, &bank.ScheduleInfo); err != nil {
						workflow.GetLogger(ctx).Error("ActivityGetAttendanceCodeScheduleInfo failed.", "Error", err)
						// we don't want to halt the whole CSV for one bad entry,
						// so just record the error and continue
						input.Row.Error = err.Error()
					}

The context used for that is passed in via the batchFunc:

batchWorkflowFunc(ctx workflow.Context, input batchWorkflowFuncInput) (batchWorkflowFuncInput, error)

That function is part of the main workflow, used here:

	mutatedData, err := batcher.Batch(
		workflow.WithActivityOptions(
			ctx,
			workflow.ActivityOptions{
				StartToCloseTimeout:    10 * time.Second,
				ScheduleToCloseTimeout: 10 * time.Second,
				ScheduleToStartTimeout: 10 * time.Second,
			},
		),
		// TODO: This should be an activity
		funkutil.Map(
			rows,
			func(row *model.CSVRow) batchWorkflowFuncInput {
				return batchWorkflowFuncInput{
					CompanyID:     input.CompanyID,
					SessionUserID: input.SessionUserID,
					Row:           row,
				}
			},
		),
		batchWorkflowFunc,
		batcher.WithBatchSize(pointer.Deref(input.BatchSize)),
	)
	if err != nil {
		workflow.GetLogger(ctx).Error("Batch", "Error", err)
		return nil, err
	}

I’ve also increased every activity timeout available in the activity options from 10 seconds to a crazy high amount like 600 seconds (much longer than a whole batch takes, let alone one query), and the result is still the same.

It’s never the same batch or length of time, so I think it’s a race condition somewhere. It can happen immediately or any number of batches in, and sometimes the whole workflow finishes successfully. It always seems to be this activity.

It’s a Mongo aggregate inside the activity itself, using the context.Context passed into the activity. My Mongo client option timeouts are all set to a huge amount of time or disabled altogether.

And sorry, here is the test file itself that triggers this:

	ctx := context.Background()
	client, err := utils.GetMongoClient(
		ctx,
		conn,
		// if workflow ends you get a panic using closed context
		options.Client().SetConnectTimeout(0),
	)
	if err != nil {
		return err
	}

	defer client.Disconnect(ctx)
	db := client.Database(conn.Database)

	// initialize the activity dependencies
	a := activity.NewActivity(db)

	// setup the test suite
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	// register all children workflows
	env.RegisterWorkflow(WorkflowParseCSVRecords)
	env.RegisterWorkflow(WorkflowTimeBankImport)
	env.RegisterWorkflow(WorkflowBulkWrite)

	// register all activities used by the workflows
	env.RegisterActivity(activity.ActivityReadCSVBytes)
	env.RegisterActivity(activity.ActivityConvertStringDates)
	env.RegisterActivity(a.ActivityGetUserInfo)
	env.RegisterActivity(a.ActivityGetTimeBankInfo)
	env.RegisterActivity(activity.ActivityTrimResults)
	env.RegisterActivity(a.ActivityGetAttendanceCodeScheduleInfo)
	env.RegisterActivity(a.ActivityGenerateUniqueID)
	env.RegisterActivity(activity.ActivityCreateTimeBankEvent)
	env.RegisterActivity(a.ActivityBulkDeleteTimeBankEvents)
	env.RegisterActivity(a.ActivityBulkInsertTimeBankEvents)

	// execute the workflow
	env.ExecuteWorkflow(WorkflowTimeBankImport, p)
	if !env.IsWorkflowCompleted() {
		return fmt.Errorf("workflow did not complete")
	}

	if err := env.GetWorkflowError(); err != nil {
		return err
	}

Are you sure the activity result does not exceed the 2 MB limit?

I never thought of that. It’s a big payload, so maybe? You would think, though, that it would never finish if that were the case; I’ve run the same CSV many times and it succeeded about 40% of the time.

So to answer the question, I whipped up this quick little generic checker:

// MaxPayloadSize is the maximum size of a payload that can be sent to Temporal
const MaxPayloadSize = 2 * 1024 * 1024 // 2MB

// CheckPayloadSize checks the size of the payload and returns an error if it exceeds the maximum size
func CheckPayloadSize[D any](payload D) error {
	// marshal to json
	b, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	// check the size
	if len(b) > MaxPayloadSize {
		return fmt.Errorf("payload size exceeds the maximum size of %d bytes", MaxPayloadSize)
	}

	return nil
}

It’s placed both before the activity call and before the activity returns, and neither check gets tripped.
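
Since the check never trips, it can also be useful to see how close a payload actually is to the limit rather than only pass/fail. Here is a minimal sketch of that, assuming (as CheckPayloadSize does) that the JSON encoding is a fair approximation of the payload size. The name payloadSize and the sample rows are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxPayloadSize mirrors the 2 MB limit discussed in this thread.
const maxPayloadSize = 2 * 1024 * 1024

// payloadSize returns the JSON-encoded size of v in bytes.
// This is a hypothetical helper; it only approximates the real payload
// size under the assumption that the payload is JSON-encoded.
func payloadSize(v any) (int, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return 0, err
	}
	return len(b), nil
}

func main() {
	// Made-up sample data standing in for an activity input or result.
	rows := make([]map[string]string, 1000)
	for i := range rows {
		rows[i] = map[string]string{"id": fmt.Sprint(i), "name": "test"}
	}

	size, err := payloadSize(rows)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	pct := 100 * float64(size) / float64(maxPayloadSize)
	fmt.Printf("payload is %d bytes (%.2f%% of the limit)\n", size, pct)
}
```

Logging this number for the largest activity inputs and results would show whether the payload is anywhere near the limit, or whether the size can be ruled out entirely.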

I also just noticed that if I increase the batch size, I get a lot more coroutine blocked on chan-whatever.Receive errors, and it fails much sooner.

I was hoping that checking ctx.Err would be enough to trip it up and reproduce it, but if you paste this and run the test, it works just fine:

func Test_Batcher(t *testing.T) {
	// setup the test suite
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()
	env.SetStartTime(time.Now().UTC())

	// TestData is a struct that holds the test data
	type TestData struct {
		ID   int
		Name string
	}

	createTestDatas := func(length int) []TestData {
		var datas []TestData
		for i := 1; i <= length; i++ {
			datas = append(datas, TestData{ID: i, Name: "test" + fmt.Sprint(i)})
		}
		return datas
	}

	// workflowTestInput is a struct that holds the input for the workflowTest
	type workflowTestInput struct {
		BatchSize int
		Data      []TestData
	}

	workflowTest := func(ctx workflow.Context, input workflowTestInput) (results []TestData, err error) {
		batchFunc := func(ctx workflow.Context, data TestData) (TestData, error) {
			if ctx.Err() != nil {
				return data, ctx.Err()
			}

			// mutate the name from test to test-mutated
			data.Name = data.Name + "-mutated"

			return data, nil
		}

		// run the batcher
		results, err = Batch(ctx, input.Data, batchFunc, WithBatchSize(input.BatchSize))
		if err != nil {
			return nil, err
		}

		return results, nil
	}

	// register all children workflows
	env.RegisterWorkflow(workflowTest)

	amount := 50000
	testDatas := createTestDatas(amount)

	// execute the workflow
	env.ExecuteWorkflow(
		workflowTest,
		workflowTestInput{BatchSize: 20, Data: testDatas},
	)
	if !env.IsWorkflowCompleted() {
		t.Error("workflow did not complete")
	}

	require.NoError(t, env.GetWorkflowError())

	// get the result
	var results []TestData
	require.NoError(t, env.GetWorkflowResult(&results))

	// sort the results back to their original order
	sort.SliceStable(results, func(i, j int) bool {
		// sort 0 to the top
		return results[i].ID < results[j].ID
	})

	// check the result
	require.Len(t, results, amount)
	for i := 0; i < amount; i++ {
		require.Equal(t, i+1, results[i].ID)
		require.Equal(t, "test"+fmt.Sprint(i+1)+"-mutated", results[i].Name)
	}
}

My guess is that you are processing too many records in a single workflow execution, and creating a goroutine per record hits some process limits. You need to partition the processing into smaller chunks.

Yeah, my guess is it’s too expensive on the database. I’m in the middle of breaking it apart as we speak, but it’s kind of nice to know the batcher I made is fine: featureless, but the core is there and works :) Thank you for your assistance.

An update to this thread, because I forgot to post one: I solved it in the end.

The solution had nothing to do with my batcher, max payload limits, or anything like that.

The solution, at least in my case, is below. It has been bulletproof since I added it, and it seems to be testsuite-specific: the panic above reports "test timeout: 3s", and raising the test timeout made the failures go away.

	// setup the test suite
	env := test.NewWorkflowEnvironment(
		test.WithTestTimeout(60000*time.Second), // <-- testsuite hard cap
		test.WithWorkflowExecutionTimeout(60000*time.Second),
		test.WithWorkflowRunTimeout(60000*time.Second),
		test.WithWorkflowTaskTimeout(60000*time.Second),
	)