Thoughts on implementing Kafka exactly-once messaging integration

Temporal Workflows and Kafka Topics

Temporal supports millions of concurrent workflows, and it’s common to have a workflow for every business entity (customer, transaction, etc.). The number of topics that Kafka supports has been growing over time, but we still wouldn’t have a separate Kafka topic for each customer or each transaction.

Meanwhile a Kafka topic with a lot of partitions on a tuned server might support a million messages per second while a single Temporal workflow instance may only be able to handle a few signals per second.

Thus going from Kafka to Temporal we can expect that often we’ll be “fanning out” from a Kafka topic to workflow instances (likely choosing a workflow id from something in the message), while going from Temporal to Kafka we’ll probably be “fanning in” from many workflows to a smaller number of Kafka topics.

Temporal to Kafka

We’d like to have a Temporal workflow deliver a message to a Kafka topic with exactly-once semantics.

Kafka supports source connectors with exactly-once semantics (Apache Kafka):

In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.

This isn’t really a great fit for Temporal. A Temporal workflow delivers a message by executing an activity, and the way to achieve exactly-once semantics on the Temporal side is to make activities idempotent. If we wanted the Kafka side to be able to back up and consume previously sent messages, we’d need to interpose a database to do that.

OK, how about deduplicating messages on the Kafka side? After all, a standard way of making activities idempotent in Temporal is by using an idempotency key.

Confluent has an example of deduplicating messages: How to find distinct values in a stream of events using Kafka Streams; we could use our idempotency key as the “event id” that is used in the example.

I was perplexed by the DeduplicationTransformer constructor setting the “left” and “right” durations to half of the desired deduplication interval:

DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
    if (maintainDurationPerEventInMs < 1) {
        throw new IllegalArgumentException("maintain duration per event must be >= 1");
    }
    leftDurationMs = maintainDurationPerEventInMs / 2;
    rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
...

which is used to look for a matching message within the deduplication interval:

eventIdStore.fetch(
    eventId,
    eventTime - leftDurationMs,
    eventTime + rightDurationMs);
...

Since a duplicate can only arrive after the original event, only the backward-looking leftDurationMs does any work here: wouldn’t this look back for duplicated messages for only half the desired time? In my testing it seems so. I expect this may simply be an oversight.
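
If so, one possible fix (a sketch on my part, not an upstream patch) is to devote the entire interval to looking backward:

leftDurationMs = maintainDurationPerEventInMs;  // look back over the full interval
rightDurationMs = 0;                            // a duplicate never precedes the original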

With the deduplication interval fixed, this looks pretty good, I think. Using a window store with a retention period set to our desired deduplication interval means that Kafka will automatically delete old idempotency keys; we don’t need to worry about cleanup to keep the Kafka store from growing without bound.
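
For example (a sketch: the store name follows the Confluent example, and the two-hour interval is an arbitrary choice of mine):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

final Duration dedupInterval = Duration.ofHours(2);
final StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.windowStoreBuilder(
    Stores.persistentWindowStore(
        "eventId-store",
        dedupInterval,   // retention period: Kafka purges keys older than this
        dedupInterval,   // window size
        false),          // don't retain duplicate entries per key
    Serdes.String(),     // key: the idempotency key ("event id")
    Serdes.Long()));     // value: the event timestamp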

On the Temporal side, we wouldn’t want to retry the activity delivering the message to Kafka for longer than our deduplication interval. A Schedule-To-Close Timeout looks like it’d be the right thing here.
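
For instance (Temporal Java SDK; the KafkaPublishActivities interface is a hypothetical stand-in for whatever activity does the delivery):

import java.time.Duration;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;

ActivityOptions options = ActivityOptions.newBuilder()
    // Stop retrying (in total, across attempts) no later than the
    // Kafka-side deduplication interval.
    .setScheduleToCloseTimeout(Duration.ofHours(2))
    .setStartToCloseTimeout(Duration.ofSeconds(30))
    .build();
KafkaPublishActivities kafka =
    Workflow.newActivityStub(KafkaPublishActivities.class, options);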

Now we still have a case where A) the activity successfully delivers the message to Kafka; B) the activity worker crashes before the Temporal service is notified that the activity completed; and C) subsequent retries keep failing until we’re past our Schedule-To-Close timeout and the activity is finally failed. Here the message has been delivered to Kafka but the workflow doesn’t know it. I think there’s probably not much we can do about this… the assumption is that the deduplication interval should be long enough to cover temporary glitches and downtime.

Kafka to Temporal

Looking at Kafka connectors again, a Kafka sink connector can store the Kafka offset in the destination system along with the messages. Thus, if we were delivering messages to a database, for example, we could use a database transaction to ignore messages whose offsets we had already seen. Which, again, doesn’t seem to be a great fit for Temporal. If computers were infinitely fast we could have a single ingester workflow consume all messages, deduplicate, and deliver them to the business workflows… but that’s not practical in the real world.

Another consideration is delivering messages in order. With multi-region namespaces it’s possible, if probably extremely rare, for a total region failure to allow signals to be delivered out of order… which might be so rare as not to be worth worrying about… but, as long as we’re working on a design anyway, “always” is a nicer guarantee than “almost always”.

More prosaically, the Kafka consumer / Temporal client worker could simply immediately send a signal for each message that it processed (which would definitely allow signals to be delivered out of order)… or carefully deliver signals for a particular workflow one at a time. But given that the receiving workflow will need to deduplicate anyway, perhaps it could also order messages.
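
The naive version might look something like this sketch (the topic and signal names are placeholders, and I’m assuming the record key is, or derives, the target workflow id):

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.temporal.client.WorkflowClient;

void pump(KafkaConsumer<String, String> consumer, WorkflowClient client) {
    consumer.subscribe(List.of("workflow-messages"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            // Assumes the record key is the target workflow id.
            client.newUntypedWorkflowStub(record.key())
                  .signal("kafkaMessage", record.value());
        }
        consumer.commitSync(); // at-least-once: a crash here re-delivers the batch
    }
}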

So we have a Kafka topic, and each message is destined to be delivered to some particular workflow. Either the message contains the workflow id, or the workflow id can otherwise be derived from the message.

Suppose we have a Kafka Streams application that, per workflow id, adds an incrementing sequence number to the messages destined for that workflow id. That is, the first message delivered to a particular workflow id would have a sequence number of 0, the second a sequence number of 1, and so on.
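
A sketch of such an application (the topic and store names are placeholders, and again I’m assuming the record key is the workflow id):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

final StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("seq-store"), Serdes.String(), Serdes.Long()));

builder.<String, String>stream("workflow-messages")  // key = workflow id
    .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("seq-store");
        }

        @Override
        public String transform(final String workflowId, final String value) {
            final Long previous = store.get(workflowId);
            final long seq = previous == null ? 0L : previous + 1;
            store.put(workflowId, seq);
            // Prefixing keeps the sketch serde-free; a real implementation
            // would likely use a structured value instead.
            return seq + ":" + value;
        }

        @Override
        public void close() {}
    }, "seq-store")
    .to("sequenced-workflow-messages", Produced.with(Serdes.String(), Serdes.String()));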

Now it’s easy for the workflow to deduplicate and order messages. Keep a high-water mark: the largest consecutive sequence number received so far. Any new message that doesn’t have a higher sequence number is a duplicate and can be discarded. A message with a higher but non-consecutive sequence number has been delivered out of order, and the workflow can hold on to it until the intervening messages have been received.
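
In plain Java, that receiving logic might look like this sketch (the SequencedReceiver name and the deliver callback are inventions of mine; in a workflow this would sit behind a signal handler):

import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Consumer;

class SequencedReceiver<T> {
    private long nextExpected = 0;  // nextExpected - 1 is the high-water mark
    private final SortedMap<Long, T> pending = new TreeMap<>();  // out-of-order buffer
    private final Consumer<T> deliver;

    SequencedReceiver(final Consumer<T> deliver) { this.deliver = deliver; }

    void receive(final long seq, final T message) {
        if (seq < nextExpected || pending.containsKey(seq)) {
            return;  // duplicate: already delivered or already buffered
        }
        pending.put(seq, message);
        // Deliver any now-consecutive run starting at nextExpected.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            deliver.accept(pending.remove(nextExpected));
            nextExpected++;
        }
    }
}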

To keep the Kafka Streams application’s store from growing without bound, we’d like a workflow that’s shutting down to tell the Kafka Streams application that it’s finished, so that the Streams app can clear that workflow id from its store.

This message deduplication and ordering logic would be the same for any workflow that wanted to receive Kafka messages. It would complicate the business logic of the workflow, and it would need to be written for each SDK language that the business workflows were implemented in.

Perhaps it would be convenient to have a utility workflow dedicated to handling this message logic. The business workflow could start the utility workflow as a child workflow, with a Parent Close Policy set to request cancellation. The utility workflow would receive the Kafka messages and signal the business workflow with the messages deduplicated and ordered.

When the business workflow closes, the utility workflow would receive the cancellation request. The utility workflow could notify the Kafka Streams application, and then shut down itself.
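
Wiring that up might look roughly like this (Temporal Java SDK; the KafkaInboxWorkflow interface is a hypothetical stand-in for the utility workflow):

import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface KafkaInboxWorkflow {
    @WorkflowMethod
    void run(String businessWorkflowId);  // runs until cancelled
}

// Inside the business workflow implementation:
ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
    // Ask the child to cancel itself when this (parent) workflow closes.
    .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
    .build();
KafkaInboxWorkflow inbox =
    Workflow.newChildWorkflowStub(KafkaInboxWorkflow.class, options);
Async.procedure(inbox::run, Workflow.getInfo().getWorkflowId());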


@awwx, thank you for the thoughtful post. When we designed Temporal, Kafka didn’t support exactly once semantics, so we didn’t spend any cycles on the problems you outlined.

This is the first time anyone has mentioned that such integration would be useful. I don’t see why we couldn’t technically provide a platform-level solution for both publishing and subscribing. When this might happen depends on the demand.

Regarding possible additions to the Temporal platform, something that might be broadly useful is support for idempotent signals.

We already recommend using idempotency keys for activities calling APIs that support them. Going the other way, we might have our own API endpoint that sends a signal to a Temporal workflow, and we might like to allow the client of our API to provide an idempotency key of their own.

Perhaps the Temporal client could provide an “idempotentSignal” method that took an idempotency key argument. The Temporal service could retain the keys for a deduplication interval and discard signals that reused an idempotency key.
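
Purely as a hypothetical, the call site might look like:

// Hypothetical API; nothing like this exists in the Temporal SDKs today.
WorkflowStub workflow = client.newUntypedWorkflowStub("customer-42");
workflow.idempotentSignal(
    "paymentReceived",   // signal name
    "client-key-1234",   // caller-supplied idempotency key
    paymentDetails);     // signal arguments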

For someone who wanted to deliver signals from a Kafka topic, they could use the same mechanism for deduplicating signals without the Temporal platform itself needing to do anything special to support Kafka in particular.

I suspect that if the topic volume were so high that a Kafka Connect-style implementation was called for (i.e. processing blocks of messages with Kafka offsets committed to the Temporal service), we’d be in a high-performance data pipeline kind of scenario that Temporal wouldn’t be a good fit for anyway.

Temporal already dedupes signals based on requestId. The limitation is that the requestId map is not passed through continue-as-new.

Do any of the client SDKs support request_id yet?

They all assign it to avoid duplicates due to gRPC-level retries. I don’t think it is exposed to the application code.

OK, does this sound like it would work?

  • an API client makes a call to our API layer, and the request includes an idempotency key
  • the API server, as a Temporal client, sends a signal to a workflow, setting the request_id to the client-provided idempotency key (either by making a gRPC-level request itself, as sketched after this list, or by some future addition to the client SDK)
  • sending the signal successfully completes, but the API server crashes before sending a response to the API client
  • the API client retries, the signal is sent again, but with the same request_id it’s deduplicated by the Temporal service
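
For concreteness, the gRPC-level version of the second step might look like this sketch (Temporal Java SDK service stubs; the namespace, ids, and clientIdempotencyKey are placeholders):

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest;
import io.temporal.serviceclient.WorkflowServiceStubs;

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
SignalWorkflowExecutionRequest request =
    SignalWorkflowExecutionRequest.newBuilder()
        .setNamespace("default")
        .setWorkflowExecution(
            WorkflowExecution.newBuilder().setWorkflowId("customer-42").build())
        .setSignalName("apiEvent")
        // Reuse the API client's idempotency key as the request id so the
        // service drops a retried signal as a duplicate.
        .setRequestId(clientIdempotencyKey)
        .build();
service.blockingStub().signalWorkflowExecution(request);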

Yes, it should work. I’m not sure what the limitations on the request_id format are. It might require a UUID.
