Hello,
I am looking into using Temporal for my use case. Let me describe the use case first and then explain how I would use Temporal for it. My system has two types of nodes/pods: Manager and Worker.
The Manager receives the name of a Kafka topic and then needs to instruct workers to start consuming data from that topic. Ideally, for maximum throughput, the Manager would run consumers on as many workers as the topic has partitions, but let's say this is configurable and the number of workers is also specified in the original request to the Manager. So essentially the Manager wants to expose an API:
`StartConsumers(topicName string, numConsumers int)`
which starts `numConsumers` consumers on different workers such that they all belong to the same Kafka consumer group. Any addition or removal of a worker should also be handled, so that at least `numConsumers` consumers are running on different healthy workers at any point in time.
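The "at least `numConsumers` consumers on distinct healthy workers" requirement can be phrased as a reconciliation step that a long-running workflow might repeat whenever worker membership changes. Below is a minimal sketch in plain Go, assuming the set of healthy workers is known from elsewhere; `reconcile` is a hypothetical helper, not part of the Temporal SDK.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile decides where new consumers must be started so that at least
// numConsumers consumers run on distinct healthy workers (hypothetical helper).
// running is the set of workers currently hosting a consumer; healthy lists
// the live workers. It returns the consumers to keep and the workers on which
// new consumers should be started.
func reconcile(running map[string]bool, healthy []string, numConsumers int) (keep, start []string) {
	isHealthy := make(map[string]bool, len(healthy))
	for _, w := range healthy {
		isHealthy[w] = true
	}
	// Keep consumers whose worker is still healthy; the rest are treated as lost.
	for w := range running {
		if isHealthy[w] {
			keep = append(keep, w)
		}
	}
	sort.Strings(keep) // map iteration order is random; sort for stable output
	// Fill the gap with healthy workers that are not already hosting a consumer.
	for _, w := range healthy {
		if len(keep)+len(start) >= numConsumers {
			break
		}
		if !running[w] {
			start = append(start, w)
		}
	}
	return keep, start
}

func main() {
	running := map[string]bool{"worker-1": true, "worker-2": true, "worker-3": true}
	healthy := []string{"worker-1", "worker-3", "worker-4", "worker-5"}
	keep, start := reconcile(running, healthy, 3)
	fmt.Println(keep, start) // prints [worker-1 worker-3] [worker-4]
}
```

Since every consumer uses the same consumer group ID, Kafka itself rebalances the partitions whenever a consumer joins or leaves the group; the workflow would only have to keep enough consumers alive.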
Now, how can I achieve this using Temporal? From my research, one way is to create a task queue per worker (`worker-1` uses `queue-1`, `worker-2` uses `queue-2`, and so on). The workflow can then fan out activity executions to `numConsumers` randomly chosen queues.
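```go
import (
	"fmt"
	"time"
	"go.temporal.io/sdk/workflow"
)

func StartWorkflow(ctx workflow.Context, topicName string, numConsumers int) error {
	// Fetch the list of task queues that currently have an active/healthy worker.
	queues := getAvailableQueues() // TODO: How to implement this?
	if len(queues) < numConsumers {
		return fmt.Errorf("need %d healthy workers, found %d", numConsumers, len(queues))
	}
	futures := make([]workflow.Future, 0, numConsumers)
	for i := 0; i < numConsumers; i++ {
		// Route this activity to one specific worker's task queue.
		ctxWithOptions := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
			TaskQueue:           queues[i],
			StartToCloseTimeout: 24 * time.Hour, // a timeout is mandatory; consumers are long-running
		})
		// ExecuteActivity returns a Future without blocking, so all consumers start in parallel.
		futures = append(futures, workflow.ExecuteActivity(ctxWithOptions, StartKafkaConsumer, topicName))
	}
	for _, f := range futures { // keep the workflow open while the consumers run
		if err := f.Get(ctx, nil); err != nil {
			return err
		}
	}
	return nil
}
```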
But now I have the following doubts about this implementation:
- How do I keep track of all the available queues in the system, i.e. how do I implement `getAvailableQueues`? (Workers can dynamically join or leave the pool.)
- If the worker behind a queue dies, how do I reschedule the activities that were running on it onto some other healthy worker?
I would appreciate any help with this. Thanks!