Separate Task queue for each Activity

Hi Team,

We want to use Temporal for ETL. Our current ETL jobs do the following:

  1. An ETL job (Job 1) reads from a Kafka topic (Topic 1), applies its transformations, and publishes to another Kafka topic (Topic 2).
  2. Another ETL job (Job 2) consumes the messages from Topic 2, applies its transformations, and publishes to another Kafka topic (Topic 3).
  3. This flow continues.

The scaling requirements of each ETL job are different, and we scale each ETL job horizontally and separately.

We are exploring Temporal for this use case and are thinking of creating a separate activity for each ETL job. Is there a way to use a separate task queue for each activity and pass the output of Activity 1 to the input task queue of Activity 2?

What are you trying to achieve by using a separate task queue for each activity?

What is the projected maximum rate of these activity executions?

Hi maxim,

What I’m thinking of is deploying a separate Worker with a single activity that polls messages from a task queue (Queue 1), applies a transformation, and publishes the message to another task queue (Queue 2). Another Worker with a single activity consumes from Queue 2, performs its transformations, and publishes to yet another task queue (Queue 3). That way I can deploy workers for each task queue and also scale them horizontally and independently.

Our expected throughput is ~4K-5K executions/second.

What are these workers doing? Is it CPU-intensive transformation or calls to external services?

Do you have any requirements around error handling and conditional logic?

What you describe can be solved by chaining any other queue implementation like Kafka or SQS. I don’t see Temporal adding much value for simple transformations.

Hi maxim,

Thank you very much for the quick reply. We currently use Kafka for this use case. Yes, some of the ETL jobs make calls to external services and some are CPU-intensive transformations. We do have requirements for error handling, conditional logic, and retries, and we want to maintain the state of each message.

In my use case, what I’m doing is this:

  1. The first ETL job makes a call to an external service, gets the response (JSON or some other type of payload), and publishes this response to a Kafka topic (Topic 1).
  2. The second ETL job consumes the messages from Topic 1, parses the response of the first call to extract only the required data, and publishes it to another Kafka topic (Topic 2).

Sometimes calls to the external service fail. If a call still fails after a configurable number of retries, we want to publish the failed call to another Kafka topic (Topic 3).

We want to know what state each message is currently in and what flow it follows.

I see. Then using Temporal does make sense. There is no need to use a task queue per activity type (what you call stage) unless you want to host them in different processes.
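As a rough illustration of how the requirements above (bounded retries, a failure path, and visibility into the state of each message) might map onto a workflow, here is a minimal sketch. It assumes the Temporal Java SDK is on the classpath. Every interface, method, and state name in it (`FetchActivities`, `publishFailure`, `getState`, and so on) is hypothetical and made up for this example, and the timeout and retry values are placeholders rather than recommendations.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

public class EtlMessageSketch {
  // Placeholder for the "configurable retries" mentioned in the thread.
  static final int MAX_ATTEMPTS = 5;

  // Hypothetical activity interfaces, one per stage.
  @ActivityInterface
  public interface FetchActivities {
    String callExternalService(String message);
  }

  @ActivityInterface
  public interface ParseActivities {
    String parseAndExtract(String response);
  }

  @ActivityInterface
  public interface PublishActivities {
    void publishResult(String extracted); // e.g. to Topic 2
    void publishFailure(String message);  // e.g. to Topic 3
  }

  @WorkflowInterface
  public interface EtlMessageWorkflow {
    @WorkflowMethod
    void process(String message);

    // Lets a client ask which stage this message is currently in.
    @QueryMethod
    String getState();
  }

  public static class EtlMessageWorkflowImpl implements EtlMessageWorkflow {
    private String state = "RECEIVED";

    // Retries are bounded; once they are exhausted the call throws ActivityFailure.
    private final FetchActivities fetch = Workflow.newActivityStub(FetchActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(MAX_ATTEMPTS).build())
            .build());

    private final ParseActivities parse = Workflow.newActivityStub(ParseActivities.class,
        ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build());

    private final PublishActivities publish = Workflow.newActivityStub(PublishActivities.class,
        ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build());

    @Override
    public void process(String message) {
      String response;
      try {
        state = "CALLING_EXTERNAL_SERVICE";
        response = fetch.callExternalService(message);
      } catch (ActivityFailure e) {
        // All attempts failed: divert the message to the failure topic and stop.
        publish.publishFailure(message);
        state = "FAILED";
        return;
      }
      state = "PARSING";
      String extracted = parse.parseAndExtract(response);
      publish.publishResult(extracted);
      state = "DONE";
    }

    @Override
    public String getState() {
      return state;
    }
  }
}
```

In this sketch the workflow replaces the intermediate Kafka topics: the output of one stage is simply passed as the argument to the next activity, and the query method exposes where each message currently is.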

If I’m not using a task queue per activity type, how do I scale each stage independently? Calls to the external service (stage 1) use little CPU, while stage 2 (parse and extract) is CPU-intensive. How can we scale each stage independently while still achieving high throughput?

In this case, you want to host them in different processes, and using different task queues for them is reasonable.

Temporal doesn’t put any hard limits on the number of task queues. You can have thousands of them (or even many more if the matching engine is provisioned accordingly) without a problem. So you can absolutely use a task queue per stage. I usually advocate against using multiple task queues in the same process if they need to compete for the same resources and are not rate limited.

Is there any Java sample code for my use case, using a separate task queue for each stage and launching a separate worker for each task queue?

I don’t think there is a sample for this. You should use a separate activity stub for each task queue.

Hi Maxim,

I found example code in the Java samples

Thank you

Hi @ravi.ov

This example does not use a separate activity stub for each task queue.

It would be something like this:

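    // One activity stub per task queue: each stub routes its calls to the workers polling that queue.
    // A StartToClose or ScheduleToClose timeout must be set; the values here are only illustrative.
    // Requires java.time.Duration in addition to the Temporal imports.
    private final Account lightMethod = Workflow.newActivityStub(Account.class, ActivityOptions.newBuilder()
            .setTaskQueue("taskQueue-lightMethod")
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build());

    private final Account_2 cpuIntensiveMethod = Workflow.newActivityStub(Account_2.class, ActivityOptions.newBuilder()
            .setTaskQueue("taskQueue-cpuIntensiveMethod")
            .setStartToCloseTimeout(Duration.ofMinutes(1))
            .build());

    @Override
    public void transfer(
            String fromAccountId, String toAccountId, String referenceId, int amountCents) {
        // Runs on workers polling "taskQueue-lightMethod".
        lightMethod.withdraw(fromAccountId, referenceId, amountCents);
        // Runs on workers polling "taskQueue-cpuIntensiveMethod".
        cpuIntensiveMethod.deposit(toAccountId, referenceId, amountCents);
    }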
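The worker side of the snippet above is not shown in this thread, so here is a minimal sketch of what a dedicated worker process for the CPU-intensive task queue might look like. It assumes a recent Temporal Java SDK and a Temporal server reachable at the default local address. The nested `Account_2` interface and `Account2Impl` class are hypothetical stand-ins for the real ones, and the concurrency setting is just one plausible choice.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

public class CpuIntensiveStageWorker {
  // Must match the task queue set on the activity stub in the workflow.
  static final String TASK_QUEUE = "taskQueue-cpuIntensiveMethod";

  // Hypothetical stand-in for the Account_2 interface used by the workflow.
  @ActivityInterface
  public interface Account_2 {
    void deposit(String accountId, String referenceId, int amountCents);
  }

  // Hypothetical implementation; the real CPU-intensive work would go here.
  public static class Account2Impl implements Account_2 {
    @Override
    public void deposit(String accountId, String referenceId, int amountCents) {
      System.out.printf("deposit %d to %s (%s)%n", amountCents, accountId, referenceId);
    }
  }

  public static void main(String[] args) {
    // Connects to a locally running Temporal server (assumption for this sketch).
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    WorkflowClient client = WorkflowClient.newInstance(service);
    WorkerFactory factory = WorkerFactory.newInstance(client);

    // Cap concurrent activities at the core count, since this stage is CPU-bound.
    WorkerOptions options = WorkerOptions.newBuilder()
        .setMaxConcurrentActivityExecutionSize(Runtime.getRuntime().availableProcessors())
        .build();

    // This process polls only the CPU-intensive queue and hosts only that activity.
    Worker worker = factory.newWorker(TASK_QUEUE, options);
    worker.registerActivitiesImplementations(new Account2Impl());

    factory.start();
  }
}
```

A second process of the same shape would poll `taskQueue-lightMethod` and register the `Account` implementation, and the workflow itself would be registered on a worker for its own task queue. Each process can then be scaled out independently by running more instances of it.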

Thank you very much @antonio.perez.