Temporal Workflows and Kafka Topics
Temporal supports millions of concurrent workflows, and it’s common to have a workflow for every business entity (customer, transaction, etc.). The number of topics Kafka supports has been growing over time, but we still wouldn’t create a separate Kafka topic for each customer or each transaction.
Meanwhile, a Kafka topic with many partitions on a tuned server might support a million messages per second, while a single Temporal workflow instance may only be able to handle a few signals per second.
Thus, going from Kafka to Temporal, we can expect to often be “fanning out” from a Kafka topic to many workflow instances (likely deriving a workflow id from something in each message), while going from Temporal to Kafka we’ll probably be “fanning in” from many workflows to a smaller number of Kafka topics.
Temporal to Kafka
We’d like to have a Temporal workflow deliver a message to a Kafka topic with exactly-once semantics.
Kafka Connect supports source connectors with exactly-once semantics (Apache Kafka documentation):
In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
This isn’t really a great fit for Temporal. A Temporal workflow delivers a message by executing an activity, and the way to achieve exactly-once semantics on the Temporal side is to make activities idempotent. If we wanted the Kafka side to be able to back up and re-consume previously sent messages, we’d need to interpose a database to hold them.
OK, how about deduplicating messages on the Kafka side? After all, a standard way of making activities idempotent in Temporal is to use an idempotency key.
Confluent has an example of deduplicating messages: How to find distinct values in a stream of events using Kafka Streams; we could use our idempotency key as the “event id” that is used in the example.
I was perplexed by the DeduplicationTransformer constructor setting the “left” and “right” durations to half of the desired deduplication interval:
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
    if (maintainDurationPerEventInMs < 1) {
        throw new IllegalArgumentException("maintain duration per event must be >= 1");
    }
    leftDurationMs = maintainDurationPerEventInMs / 2;
    rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
    ...
which is used to look for a matching message within the deduplication interval:
eventIdStore.fetch(
    eventId,
    eventTime - leftDurationMs,
    eventTime + rightDurationMs);
...
Wouldn’t this only look back for duplicate messages over half the desired interval? In my testing it seems so. I expect this is simply an oversight.
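To make this concrete, here’s a toy model of the windowed lookup in plain Java (no Kafka; the class and method names are mine, standing in for the example’s WindowStore fetch). With the left duration set to half the interval, a duplicate arriving three-quarters of the way through the interval is missed, while looking back the full interval catches it:

```java
import java.util.ArrayList;
import java.util.List;

class DedupWindowDemo {
    record Seen(String id, long ts) {}

    private final List<Seen> store = new ArrayList<>();

    // Record the event and report whether a matching id was already seen in
    // [eventTime - leftMs, eventTime + rightMs] -- a stand-in for the window
    // store fetch in the Confluent example.
    boolean isDuplicate(String id, long eventTime, long leftMs, long rightMs) {
        boolean dup = store.stream().anyMatch(s ->
            s.id().equals(id) && s.ts() >= eventTime - leftMs && s.ts() <= eventTime + rightMs);
        store.add(new Seen(id, eventTime));
        return dup;
    }

    public static void main(String[] args) {
        long interval = 1000; // desired deduplication interval, in ms

        // As in the Confluent example: look back only interval / 2.
        DedupWindowDemo half = new DedupWindowDemo();
        half.isDuplicate("a", 0, interval / 2, interval - interval / 2);
        // A duplicate 750 ms later is inside the interval, but the fetch range
        // [250, 1250] excludes the first event at t = 0:
        System.out.println(half.isDuplicate("a", 750, interval / 2, interval - interval / 2)); // false

        // Looking back the full interval catches it:
        DedupWindowDemo full = new DedupWindowDemo();
        full.isDuplicate("a", 0, interval, 0);
        System.out.println(full.isDuplicate("a", 750, interval, 0)); // true
    }
}
```

A simple fix along these lines would be to use the full interval as the look-back (left) duration, keeping the window store’s retention at the full interval.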
With the deduplication interval fixed, this approach looks pretty good. Using a window store with a retention period set to our desired deduplication interval means that Kafka will automatically delete old idempotency keys; we don’t need to worry about cleanup to keep the store from growing without bound.
On the Temporal side, we wouldn’t want to retry the activity delivering the message to Kafka for longer than our deduplication interval. A Schedule-To-Close Timeout looks like it’d be the right thing here.
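As a sketch (Temporal Java SDK; the one-hour value is my arbitrary placeholder, not from the source), the activity options might look like:

```java
import io.temporal.activity.ActivityOptions;
import java.time.Duration;

// Cap the total time the Kafka-publish activity may take, including retries,
// at the deduplication interval, so we never retry a delivery that the Kafka
// side could no longer deduplicate. The one-hour figure is an assumption.
ActivityOptions options = ActivityOptions.newBuilder()
    .setScheduleToCloseTimeout(Duration.ofHours(1)) // match the dedup window retention
    .build();
```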
Now we still have a failure case: A) the activity successfully delivers the message to Kafka; B) the activity worker crashes before it notifies the Temporal service that the activity completed; C) no retry is possible because we’re past our Schedule-To-Close timeout, so the activity fails for good. Here the message has been delivered to Kafka but the workflow doesn’t know it. I think there’s probably not much we can do about this… the assumption is that the deduplication interval should be long enough to cover temporary glitches and downtime.
Kafka to Temporal
Looking at Kafka connectors again, a Kafka sink connector can store the Kafka offset in the destination system along with the messages. Thus, if we were delivering messages to a database, for example, we could use a database transaction to ignore messages for offsets we had already seen. Which, again, doesn’t seem to be a great fit for Temporal. If computers were infinitely fast we could have a single ingester workflow consume all messages, deduplicate, and deliver to the business workflows… but that’s not practical in the real world.
Another consideration is delivering messages in order. With multi-region namespaces it’s possible, if probably extremely rare, for a total region failure to allow signals to be delivered out of order… which might be so rare as to not be worth worrying about… but, as long as we’re working on a design anyway, “always” is a nicer guarantee than “almost always”.
More prosaically, the Kafka consumer / Temporal client worker could simply send a signal immediately for each message it processes (which would definitely allow signals to be delivered out of order)… or carefully deliver signals for a particular workflow one at a time. But given that the receiving workflow will need to deduplicate anyway, perhaps it could also order messages.
So we have a Kafka topic, and each message is destined for some particular workflow. Either the message contains the workflow id, or the workflow id can otherwise be derived from the message.
Suppose we have a Kafka Streams application that, per workflow id, adds an incrementing sequence number to each message destined for that workflow id. That is, the first message delivered to a particular workflow id would have a sequence number of 0, the second a sequence number of 1, and so on.
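A toy stand-in for that sequencing step (plain Java with a HashMap; a real implementation would be a Kafka Streams processor keeping the counter in a state store keyed by workflow id, and all names here are mine):

```java
import java.util.HashMap;
import java.util.Map;

class Sequencer {
    // Per-workflow-id counter: the next sequence number to hand out.
    private final Map<String, Long> next = new HashMap<>();

    // Assign the sequence number for the next message to this workflow id.
    long assign(String workflowId) {
        long seq = next.getOrDefault(workflowId, 0L);
        next.put(workflowId, seq + 1);
        return seq;
    }

    public static void main(String[] args) {
        Sequencer s = new Sequencer();
        System.out.println(s.assign("wf-1")); // 0
        System.out.println(s.assign("wf-1")); // 1
        System.out.println(s.assign("wf-2")); // 0, counters are independent per workflow id
    }
}
```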
Now it’s easy for the workflow to deduplicate and order messages. Keep a high-water mark: the largest consecutive sequence number received so far. Any new message whose sequence number isn’t higher is a duplicate and can be discarded. A message with a higher but non-consecutive sequence number has been delivered out of order, and the workflow can hold on to it until the intervening messages have been received.
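Sketched in Java (class and method names are mine), the receiving logic might look like this; `nextExpected` plays the role of the high-water mark, being one past the largest consecutive sequence number delivered so far:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class OrderedReceiver {
    private long nextExpected = 0;
    private final TreeMap<Long, String> buffered = new TreeMap<>();
    final List<String> delivered = new ArrayList<>();

    void receive(long seq, String payload) {
        if (seq < nextExpected) {
            return; // duplicate: already delivered
        }
        buffered.putIfAbsent(seq, payload); // hold out-of-order messages
        // Deliver every consecutive message starting at the one we expect.
        while (buffered.containsKey(nextExpected)) {
            delivered.add(buffered.remove(nextExpected));
            nextExpected++;
        }
    }

    public static void main(String[] args) {
        OrderedReceiver r = new OrderedReceiver();
        r.receive(0, "a");
        r.receive(2, "c"); // out of order: buffered
        r.receive(0, "a"); // duplicate: discarded
        r.receive(1, "b"); // fills the gap, which also releases "c"
        System.out.println(r.delivered); // [a, b, c]
    }
}
```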
To keep the Kafka Streams application’s store from growing without bound, we’d like a workflow that’s shutting down to tell the Kafka Streams application that it’s finished, so the app can clear that workflow id from its store.
This message deduplication and ordering logic would be the same for any workflow that wanted to receive Kafka messages. It would complicate the business logic of the workflow, and would need to be written for each SDK language the business workflows were implemented in.
Perhaps it would be convenient to have a utility workflow dedicated to handling this message logic. The business workflow could start the utility workflow as a child workflow, with a Parent Close Policy set to request cancellation. The utility workflow would receive the Kafka messages and signal the business workflow with the messages deduplicated and ordered.
When the business workflow closes, the utility workflow receives the cancellation request; it can then notify the Kafka Streams application and shut itself down.