Concurrent execution of same activity across different workers

Hello,

I am looking into using Temporal to solve my use case. Let me describe the use case first and then come to how I plan to use Temporal for it. In my system there are two types of nodes/pods → Manager and Worker.

The Manager receives the name of a Kafka topic and then needs to instruct workers to start consuming data from that topic. Ideally the Manager would run a consumer on a subset of workers such that the number of selected workers equals the number of partitions in that Kafka topic, for maximum throughput, but let's say this is configurable and the number of consumers is also specified in the original request to the Manager. So essentially the Manager wants to expose an API -

StartConsumers(topicName string, numConsumers int)

This will start numConsumers consumers on different workers such that they are all part of the same Kafka consumer group. Any addition or removal of a worker should also be handled, so that we have at least numConsumers consumers running on different healthy workers at any point in time.

Now how can I achieve this using Temporal? From my research, one way is to create a per-worker task queue (worker-1 uses queue-1, worker-2 uses queue-2, and so on). Then, in the workflow, we can fan out activity executions to numConsumers of those queues.

func StartWorkflow(ctx workflow.Context, topicName string, numConsumers int) error {
  // Fetch list of all available queues that have an active/healthy worker.
  queues := getAvailableQueues() // TODO: How to implement this?

  var futures []workflow.Future
  for i := 0; i < numConsumers; i++ {
    // Define activity options with a specific task queue.
    activityOptions := workflow.ActivityOptions{
      TaskQueue:           queues[i],
      StartToCloseTimeout: 24 * time.Hour, // consumers are long-running
    }

    // Create a new context with the activity options.
    ctxWithOptions := workflow.WithActivityOptions(ctx, activityOptions)

    // Execute the activity on the specified task queue.
    futures = append(futures, workflow.ExecuteActivity(ctxWithOptions, StartKafkaConsumer, topicName))
  }

  // Wait for all consumer activities to complete.
  for _, f := range futures {
    if err := f.Get(ctx, nil); err != nil {
      return err
    }
  }
  return nil
}
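
For completeness, here is roughly how I imagine each Worker pod registering itself on its own task queue. This is just a sketch: the queue name derived from the host name and the default client options are placeholders, and StartKafkaConsumer is the activity referenced above (defined elsewhere).

package main

import (
  "log"
  "os"

  "go.temporal.io/sdk/client"
  "go.temporal.io/sdk/worker"
)

func main() {
  c, err := client.Dial(client.Options{})
  if err != nil {
    log.Fatalln("unable to create Temporal client", err)
  }
  defer c.Close()

  // Each worker listens on its own task queue, e.g. named after
  // the pod/host name (illustrative only).
  hostname, _ := os.Hostname()
  taskQueue := "consumer-queue-" + hostname

  w := worker.New(c, taskQueue, worker.Options{})
  w.RegisterActivity(StartKafkaConsumer)

  if err := w.Run(worker.InterruptCh()); err != nil {
    log.Fatalln("unable to start worker", err)
  }
}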

But now I have the following doubts about this implementation -

  1. How do I keep track of all the available queues in the system, i.e. how do I implement getAvailableQueues? (Workers can dynamically leave/join our pool.)
  2. If the worker corresponding to a queue dies, how do I re-schedule the activities that were running on the dead worker onto some other healthy worker?

I would appreciate any help in solving this. Thanks!

If the number of queues is not very large, you can have a Manager workflow that maintains the list of queues. If it is large (exceeds the 2 MB limit), you can keep it in an external DB.

The number of queues will be proportional to the number of workers, and the maximum number of workers should not exceed 30 in our case. Can you elaborate more on the Manager workflow?

You create a workflow that keeps the list of queues in its variables. Then you use an update or signal to register new queues, and a query to return the list of queues.
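
A minimal sketch of such a Manager workflow, assuming a signal named "register-queue" and a query named "get-queues" (both names are illustrative):

func ManagerWorkflow(ctx workflow.Context) error {
  // The list of per-worker task queues, kept in workflow state.
  var queues []string

  // Query handler: returns the current list of registered queues.
  if err := workflow.SetQueryHandler(ctx, "get-queues", func() ([]string, error) {
    return queues, nil
  }); err != nil {
    return err
  }

  // Signal channel: each worker sends its task queue name when it starts.
  ch := workflow.GetSignalChannel(ctx, "register-queue")
  for {
    var queue string
    ch.Receive(ctx, &queue)
    queues = append(queues, queue)
    // A real implementation would also handle de-registration and
    // eventually continue-as-new to keep the event history bounded.
  }
}

getAvailableQueues in your workflow would then be implemented as an activity that uses the Temporal client to query this Manager workflow, since workflows cannot query other workflows directly.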