@tihomir, sharing the code snippet.
main.go
bWorker := worker.New(temporalClient, bTemporal.bTaskQueueName, worker.Options{})
bActivity := btemporal.NewBActivity(bService)
bWorker.RegisterWorkflow(bTemporal.BWorkflow)
bWorker.RegisterActivity(bActivity.BFirstActivity)
aWorker := worker.New(temporalClient, atemporal.ATaskQueueName, worker.Options{})
aActivity := atemporal.NewAActivity(aService)
aWorker.RegisterWorkflow(atemporal.AWorkflow)
aWorker.RegisterActivity(aActivity.AFirstActivity)
go func() {
err := aWorker.Run(worker.InterruptCh())
if err != nil {
fmt.Println("Unable to start A Worker")
}
}()
go func() {
err := bWorker.Run(worker.InterruptCh())
if err != nil {
fmt.Println"Unable to start B Worker")
}
}()
AWorkflow
const ATaskQueueName = "A_TASK_QUEUE_NAME"
func AWorkflow(ctx workflow.Context) (*AResponse, error) {
aPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second * 5,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 300,
}
aOptions := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 30,
RetryPolicy: aPolicy,
}
aCtx := workflow.WithActivityOptions(ctx, aOptions)
aErr := workflow.ExecuteActivity(aCtx, aActivity.AFirstActivity, "1234").Get(aCtx, nil)
}
AActivity
func (a *AActivity) AFirstActivity(ctx context.Context, batchId string) error {
return a.sService.TriggerSService(ctx, batchId)
}
Start B Workflow which is triggered by S service.
func (b *Service) TriggerSService(ctx context.Context, batchId string) errors.IError {
ids := b.GetIds(batchId)
var wg sync.WaitGroup
errChan := make(chan error, len(ids)/BatchSize+1) // Buffer to collect errors
// Process payments in batches
for start := 0; start < len(ids); start += BatchSize {
end := start + BatchSize
if end > len(ids) {
end = len(ids)
}
idsInBatch := ids[start:end]
wg.Add(1)
go func(batch []string) {
defer wg.Done()
// Start workflow for the current batch
if err := b.startBWorkflow(ctx, batch); err != nil {
errChan <- err
}
}(idsInBatch)
}
// Wait for all goroutines to finish
wg.Wait()
close(errChan)
// Check for errors
if len(errChan) > 0 {
return errors.New("failed to start one or more B workflows")
}
return nil
}
func (b *Service) startBWorkflow(ctx context.Context, id []string) error {
// Configure workflow options
options := client.StartWorkflowOptions{
ID: fmt.Sprintf("b-workflow-%d", time.Now().UnixNano()),
TaskQueue: bWorkflow.bTaskQueueName,
}
// Create a gateway capture request
bRequest := b.NewRequest(id)
// Execute the workflow
we, err := b.temporalClient.ExecuteWorkflow(ctx, options, bTemporal.BWorkflow, bRequest)
if err != nil {
return err
}
// Log workflow details
logger.Logger(ctx).Infow("WORKFLOW_DETAILS", map[string]interface{}{
"WorkflowID": we.GetID(),
"RunID": we.GetRunID(),
})
return nil
}
BWorkflow
const BTaskQueueName = "B_TASK_QUEUE_NAME"
func BWorkflow(ctx workflow.Context, request Request) (Response, error) {
var futures []workflow.Future
retryConfig := temporalHelper.NewRetryConfig(5, 120, 300, 2.0, 120, nil)
activityContext := temporalHelper.WithActivityOptions(ctx, retryConfig)
var bActivity *BActivity
for _, id := range request.ids {
activityInput := GatewayCaptureRequest{
id: id,
}
future := workflow.ExecuteActivity(activityContext, bActivity.BFirstActivity, activityInput)
futures = append(futures, future)
}
for _, future := range futures {
var result BFirstActivityResponse
if err := future.Get(ctx, &result); err != nil {
workflow.GetLogger(ctx).Error("Activity failed", "Error", err)
return Response{Success: false}, err
}
}
return Response{Success: true}, nil