I’m doing a PoC to see how Temporal solves embarrassingly parallel problems.
I did this by computing the sum of the first N odd numbers. First, draw a random number N for the rank. Second, get each of the first N odd numbers in partitioned execution. Third, compute the reduction sum and assert the sum.
I expected that if I launched multiple workers (num > 5), each worker would poll activities from the task queue. However, the worker console logs confirm that only one worker got all the tasks while the rest of the workers stayed idle.
// PartitionFlow is a fan-out/fan-in workflow that checks the identity
// "the sum of the first n odd numbers equals n*n".
//
// It draws a random n in [1, upperBound], starts one GetNthOddNumber activity
// per rank 1..n, waits for all of them, sums the results with the ReductionSum
// activity and validates the total with the AssertNthOddNumberSum activity.
//
// It returns the computed sum. An error is returned when upperBound is not
// positive, when the random draw cannot be decoded, when any activity fails,
// or when the validation activity reports a mismatch.
func PartitionFlow(ctx workflow.Context, upperBound int32) (int32, error) {
	// DrawRandomNumber relies on rand.Int31n, which panics on a non-positive
	// bound; fail the workflow cleanly instead of panicking inside SideEffect.
	if upperBound <= 0 {
		return 0, temporal.NewNonRetryableApplicationError("upperBound must be positive", "PartitionFlowError", nil)
	}

	// Define the activity options
	options := workflow.ActivityOptions{
		StartToCloseTimeout: time.Second * 45,
	}
	ctx = workflow.WithActivityOptions(ctx, options)

	// Simulate generating a random rank. The draw goes through SideEffect
	// rather than being called inline; executing DrawRandomNumber as an
	// activity would be an alternative way to obtain the value.
	var randNum int32
	encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		// DrawRandomNumber always returns a nil error, so it is safe to drop.
		r, _ := DrawRandomNumber(upperBound)
		return r
	})
	if err := encodedRandom.Get(&randNum); err != nil {
		return 0, err
	}

	// Simulate partitioning tasks: start every activity before waiting on any
	// of them so they can run concurrently.
	futures := make([]workflow.Future, randNum)
	for rank := 0; rank < int(randNum); rank++ {
		futures[rank] = workflow.ExecuteActivity(ctx, GetNthOddNumber, int32(rank+1))
	}

	// Wait for all activities to complete
	results := make([]int32, 0, len(futures))
	for _, future := range futures {
		var result int32
		if err := future.Get(ctx, &result); err != nil {
			return 0, err
		}
		results = append(results, result)
	}

	// Simulate reduce operation
	var actualSum int32
	if err := workflow.ExecuteActivity(ctx, ReductionSum, results).Get(ctx, &actualSum); err != nil {
		return 0, err
	}

	// Assert the sum of the first n odd numbers
	var isValid bool
	if err := workflow.ExecuteActivity(ctx, AssertNthOddNumberSum, randNum, actualSum).Get(ctx, &isValid); err != nil {
		return 0, err
	}
	if !isValid {
		return 0, temporal.NewNonRetryableApplicationError("Invalid sum of odd numbers", "PartitionFlowError", nil)
	}
	return actualSum, nil
}
// DrawRandomNumber returns a pseudo-random integer in the closed range [1, u].
// The returned error is always nil.
//
// u must be positive: rand.Int31n panics when its argument is <= 0.
func DrawRandomNumber(u int32) (int32, error) {
	// rand.Int31n yields a value in [0, u); shifting by one gives [1, u].
	offset := rand.Int31n(u)
	return offset + 1, nil
}
// GetNthOddNumber computes the odd number at 1-based position n
// (1 -> 1, 2 -> 3, 3 -> 5, ...). The returned error is always nil.
//
// The call blocks for ten seconds before returning to simulate processing time.
func GetNthOddNumber(n int32) (int32, error) {
	const simulatedWork = 10 * time.Second
	time.Sleep(simulatedWork)

	odd := n*2 - 1
	return odd, nil
}
// ReductionSum adds up every element of nums and returns the total.
// An empty or nil slice yields 0. The returned error is always nil.
func ReductionSum(nums []int32) (int32, error) {
	var total int32
	for i := 0; i < len(nums); i++ {
		total += nums[i]
	}
	return total, nil
}
// AssertNthOddNumberSum reports whether actaulSum equals n squared, which is
// the expected sum of the first n odd numbers. The returned error is always nil.
func AssertNthOddNumberSum(n int32, actaulSum int32) (bool, error) {
	expected := n * n
	if actaulSum != expected {
		return false, nil
	}
	return true, nil
}
I have tweaked the worker options MaxConcurrentActivityExecutionSize, MaxConcurrentLocalActivityExecutionSize, TaskQueueActivitiesPerSecond, and WorkerActivitiesPerSecond, but none of them changes the task assignment.
// main connects to Temporal with default client options, registers the
// PartitionFlow workflow and its four activities on calc.TaskQueueName, and
// runs the worker until the interrupt channel fires.
func main() {
	temporalClient, dialErr := client.Dial(client.Options{})
	if dialErr != nil {
		log.Fatalln("Unable to create client", dialErr)
	}
	defer temporalClient.Close()

	// Throttle the worker to one activity at a time.
	// NOTE(review): the two *PerSecond limits of 0.1 presumably allow roughly
	// one activity start every ten seconds, and TaskQueueActivitiesPerSecond
	// looks like a limit shared by the whole task queue — confirm against the
	// SDK documentation that this is the intended experiment setup.
	opts := worker.Options{
		MaxConcurrentActivityExecutionSize:      1,
		MaxConcurrentLocalActivityExecutionSize: 1,
		WorkerActivitiesPerSecond:               0.1,
		TaskQueueActivitiesPerSecond:            0.1,
	}

	wrk := worker.New(temporalClient, calc.TaskQueueName, opts)

	wrk.RegisterWorkflow(calc.PartitionFlow)

	wrk.RegisterActivity(calc.DrawRandomNumber)
	wrk.RegisterActivity(calc.GetNthOddNumber)
	wrk.RegisterActivity(calc.ReductionSum)
	wrk.RegisterActivity(calc.AssertNthOddNumberSum)

	// Run blocks until the worker is interrupted or fails.
	if runErr := wrk.Run(worker.InterruptCh()); runErr != nil {
		log.Fatalln("Unable to start worker", runErr)
	}
}