Temporal and Kafka

Hi!

I’m new to Temporal but find it very interesting. We are looking at using it for the next rollout of our platform scheduled for end of year so I’d like to get an idea of what’s the best way to architect it.

  1. We have ‘spikes’ in demand sometimes - for example an influx of leads for which we need to perform a set of workflow steps. Would it be recommended to put Kafka as an incoming queue so we can handle spikes?
  2. Per-tenant (a tenant is a customer - an outside organization or agency) we sometimes need to limit the number of leads flowing through the workflow at a given time. Does this mean I need to create a long-running workflow per tenant and send it leads through signals? Or should we use per-lead workflows and would there be some way to manage them?
  3. For rate-limited tasks like sending emails, we are thinking of making an async activity that pushes them onto an outbound Kafka queue. Is there a way to do this directly in Temporal instead?

The architecture we are thinking of looks like this:

(External Workflow Triggers) -> Inbound Kafka Queue
                                         |
                                         |
                                         V
                                      Temporal ---- (listens on Kafka and starts a workflow for each trigger)
                                         |
                                         |
                                         V
                               Outbound Kafka Queue

Any inputs/suggestions would be appreciated. It would help us decide if Temporal is a good fit and how best to use it going forward.

Cheers!

  1. It depends on the maximum rate you want to handle. Given the number of boxes you would need for a Kafka cluster, I wouldn’t be surprised if a Temporal cluster that can sustain the peak workflow start load is simpler and cheaper. What is the maximum start workflow rate you are targeting?
  2. Yes, the simplest implementation would be to have a workflow per tenant.
  3. No need for Kafka here. Use a separate activity task queue, which already supports rate limiting.
  1. The maximum rate we are targeting is 10,000 events/minute. This includes tracking activities like web clicks, email opens, etc.
  2. Ah that’s what I thought as well. How could we rate-limit the number of workflow instances per tenant in this case?
  3. Is there some documentation on how to use the task queue? I read https://docs.temporal.io/docs/java-implementing-workflows but what I understood was there was a task queue name I could provide when invoking an activity.

Thank you for your helpful replies @maxim :heart:

To provide more detail on our requirement:

  1. 10,000 events/minute
  2. Around 4 million send events daily (up to a 20% surge sometimes) [outgoing activity]
  3. 1000 Tenants of varying loads (some very small, some large)
  4. 10-30 different workflows can be triggered per tenant. Each workflow has 3-5 actions

You are welcome!

  1. That is about 170 requests a second, which is a pretty reasonable rate for Temporal to handle. So no need for Kafka here.
  2. Temporal guarantees the uniqueness of workflows by their ID. So just use tenantId as workflowId and only one instance per tenant will be created.
  3. Basically, when you create a worker you pass the task queue name as a parameter to the factory method. Then, when you schedule an activity, you pass that same name in its options.
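As a sketch of item 2 above, the workflow ID can be set to the tenant ID through WorkflowOptions. Note that `TenantWorkflow`, `TENANT_QUEUE`, and `tenantId` are assumed names for illustration, not from this thread:

```java
// Only one workflow instance per tenant: a second start with the same
// workflow ID is rejected while the first instance is still running.
WorkflowOptions options =
    WorkflowOptions.newBuilder()
        .setWorkflowId(tenantId)    // e.g. "tenant-123"; assumed variable
        .setTaskQueue(TENANT_QUEUE) // assumed task queue name
        .build();
TenantWorkflow workflow = workflowClient.newWorkflowStub(TenantWorkflow.class, options);
WorkflowClient.start(workflow::tenantWorkflow);
```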

The worker initialization code:

    Worker worker = factory.newWorker(EMAIL_QUEUE);
    worker.registerActivitiesImplementations(emailActivities);

Workflow code:

    EmailActivities activities =
        Workflow.newActivityStub(
            EmailActivities.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(EMAIL_QUEUE)
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());
    activities.sendEmail(...); 

Got it. Excited to try out your suggestions @maxim ! Temporal looks like an amazing solution.

Thanks again! Have a good weekend.


Consider using SignalWithStart to start per-tenant workflows. It sends a signal to a workflow, starting it if it is not already running. It eliminates the need to decide which event has to start the workflow.

The SignalWithStart code from the moneybatch sample:

    AccountTransferWorkflow transferWorkflow =
        workflowClient.newWorkflowStub(AccountTransferWorkflow.class, options);
    // Signal with start sends a signal to a workflow starting it if not yet running
    BatchRequest request = workflowClient.newSignalWithStartRequest();
    request.add(transferWorkflow::deposit, to, BATCH_SIZE);
    request.add(transferWorkflow::withdraw, from, reference, amountCents);
    workflowClient.signalWithStart(request);

where AccountTransferWorkflow is defined as:

    @WorkflowInterface
    public interface AccountTransferWorkflow {

      @WorkflowMethod
      void deposit(String toAccountId, int batchSize);

      @SignalMethod
      void withdraw(String fromAccountId, String referenceId, int amountCents);

      @QueryMethod
      int getBalance();

      @QueryMethod
      int getCount();
    }

Trying to wrap my head around this correctly. So the workflow would look like:


    @WorkflowMethod
    public void tenantWorkflow() {
        while (!pendingEvents.isEmpty()) {
            EventWrapper current = pendingEvents.remove(0);
            workflowFound = findWorkflowFor(tenant, current);
            runChildWorkflow(workflowFound, current);
        }
    }

    @SignalMethod
    public void addEvent(Lead lead, Event event) {
        pendingEvents.add(new EventWrapper(lead, event));
    }

And the code that handles the event would start/signal this workflow using signalWithStart() as shown in the moneybatch sample. Is that right?

Then we would add rate-limiting code to the main tenantWorkflow() method, keeping track of when each child workflow was launched and the number of pending events.
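A rough sketch of that limiting loop, assuming a per-tenant concurrency cap; `MAX_PARALLEL`, `findWorkflowFor`, and `runChildWorkflow` are placeholder names from the pseudocode above, not SDK APIs:

```java
// Assumed cap on concurrently running child workflows per tenant.
private static final int MAX_PARALLEL = 5;
private int running = 0;

@WorkflowMethod
public void tenantWorkflow() {
    while (true) {
        // Block deterministically until there is a pending event and a free slot.
        Workflow.await(() -> !pendingEvents.isEmpty() && running < MAX_PARALLEL);
        EventWrapper current = pendingEvents.remove(0);
        running++;
        // Launch the child asynchronously and free the slot when it completes.
        Async.procedure(() -> runChildWorkflow(findWorkflowFor(tenant, current), current))
            .thenApply(
                r -> {
                  running--;
                  return null;
                });
    }
}
```

A real implementation would also exit the loop (or continue-as-new) periodically so the workflow history does not grow without bound.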

Your code and design look good to me.


@maxim In this scenario using SignalWithStart is a best practice, but calling SignalWithStart will restart the workflow (say, workflowId 123, equal to the tenantId) even though workflow 123 has already closed.

Let’s assume a specific workflow (with workflowId 123) closed because it executed successfully. If the Kafka consumer keeps receiving messages associated with tenantId 123, which equals the workflowId, it will restart the workflow (with workflowId 123) due to SignalWithStart behavior.

What’s the best practice to prevent this? How do we avoid restarting a workflow that has already completed successfully if we want to use SignalWithStart in the Kafka consumer?

You can specify the WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE WorkflowIdReusePolicy in WorkflowOptions when calling SignalWithStart.
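A sketch of setting that policy; `TenantWorkflow`, `TENANT_QUEUE`, and `tenantId` are assumed names for illustration:

```java
WorkflowOptions options =
    WorkflowOptions.newBuilder()
        .setWorkflowId(tenantId)    // workflow ID equals the tenant ID
        .setTaskQueue(TENANT_QUEUE) // assumed task queue name
        // Reject any start for a workflow ID that has run before, so a
        // completed tenant workflow is not restarted by SignalWithStart.
        .setWorkflowIdReusePolicy(
            WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
        .build();
TenantWorkflow workflow = workflowClient.newWorkflowStub(TenantWorkflow.class, options);
```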
