Issue Triggering Workflow B from a Service Called by Workflow A's Activity

Hi Team,

I have a scenario where I need to trigger Workflow B from a service invoked by an activity in Workflow A, due to data size limitations. The flow looks like this:

Workflow A -> Activity AA -> Service S -> Workflow B

Reason for This Design:

  • Activity AA generates a list that can exceed 2 MB, which is the maximum payload size for Temporal.
  • To work around this limitation, I pass the data to a service (Service S).
  • From Service S, I attempt to trigger Workflow B using Temporal’s Go SDK.

Problem:

While I can manually trigger Workflow B successfully from the Temporal UI, when trying to trigger Workflow B programmatically from Service S, the workflow does not start.

Code Snippet for Workflow Trigger in Service S:

options := client.StartWorkflowOptions{
    ID:        fmt.Sprintf("gateway-capture-batch-%d", time.Now().UnixNano()),
    TaskQueue: "gateway-capture-task-queue",
}

we, err := temporalClient.ExecuteWorkflow(ctx, options, gatewayCaptureWorkflow.GatewayCapturePayments, gatewayCaptureRequest)
if err != nil {
    logger.Error("Failed to execute workflow", zap.Error(err))
    return err
}

logger.Info("Workflow started", zap.String("WorkflowID", we.GetID()), zap.String("RunID", we.GetRunID()))

What I’ve Tried:

  1. Verified that the Temporal worker for Workflow B is running and polling the correct task queue (gateway-capture-task-queue).
  2. Checked that the Temporal client in Service S is correctly initialized and connected to the Temporal server.
  3. Manually triggered Workflow B from the Temporal UI, and it works without issues.

Questions:

  1. Is this design pattern supported in Temporal, where a service invoked by an activity triggers another workflow?

@maxim @tihomir

Payload size limits apply to workflow inputs as well.

If you have client metrics configured, you can use the temporal_request metric and filter on operation and status_code to monitor these start failures.

For your use case you could batch the data and invoke the workflow for each batch directly from the activity, or store the data in a db or bucket and pass around references to it rather than the data itself.
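A minimal sketch of the "store it and pass references" option, combined with batching. Everything here is hypothetical and for illustration only: the Store interface, the in-memory memStore (standing in for a real db or bucket), BatchRef, and storeBatches are not part of the Temporal SDK or of the code in this thread. The idea is that the activity writes each batch to storage and only the small BatchRef values travel as workflow input, so the payload stays well under the size limit.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchRef is the small value passed as workflow input instead of the ids.
// Hypothetical type, not part of the Temporal SDK.
type BatchRef struct {
	Key   string
	Count int
}

// Store abstracts the db or bucket that holds the real data (assumption).
type Store interface {
	Put(key string, ids []string) error
	Get(key string) ([]string, error)
}

// memStore is an in-memory stand-in for a real db or bucket.
type memStore struct {
	data map[string][]string
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string][]string)}
}

func (m *memStore) Put(key string, ids []string) error {
	// Copy the slice so later changes by the caller do not affect stored data.
	m.data[key] = append([]string(nil), ids...)
	return nil
}

func (m *memStore) Get(key string) ([]string, error) {
	ids, ok := m.data[key]
	if !ok {
		return nil, errors.New("batch not found: " + key)
	}
	return ids, nil
}

// storeBatches splits ids into batches of at most size elements, stores each
// batch, and returns one reference per batch.
func storeBatches(store Store, batchID string, ids []string, size int) ([]BatchRef, error) {
	if size <= 0 {
		return nil, errors.New("batch size must be positive")
	}
	var refs []BatchRef
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		key := fmt.Sprintf("%s/%d", batchID, len(refs))
		if err := store.Put(key, ids[start:end]); err != nil {
			return nil, err
		}
		refs = append(refs, BatchRef{Key: key, Count: end - start})
	}
	return refs, nil
}

func main() {
	store := newMemStore()
	refs, err := storeBatches(store, "1234", []string{"a", "b", "c", "d", "e"}, 2)
	if err != nil {
		panic(err)
	}
	// Each ref is what would be passed as input when starting workflow B;
	// workflow B's activity would then load the ids with store.Get(ref.Key).
	for _, ref := range refs {
		ids, _ := store.Get(ref.Key)
		fmt.Println(ref.Key, ref.Count, ids)
	}
}
```

In the activity you would start one workflow per BatchRef and let an activity of that workflow load the ids from storage, rather than putting the ids in the workflow input.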

@tihomir I implemented a similar approach, where activity AA calls a service that batches the inputs and passes them to workflow B. However, triggering workflow B from activity AA is not working; it only executes successfully when triggered manually from the Temporal UI.

You can start a workflow from an activity. If it's not working for you, maybe try to explain what you are doing in the activity code and what the issue is.

@tihomir, sharing the code snippet.

main.go

bWorker := worker.New(temporalClient, bTemporal.bTaskQueueName, worker.Options{})
bActivity := btemporal.NewBActivity(bService)
bWorker.RegisterWorkflow(bTemporal.BWorkflow)
bWorker.RegisterActivity(bActivity.BFirstActivity)

aWorker := worker.New(temporalClient, atemporal.ATaskQueueName, worker.Options{})
aActivity := atemporal.NewAActivity(aService)
aWorker.RegisterWorkflow(atemporal.AWorkflow)
aWorker.RegisterActivity(aActivity.AFirstActivity)

go func() {
	err := aWorker.Run(worker.InterruptCh())
	if err != nil {
		fmt.Println("Unable to start A Worker")
	}
}()

go func() {
	err := bWorker.Run(worker.InterruptCh())
	if err != nil {
		fmt.Println("Unable to start B Worker")
	}
}()

AWorkflow

const ATaskQueueName = "A_TASK_QUEUE_NAME"


func AWorkflow(ctx workflow.Context) (*AResponse, error) {

	aPolicy := &temporal.RetryPolicy{
		InitialInterval:    time.Second * 5,
		BackoffCoefficient: 2.0,
		MaximumInterval:    time.Second * 300,
	}

	aOptions := workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute * 30,
		RetryPolicy:         aPolicy,
	}

	aCtx := workflow.WithActivityOptions(ctx, aOptions)

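	// Wait for the activity to finish and propagate any failure to the caller.
	if aErr := workflow.ExecuteActivity(aCtx, aActivity.AFirstActivity, "1234").Get(aCtx, nil); aErr != nil {
		return nil, aErr
	}

	return &AResponse{}, nil
}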

AActivity

func (a *AActivity) AFirstActivity(ctx context.Context, batchId string) error {
	return a.sService.TriggerSService(ctx, batchId)
}

Starting workflow B, which is triggered by Service S:



func (b *Service) TriggerSService(ctx context.Context, batchId string) errors.IError {

	ids := b.GetIds(batchId)

	var wg sync.WaitGroup
	errChan := make(chan error, len(ids)/BatchSize+1) // Buffer to collect errors

	// Process payments in batches
	for start := 0; start < len(ids); start += BatchSize {
		end := start + BatchSize
		if end > len(ids) {
			end = len(ids)
		}
		idsInBatch := ids[start:end]

		wg.Add(1)
		go func(batch []string) {
			defer wg.Done()

			// Start workflow for the current batch
			if err := b.startBWorkflow(ctx, batch); err != nil {
				errChan <- err
			}
		}(idsInBatch)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	close(errChan)

	// Check for errors
	if len(errChan) > 0 {
		return errors.New("failed to start one or more B workflows")
	}

	return nil
}

func (b *Service) startBWorkflow(ctx context.Context, id []string) error {
	// Configure workflow options

	options := client.StartWorkflowOptions{
		ID:        fmt.Sprintf("b-workflow-%d", time.Now().UnixNano()),
		TaskQueue: bWorkflow.bTaskQueueName,
	}

	// Create a gateway capture request
	bRequest := b.NewRequest(id)

	// Execute the workflow
	we, err := b.temporalClient.ExecuteWorkflow(ctx, options, bTemporal.BWorkflow, bRequest)
	if err != nil {
		return err
	}

	// Log workflow details
	logger.Logger(ctx).Infow("WORKFLOW_DETAILS", map[string]interface{}{
		"WorkflowID": we.GetID(),
		"RunID":      we.GetRunID(),
	})

	return nil
}

BWorkflow

const BTaskQueueName = "B_TASK_QUEUE_NAME"

func BWorkflow(ctx workflow.Context, request Request) (Response, error) {

	var futures []workflow.Future

	retryConfig := temporalHelper.NewRetryConfig(5, 120, 300, 2.0, 120, nil)
	activityContext := temporalHelper.WithActivityOptions(ctx, retryConfig)

	var bActivity *BActivity

	for _, id := range request.ids {
		activityInput := GatewayCaptureRequest{
			id:  id,
		}
		future := workflow.ExecuteActivity(activityContext, bActivity.BFirstActivity, activityInput)
		futures = append(futures, future)
	}

	for _, future := range futures {
		var result BFirstActivityResponse
		if err := future.Get(ctx, &result); err != nil {
			workflow.GetLogger(ctx).Error("Activity failed", "Error", err)
			return Response{Success: false}, err
		}
	}

	return Response{Success: true}, nil