How to distribute activities evenly among workers?

I’m doing a PoC to see how Temporal solves embarrassingly parallel problems.

I did this by computing the sum of the first Nth odd number. First, draw a random number for rank. Secondly, get the Nth odd number in partitioned execution. Thirdly, compute the reduction sum and assert the sum.

I’d expect that if I launch multiple workers (num > 5). Each worker will poll the activities from the task queue. But from the worker console log, I confirmed that only one worker got all the tasks and the rest of the workers are idle.

func PartitionFlow(ctx workflow.Context, upperBound int32) (int32, error) {
	// Define the activity options
	options := workflow.ActivityOptions{
		StartToCloseTimeout: time.Second * 45,
	}

	ctx = workflow.WithActivityOptions(ctx, options)

	// Simulate generating a random rank
	// var randNum int32
	//err := workflow.ExecuteActivity(ctx, DrawRandomNumber, upperBound).Get(ctx, &randNum)
	//if err != nil {
	//	return 0, err
	//}
	var randNum int32
	encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		r, _ := DrawRandomNumber(upperBound)
		return r
	})
	encodedRandom.Get(&randNum)

	// Simulate partitioning tasks
	futures := make([]workflow.Future, randNum)
	for rank := 0; rank < int(randNum); rank++ {
		futures[rank] = workflow.ExecuteActivity(ctx, GetNthOddNumber, int32(rank+1))
	}

	// Wait for all activities to complete
	var results []int32
	for _, future := range futures {
		var result int32
		if err := future.Get(ctx, &result); err != nil {
			return 0, err
		}
		results = append(results, result)
	}

	// Simulate reduce operation
	var actaulSum int32
	if err := workflow.ExecuteActivity(ctx, ReductionSum, results).Get(ctx, &actaulSum); nil != err {
		return 0, err
	}

	// Assert the sum of the first n odd numbers
	var isValid bool
	if err := workflow.ExecuteActivity(ctx, AssertNthOddNumberSum, randNum, actaulSum).Get(ctx, &isValid); nil != err {
		return 0, err
	}
	if !isValid {
		return 0, temporal.NewNonRetryableApplicationError("Invalid sum of odd numbers", "PartitionFlowError", nil)
	}

	return actaulSum, nil
}

func DrawRandomNumber(u int32) (int32, error) {
	// Simulate drawing a random number
	r := rand.Int31n(u) + 1 // Random number in [1, u + 1)
	return r, nil
}

// GetNthOddNumber returns the nth odd number, where n is 1-based index.
func GetNthOddNumber(n int32) (int32, error) {
	time.Sleep(10 * time.Second) // Simulate some processing time
	return 2*n - 1, nil
}

func ReductionSum(nums []int32) (int32, error) {
	sum := int32(0)
	for _, num := range nums {
		sum += num
	}
	return sum, nil
}

func AssertNthOddNumberSum(n int32, actaulSum int32) (bool, error) {
	return n*n == actaulSum, nil
}

I have tweaked worker options: MaxConcurrentActivityExecutionSize, MaxConcurrentLocalActivityExecutionSize, TaskQueueActivitiesPerSecond, and WorkerActivitiesPerSecond. It doesn’t change the task assignment.

func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	workerOptions := worker.Options{
		MaxConcurrentActivityExecutionSize:      1,
		MaxConcurrentLocalActivityExecutionSize: 1,
		WorkerActivitiesPerSecond:               0.1,
		TaskQueueActivitiesPerSecond:            0.1,
	}
	w := worker.New(c, calc.TaskQueueName, workerOptions)

	w.RegisterWorkflow(calc.PartitionFlow)
	w.RegisterActivity(calc.DrawRandomNumber)
	w.RegisterActivity(calc.GetNthOddNumber)
	w.RegisterActivity(calc.ReductionSum)
	w.RegisterActivity(calc.AssertNthOddNumberSum)

	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalln("Unable to start worker", err)
	}
}

I tried fixed slot suppliers and add to the workerOptions. But there are no Workflow Task Handler. The workflow stalled.

The feature is experimental. Yet, it didn’t solve the load balancing problem among the worker.

func getWorkerTuner() worker.WorkerTuner {
	compositeTuner, err := worker.NewFixedSizeTuner(worker.FixedSizeTunerOptions{
		NumWorkflowSlots:      1,
		NumActivitySlots:      1,
		NumLocalActivitySlots: 1,
		NumNexusSlots:         1,
	})

	if nil != err {
		log.Fatalln("Unable to create composite tuner", err)
	}
	return compositeTuner
}