Understanding Temporal internals

While going through this video, where Samar explains Cadence internals, I had a few questions:

Q1: if we’re running 5 history hosts, how does the FE figure out to which host to send the request? Does it send to all history hosts with the shard_id, and whichever host has it responds with ack?

Q2: correct me if my understanding is wrong:
If we have 5 Cadence shards and our hashing function is wf_id mod shard_count,
then workflow_id 42 will always be handled by shard_id 2 (42 % 5). Now the FE also has the same hash function and can determine the shard_id.
Due to this, if we change the Cadence shard count, that will mess up the results of the shard hashing function.

Q3: But if we restart the FE and History hosts with the updated Cadence shard count, would that not solve this problem?

Q4: Does the Cadence shard count also influence the PK/partitionId in the database? If so, then restarting the hosts may not be enough.

Q1: if we’re running 5 history hosts, how does the FE figure out to which host to send the request? Does it send to all history hosts with the shardId, and whichever host has it responds with ack?
There are two levels of sharding in Cadence and Temporal.

The first level hashes workflowId to a shardId, where the number of shards is fixed during the cluster lifetime.

The second level uses a consistent hash to map shardId to a specific history service host given the current cluster membership. Assuming a stable cluster state, every shard is assigned to one of the history hosts.

So in your example, the FE will first hash to a shardId and then find out which host owns the shard using consistent hashing. So the request is always sent to a specific history service host.
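
A minimal Go sketch of those two routing steps, purely to illustrate the idea; the hash function, shard count, and host names are made up, and the second step is simplified to a modulo pick instead of a real consistent-hash ring:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Level 1: workflowId -> shardId. The shard count is fixed for the
// lifetime of the cluster.
func shardID(workflowID string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(workflowID))
	return int(h.Sum32()) % numShards
}

// Level 2: shardId -> history host. In the real system this is a
// consistent hash over the current cluster membership; a plain modulo
// is used here only to keep the sketch short.
func ownerHost(shard int, hosts []string) string {
	return hosts[shard%len(hosts)]
}

func main() {
	hosts := []string{"history-1", "history-2", "history-3", "history-4", "history-5"}
	s := shardID("order-workflow-42", 16384)
	fmt.Printf("shard %d is owned by %s\n", s, ownerHost(s, hosts))
}
```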

Q2: correct me if my understanding is wrong:
If we have 5 Cadence shards and our hashing function is wf_id mod shard_count,
then workflow_id 42 will always be handled by shard_id 2 (42 % 5). Now the FE also has the same hash function and can determine the shard_id.
Due to this, if we change the Cadence shard count, that will mess up the results of the shard hashing function.
Q3: But if we restart the FE and History hosts with the updated Cadence shard count, would that not solve this problem?
Q4: Does the Cadence shard count also influence the PK/partitionId in the database? If so, then restarting the hosts may not be enough.

Each shard has a lot of resources in the DB associated with it. For example, the workflow’s so-called mutable state (which contains all the info about the current workflow state) has shardId as part of its primary key. Also, each shard maintains its own instance of a durable timer queue and the so-called transfer queue. When the number of shards is changed, the routing layer will look for a workflow in a different shard, while its state was never migrated from the original one. So changing the number of shards is not possible unless a very complicated data migration code (it has to migrate timer queue tasks as well) is implemented one day.
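
A toy Go example of why re-routing without migration breaks lookups, assuming the simple wf_id mod shard_count function from the question (the real hash and persistence schema differ):

```go
package main

import "fmt"

// Illustrative only: mutable state is keyed by shardId, so a row written
// under the old shard count is never found once routing changes.
type mutableStateKey struct {
	shardID    int
	workflowID string
}

func route(workflowIDNum, shardCount int) int { return workflowIDNum % shardCount }

func main() {
	store := map[mutableStateKey]string{}

	// Written while the cluster ran with 5 shards.
	oldShard := route(42, 5) // 42 % 5 = 2
	store[mutableStateKey{oldShard, "wf-42"}] = "mutable state"

	// After restarting with 7 shards the router looks in shard 0 instead.
	newShard := route(42, 7) // 42 % 7 = 0
	_, found := store[mutableStateKey{newShard, "wf-42"}]
	fmt.Printf("old shard=%d new shard=%d state found=%v\n", oldShard, newShard, found)
}
```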


Thanks for the answer @maxim, this helps clarify things. A few more questions:

Assuming a stable cluster state, every shard is assigned to one of the history hosts.

Q4: How is this guaranteed? Can there be intermittent failures when a host which owns shard 1 goes down? How is that handled? How long can this delay be? During this delay, new workflows on this shard can’t be created, I assume.

Q5: If a history host goes down, then who gets the shards on that host? Also, how are the shards balanced if we scale hosts up or down?

Q6: On shard ownership, how is it guaranteed that the rangeId will be unique for each host? If there’s a conflict there, it could lead to multiple writers for a workflow.

Q7: What happens if a shard writes the workflow data to the persistence layer and moves the task to the transfer queue, but dies before it can move the task to the Matcher’s task list? Are the Cadence workers meant to handle these scenarios?

Q8: In the video, Samar says that if a task is lost, it’ll cause a timeout and the workflows would need to handle those scenarios. What timeout field is being talked about here? Can we control it? What happens if that value is very high, say 10 years? Does the workflow stay stuck for 10 years?

Q9: Why is Cadence called a write-heavy system? Should it not be read-heavy due to fetching workflow state and history on async polls?

Q4: How is this guaranteed? Can there be intermittent failures when a host which owns shard 1 goes down? How is that handled? How long can this delay be? During this delay, new workflows on this shard can’t be created, I assume.

We use a gossip-based membership library, ringpop. It is responsible for detecting host failures and updating the cluster membership in a timely manner. The delay is usually within a second. The frontend host retries a request to another host if shard membership changed during the request, and usually the retry goes through. The SDK client code also retries requests if needed. We’ve seen use cases that had a pretty tight SLA on workflow starts, and we were able to ensure high availability during cluster deployments with a 3-second external client timeout.
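
As a rough illustration of that retry behavior, here is a hedged Go sketch of a client retrying a start request within an overall 3-second budget; the error value and startWorkflow function are hypothetical stand-ins, not the real SDK API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errShardOwnershipLost stands in for the "shard moved during the request"
// failure that gets retried; the name is illustrative.
var errShardOwnershipLost = errors.New("shard ownership lost")

// startWorkflow is a stand-in for the real RPC; it fails twice to simulate
// a shard moving during a deployment.
func startWorkflow(attempt int) error {
	if attempt < 2 {
		return errShardOwnershipLost
	}
	return nil
}

func main() {
	// Overall 3-second budget, as in the SLA example above.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	for attempt := 0; ; attempt++ {
		err := startWorkflow(attempt)
		if err == nil {
			fmt.Println("workflow started on attempt", attempt+1)
			return
		}
		if !errors.Is(err, errShardOwnershipLost) || ctx.Err() != nil {
			fmt.Println("giving up:", err)
			return
		}
		time.Sleep(50 * time.Millisecond) // brief backoff before retrying
	}
}
```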

Q5: If a history host goes down, then who gets the shards on that host? Also, how are the shards balanced if we scale hosts up or down?

Whatever host owns it based on the consistent hashing algorithm. So every time the cluster membership changes, the shards are rebalanced. Consistent hashing is used to ensure that the majority of the shards stay at their current hosts during resharding.
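
A toy consistent-hash ring in Go, only to show why most shards keep their owner when one host leaves; the real membership layer (ringpop) works differently, e.g. with many virtual points per host:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type ring struct {
	points []uint32
	owner  map[uint32]string
}

func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func newRing(hosts []string) *ring {
	r := &ring{owner: map[uint32]string{}}
	for _, h := range hosts {
		p := hash32(h)
		r.points = append(r.points, p)
		r.owner[p] = h
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// ownerOf returns the first host clockwise from the shard's hash.
func (r *ring) ownerOf(shard int) string {
	h := hash32(fmt.Sprintf("shard-%d", shard))
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	before := newRing([]string{"history-1", "history-2", "history-3", "history-4", "history-5"})
	after := newRing([]string{"history-1", "history-2", "history-3", "history-4"}) // history-5 went down

	moved := 0
	for shard := 0; shard < 512; shard++ {
		if before.ownerOf(shard) != after.ownerOf(shard) {
			moved++
		}
	}
	// Only shards previously owned by history-5 change owner.
	fmt.Printf("%d of 512 shards changed owner\n", moved)
}
```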

Q6: On shard ownership, how is it guaranteed that the rangeId will be unique for each host? If there’s a conflict there, it could lead to multiple writers for a workflow.

The shard hashing and the gossip-based cluster membership can lead to situations where more than one host believes that it owns the shard. To avoid inconsistencies, all updates to the history service database are protected by a conditional update on the so-called rangeId value. So Temporal uses the DB to ensure 100% consistency in these situations. This video contains some details about this mechanism.
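
A toy Go model of the rangeId fencing idea, assuming a hypothetical acquire/update API; the real schema and conditional-update mechanics are more involved:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Every write carries the rangeId the shard owner acquired when it took
// over the shard, and the store rejects writes with a stale rangeId.
type shardStore struct {
	mu           sync.Mutex
	rangeID      int64
	mutableState map[string]string
}

var errStaleRangeID = errors.New("conditional update failed: stale rangeId")

// acquire is called by a host when it believes it now owns the shard;
// bumping rangeId invalidates any writer still using the old value.
func (s *shardStore) acquire() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rangeID++
	return s.rangeID
}

func (s *shardStore) updateWorkflow(rangeID int64, workflowID, state string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rangeID != s.rangeID { // the conditional part of the update
		return errStaleRangeID
	}
	s.mutableState[workflowID] = state
	return nil
}

func main() {
	store := &shardStore{mutableState: map[string]string{}}

	oldOwner := store.acquire() // host A takes the shard
	newOwner := store.acquire() // host B takes it over after a membership change

	fmt.Println(store.updateWorkflow(newOwner, "wf-42", "running")) // succeeds
	fmt.Println(store.updateWorkflow(oldOwner, "wf-42", "stale"))   // rejected
}
```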

Q7: What happens if a shard writes the workflow data to the persistence layer and moves the task to the transfer queue, but dies before it can move the task to the Matcher’s task list? Are the Cadence workers meant to handle these scenarios?

The new host is going to pick up the shard and then process all unacknowledged tasks from the transfer queue. So the task is going to be redelivered to the matching engine. Note that this doesn’t mean that workers will receive a duplicate, as tasks are deduped before being sent to a worker.
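
A simplified Go sketch of replaying unacknowledged transfer tasks after a shard moves; the ack-level and dedup logic here are illustrative stand-ins, not the real implementation:

```go
package main

import "fmt"

// Tasks stay in the transfer queue until acknowledged, so a shard picked up
// by a new host replays everything past the ack level. Dedup is modeled as
// a simple "already dispatched" check; in the real system duplicates are
// filtered before a task reaches a worker.
type transferQueue struct {
	tasks      []int64 // task IDs, in order
	ackLevel   int64
	dispatched map[int64]bool
}

func (q *transferQueue) replayUnacked() []int64 {
	var redeliver []int64
	for _, id := range q.tasks {
		if id <= q.ackLevel || q.dispatched[id] {
			continue // acked or already delivered: skip the duplicate
		}
		q.dispatched[id] = true
		redeliver = append(redeliver, id)
	}
	return redeliver
}

func main() {
	q := &transferQueue{
		tasks:      []int64{101, 102, 103, 104},
		ackLevel:   101, // 101 was fully processed before the host died
		dispatched: map[int64]bool{102: true},
	}
	fmt.Println("redelivered to matching:", q.replayUnacked()) // [103 104]
}
```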

Q8: In the video, Samar says that if a task is lost, it’ll cause a timeout and the workflows would need to handle those scenarios. What timeout field is being talked about here? Can we control it? What happens if that value is very high, say 10 years? Does the workflow stay stuck for 10 years?

It depends on the task type. The relevant activity timeout is StartToClose. All the timeouts are configurable by the workflow code. If an activity StartToClose timeout is set to 10 years (and the workflow run timeout is set to the same or a larger value), the workflow will be waiting for the timeout for 10 years.
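
For concreteness, here is how a StartToClose timeout is set in workflow code using the Temporal Go SDK (the Cadence client is analogous); the activity, workflow, and timeout value are just examples:

```go
package sample

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// MyActivity is a placeholder activity used only for this example.
func MyActivity(name string) (string, error) { return "hello " + name, nil }

// MyWorkflow shows where the StartToClose timeout discussed above is set.
// If this were 10 years (and the workflow run timeout allowed it), a lost
// task would keep the workflow waiting until that timeout fires.
func MyWorkflow(ctx workflow.Context, name string) (string, error) {
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Minute, // max duration of a single activity attempt
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	var result string
	err := workflow.ExecuteActivity(ctx, MyActivity, name).Get(ctx, &result)
	return result, err
}
```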

Q9: Why is Cadence called a write-heavy system? Should it not be read-heavy due to fetching workflow state and history on async polls?

Because of caching. Almost everything is cached on worker and history service nodes. So fetching workflow states and full histories is only needed when state is lost, either due to process restarts or due to data being pushed out of the caches in an LRU manner.
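
A minimal LRU cache sketch in Go to illustrate that access pattern: a miss is where the history service would re-read state or history from the database, a hit avoids the read entirely (capacity and keys are made up):

```go
package main

import (
	"container/list"
	"fmt"
)

type lru struct {
	cap   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // workflowID -> element holding the key
}

func newLRU(capacity int) *lru {
	return &lru{cap: capacity, order: list.New(), items: map[string]*list.Element{}}
}

// get returns true on a cache hit; a miss is where a DB read would happen.
func (c *lru) get(workflowID string) bool {
	if el, ok := c.items[workflowID]; ok {
		c.order.MoveToFront(el)
		return true
	}
	c.put(workflowID) // simulate loading state from the DB, then caching it
	return false
}

func (c *lru) put(workflowID string) {
	if c.order.Len() >= c.cap {
		oldest := c.order.Back() // evict the least recently used entry
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(string))
	}
	c.items[workflowID] = c.order.PushFront(workflowID)
}

func main() {
	cache := newLRU(2)
	fmt.Println(cache.get("wf-1")) // false: first access reads from DB
	fmt.Println(cache.get("wf-1")) // true: served from cache, no DB read
	cache.get("wf-2")
	cache.get("wf-3")              // evicts wf-1 (least recently used)
	fmt.Println(cache.get("wf-1")) // false: was evicted, DB read again
}
```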


Q10: Since the Frontend acts as a proxy to the underlying services, what is the durability of the service? How is availability guaranteed?

I believe it is explained in this video. Do you have any specific questions about the design?