XDC Replication in Practice

I started a thread in Slack trying to find other users of XDC who exercise failovers regularly. While the feature is experimental, it’s crucial for us at Netflix to have a multi-region presence (especially for a Platform service like this), so I’m seeking out others who have been playing with it or who actively depend on it like we do.

This post covers what we know, what we’re doing, and what our experience with the feature has been so far. I’m eager to learn where we might improve, and also to provide whatever additional info might be helpful for future users! :slightly_smiling_face:

Background

We currently operate Temporal out of AWS us-west-2 and us-east-1. We’ve elected to name each cluster after its region to make things easier to reason about: the Temporal doc examples show clusters named “active” and “passive”, but this gets confusing once you fail over and the cluster named “active” is no longer active. It’s important to note that a cluster cannot be renamed.

We’ve not experienced many problems with XDC on a day-to-day basis. Replication task lag is normally around 170ms. My current understanding (based on intuition) is that this metric measures how long the replication worker takes to process a task, rather than the end-to-end lag of delivering the task to passive clusters.

Our cross-regional latencies are quite low, so we’ve not observed any substantial issues here, although we’ve not yet attempted replicating across oceans.

Internal Temporal SDK

We offer a Netflix Temporal SDK, which layers additional functionality for our customers atop the Java Temporal SDK. It includes integration with Spring Boot and a variety of Platform features and services, as well as a handful of conventions and higher-level abstractions that customers can use to be safer and more productive. One such abstraction is managed handling of Temporal in an XDC topology.

Following guidance from other posts in the community, an application creates a WorkerFactory for each cluster. Here’s a (partial) example of an application config (note: the URIs are scrubbed; we use FQDNs in reality):

temporal:
  enabled: true
  defaultNamespace: spinnaker
  clusters:
    uswest2:
      uiTarget: https://temporal-main-uswest2
      workerTarget: temporal-main-uswest2:8980
    useast1:
      uiTarget: https://temporal-main-useast1
      workerTarget: temporal-main-useast1:8980
  workers:
    cloud-operation-runners:
      awaitTerminationTimeout: PT5M
      enabled: true
    titus-cloud-operations:
      awaitTerminationTimeout: PT20M
      enabled: true
    aws-cloud-operations:
      awaitTerminationTimeout: PT20M
      enabled: true

For each cluster, we create a WorkflowServiceStubs and WorkflowClient. Then we iterate over the workers map, creating a WorkerFactory for each cluster. So, in the above example, we’d end up with 6 WorkerFactory objects in the Spring context. All WorkerFactory objects are polling by default, so if we as Temporal operators were to fail over Temporal itself, applications would continue to process work without skipping a beat.
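
Roughly, that wiring looks like the sketch below. This isn’t our actual SDK code (the ClusterConfig and WorkerConfig types and the map keys are invented for illustration), but the Temporal calls are the standard Java SDK ones:

import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowClientOptions
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.WorkerFactory

// Hypothetical config shapes mirroring the YAML above.
data class ClusterConfig(val name: String, val workerTarget: String)
data class WorkerConfig(val name: String)

fun buildWorkerFactories(
  clusters: List<ClusterConfig>,
  workers: List<WorkerConfig>,
  namespace: String
): Map<String, WorkerFactory> {
  val factories = mutableMapOf<String, WorkerFactory>()
  clusters.forEach { cluster ->
    // One WorkflowServiceStubs and WorkflowClient per cluster...
    val stubs = WorkflowServiceStubs.newInstance(
      WorkflowServiceStubsOptions.newBuilder()
        .setTarget(cluster.workerTarget)
        .build()
    )
    val client = WorkflowClient.newInstance(
      stubs,
      WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
    )
    // ...and a WorkerFactory per (worker, cluster) pair: 3 workers x 2 clusters = 6 factories.
    workers.forEach { worker ->
      factories["${worker.name}-${cluster.name}"] = WorkerFactory.newInstance(client)
    }
  }
  return factories
}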

We’re aiming for Temporal to be as hands-off operationally as possible for our customers. Our SDK comes with an XdcClusterSupport class which regularly polls Temporal for the active cluster of each namespace the application is connected to. Rather than having developers inject WorkflowServiceStubs or WorkflowClient directly and figure out which one they should use, we have Provider classes for each (as well as for WorkerFactory) which automatically select the correct object for the namespace’s active cluster. For example:

@Component
public class WorkerFactoryProvider(
  private val workerFactories: Map<String, WorkerFactory>,
  private val xdcClusterSupport: XdcClusterSupport
) {

  /**
   * Get the [WorkerFactory] by its [name] attached to the active cluster.
   */
  public fun get(name: String, active: Boolean = true): WorkerFactory {
    return workerFactories
      .filter { it.key.split("-").first() == name }
      .values
      .firstOrNull {
        val workflowClient = it.workflowClient
        if (workflowClient !is ClusterAware) {
          false
        } else {
          val activeCluster = xdcClusterSupport.getActiveCluster(workflowClient.options.namespace)
          (workflowClient.clusterName == activeCluster) == active
        }
      }
      ?: throw NetflixTemporalException("WorkerFactory for '$name' not found")
  }
}
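
Conceptually, XdcClusterSupport is a thin wrapper over DescribeNamespace. A minimal sketch (the real class polls on a schedule and caches the result per namespace; scheduling and error handling are omitted here):

import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest
import io.temporal.serviceclient.WorkflowServiceStubs

class XdcClusterSupport(private val stubs: WorkflowServiceStubs) {

  /**
   * Returns the name of the cluster that is currently active for [namespace],
   * as reported by the namespace's replication config.
   */
  fun getActiveCluster(namespace: String): String {
    val response = stubs.blockingStub().describeNamespace(
      DescribeNamespaceRequest.newBuilder()
        .setNamespace(namespace)
        .build()
    )
    return response.replicationConfig.activeClusterName
  }
}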

The XDC integration in the SDK also hooks into our dynamic config service, so we as Temporal operators can signal applications globally (or individually) to disable polling of a specific cluster if we need to take that cluster down for maintenance. This is especially important for us since we use SQL persistence: we can’t simply scale out our persistence to meet new load demands like we could with Cassandra, so we have to take cluster downtime.
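
The dynamic config hook boils down to toggling polling on the affected WorkerFactory objects. A simplified sketch; the ClusterPollingToggle class and its callback are hypothetical stand-ins for our internal dynamic config client:

import io.temporal.worker.WorkerFactory

/**
 * Hypothetical listener: when operators flip a per-cluster "polling disabled"
 * flag in dynamic config, suspend polling on that cluster's worker factories.
 */
class ClusterPollingToggle(
  private val workerFactories: Map<String, WorkerFactory> // keyed by "<worker>-<cluster>"
) {

  fun onConfigChange(clusterName: String, pollingDisabled: Boolean) {
    workerFactories
      .filterKeys { it.endsWith("-$clusterName") }
      .values
      .forEach { factory ->
        if (pollingDisabled) factory.suspendPolling() else factory.resumePolling()
      }
  }
}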

Existing Issues

This list isn’t exhaustive, but these are the issues we’ve identified so far in our failover exercises.

io.temporal.client.WorkflowExecutionAlreadyStarted

When using child workflows, parent workflows can fail with this exception during a failover. We hypothesize that it’s a race condition: parent and child workflows can be assigned to different shards, and therefore aren’t necessarily replicated at the same time.

In this case, the parent workflow starts a child workflow and the child workflow’s shard is replicated to the new cluster; we then perform a failover while the parent workflow’s event history is still lagging, so the parent attempts to start the child again.

We’ve only observed this problem once. I’m not really sure what the resolution would be for this. Can we catch the exception and then populate our own child workflow stub? :man_shrugging: It’s a difficult condition to reproduce and test; we’ll likely need to create a new loadtest scenario for Maru that exercises child workflows.

SDK metrics don’t include cluster names

Right now, it’s not possible to associate metrics (e.g. polling metrics) with a particular Temporal cluster, as metric tags for the cluster are not included. We’re not totally sure this is a material issue per se, but it would help us confirm whether applications have actually stopped polling the desired cluster. Today, we can infer that a cluster is no longer receiving polling traffic based on the task poll metrics dropping, so it’s workable.
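
One workaround we could apply ourselves, rather than waiting on the SDK, is tagging each cluster’s metrics scope before handing it to the service stubs. A sketch, assuming a com.uber.m3.tally Scope is already wired to our metrics backend (the temporal_cluster tag name is arbitrary):

import com.uber.m3.tally.Scope
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions

// Sketch: add a per-cluster tag so poll metrics can be told apart by cluster.
fun clusterTaggedStubs(rootScope: Scope, clusterName: String, target: String): WorkflowServiceStubs {
  val scope = rootScope.tagged(mapOf("temporal_cluster" to clusterName))
  return WorkflowServiceStubs.newInstance(
    WorkflowServiceStubsOptions.newBuilder()
      .setTarget(target)
      .setMetricsScope(scope)
      .build()
  )
}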

Future Improvements

We only support the Java SDK right now, but our “Paved Road” supports other languages that we’ll need feature parity with. In the future, I’d like to explore writing a global routing service that implements the Temporal gRPC API and handles routing to the correct active cluster. Temporal does offer the dcRedirectionPolicy configuration, which forwards a subset of API traffic from a passive cluster to the active one, but that behavior only works in the happy path, when the cluster is actually healthy and online.
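
For reference, that forwarding is controlled by the server’s dcRedirectionPolicy setting; a minimal sketch (exact keys and policy names may vary by server version):

dcRedirectionPolicy:
  policy: "selected-apis-forwarding"  # default is "noop" (no forwarding)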

@RobZienert, I’m currently working on establishing cross-region replication between two Temporal clusters (deployed to EKS using Helm), and I’m getting i/o timeout errors in the history service logs of one cluster when it tries to connect to the ALB ingress of the other cluster’s frontend service.

IP 10.33.43.222 represents the ALB ingress that routes traffic to the frontend pod (on port 7233)

transport: Error while dialing dial tcp 10.33.43.222:433: i/o timeout

I’ve verified that a pod (with the grpcurl cli tool installed) in one cluster can access the ALB ingress of the other cluster just fine (either by dns name or by IP address).

I’m not sure why the history pod would get an i/o timeout trying to connect to the frontend service ingress of the other cluster… unless I’m misunderstanding what the rpcAddress value of the clusterMetadata needs to point at or possibly an ALB ingress isn’t supported for XDC and I need to use an NLB ingress address for the rpcAddress value.

I’m curious how you have XDC replication across regions working.

Sure! First, we’re not using k8s, but deploying onto EC2 VMs directly. Here’s our clusterMetadata configuration for the useast1 cluster:

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: "uswest2"
  currentClusterName: "useast1"
  clusterInformation:
    uswest2:
      enabled: true
      initialFailoverVersion: 1
      rpcName: "frontend"
      rpcAddress: "temporal-main-uswest2:8980"
    useast1:
      enabled: true
      initialFailoverVersion: 2
      rpcName: "frontend"
      rpcAddress: "temporal-main-useast1:8980"

We’re using NLBs, but not out of requirement; ALBs should work fine. Unfortunately, when it comes to networking in AWS, I’m going to be pretty ineffective here, since the Netflix way of networking is quite unlike that of other AWS customers (so I hear). Is your frontend service in a public subnet and exposed to the internet (or do you have a tunnel to the other region)?

I’m using something like this:

    clusterMetadata:
      enableGlobalDomain: true
      failoverVersionIncrement: 10
      masterClusterName: "temporaltest-east1"
      currentClusterName: "temporaltest-east2"
      clusterInformation:
        temporaltest-east1:
          enabled: true
          initialFailoverVersion: 1
          rpcName: "temporaltest-frontend-east1"
          rpcAddress: "dns:///temporaltest-frontend-east1.mydomain.com:433"
        temporaltest-east2:
          enabled: true
          initialFailoverVersion: 2
          rpcName: "temporaltest-frontend-east2"
          rpcAddress: "temporaltest-frontend:7233"

The only differences I see are that you’re not using dns:/// for the other cluster and you’re not including an FQDN in the rpcAddress value for the other cluster. I’m not sure about the lack of dns:///, but I understand why an FQDN would be required in EKS due to the ClusterFirst DNS resolution logic in pods (i.e. pod networking uses kube-dns to search the *.cluster.local domain and its subdomains to resolve a hostname if an FQDN is not given).

All my ALBs are in private subnets with VPC Peering connecting them. I’ve verified that connectivity between regions works fine. I think I’ll try without the dns:/// to see if that changes anything.

I scrubbed our endpoints, but they’re FQDNs. I’ve updated the original post to set that context correctly. We don’t use dns:/// though, that’s true.

So, removing dns:/// didn’t do anything. Same i/o timeout issue.

I can invoke the ALB-based frontend grpc services in both clusters from custom-deployed pods in either cluster.

East 1 pod → East 2 frontend

bash-5.1$ grpcurl -d '{"service":"temporal.api.workflowservice.v1.WorkflowService"}' --insecure temporaltest-frontend-east2.mydomain.com:443 grpc.health.v1.Health/Check
{
  "status": "SERVING"
}

East 2 pod → East 1 frontend

bash-5.1$ grpcurl -d '{"service":"temporal.api.workflowservice.v1.WorkflowService"}' --insecure temporaltest-frontend-east1.mydomain.com:443 grpc.health.v1.Health/Check
{
  "status": "SERVING"
}

I’m not sure why the history pod would not work exactly like my custom-deployed grpcurl pod, networking-wise.

I guess I could be having an issue with TLS due to using self-signed certs. I get past that in my grpcurl CLI tests by adding --insecure. I’ve added the self-signed cert and key as the certFile/keyFile/clientCaFiles values in the tls section, and set the tls.internode.client.disableHostVerification and tls.frontend.client.disableHostVerification values to true, which I’m assuming will configure the history replication process to skip hostname verification when making connections to other clusters.

Just wanted to chime in that we have a very similar setup at DigitalOcean for some of our nascent Temporal use cases (I think we had previously hopped on a call to do some knowledge sharing a while back when we had just started).

While our actual deployment topology and language stack (we use the Go SDK) are different, we take a very similar approach in providing an internal Temporal package that is XDC-aware. Prior to this we explored whether we could rely on the DC request redirection features to achieve this transparently, but we found that it introduced a number of edge cases around cluster state/health and replication status/delay/consistency. Your idea of a routing service is something we’ve also thought about but haven’t looked into further, since the XDC-aware client currently serves our limited needs; as adoption and use cases increase, though, I can certainly see how it could be very valuable.

One question I had for you is about Temporal version upgrades and how you manage them operationally. We’ve tried a few different approaches, such as:

  • upgrading the clusters in place
  • upgrading the non-active cluster, failing over if it looks healthy, and then upgrading the previously active cluster

But our workloads are limited enough at the moment that in-place upgrades are not really disruptive. I’m curious what your strategy has been and what you’ve learned from exercising it.

Yes! Our call was very helpful in forming our XDC strategy!

Our upgrade process is somewhat straightforward (I think?). We use a deploy strategy called Rolling Red Black. Here’s a screenshot of our main environment delivery pipeline (in this case we skipped canaries, since this was part of the 1.11.x upgrade, which had us redeploying a bunch for the advanced visibility migration).

In our configuration, we roll out each cluster[1] concurrently, 10% at a time: we add 10% capacity, wait for UP status in Eureka (service discovery), then disable 10% of the old cluster, and rinse and repeat until 100% of traffic is on the new version. Once we’ve deployed to us-west-2, we deploy to us-east-1 following the same procedure. Since our Eureka registration process is a wee bit slow (by my own design), we don’t have a wait in between rollout steps, even though such a wait was recommended to us.

[1]: “cluster” in the Spinnaker taxonomy, e.g. temporal-main-frontend, temporal-main-history, temporal-main-matching.

We’ve not observed any issues with this deploy process from a metrics perspective, although we’ve largely stopped paying attention to logs; there’s a lot of error noise in there that doesn’t really indicate any problem. Service metrics are unaffected during a rollout.

While Temporal supports failing over individual namespaces to different clusters, we don’t currently do this since we’re on SQL: all namespaces are active in a single cluster, while the other is in standby and does basically nothing. So, for schema upgrades, we upgrade and deploy the standby cluster first, then fail all traffic over to the standby and upgrade the originally active cluster. Once that deploy has finished, we fail back over to the originally active cluster.
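
Since all namespaces are active in a single cluster, the failover itself is just a namespace update applied per namespace. Roughly, with tctl (exact flags depend on your tctl version):

tctl --namespace spinnaker namespace update --active_cluster useast1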

Our test environment is only doing ~30k workflows per day, so in-place schema migrations have been fine there. I haven’t tried in-place migrations for staging (~50k/day) or main (~500k/day) because I’m wary of perf issues from blocking DDL changes (e.g. DEFAULT values on newly added columns, which I’ve seen frequently). I’m not a database expert by any means, so I may be unnecessarily cautious, but that’s an expense I’m willing to pay for peace of mind.

ed: Fixed some metric numbers from an incorrect time range.

@RobZienert (et al), Thanks for the information about the upgrade strategies.

One more crazy thing to note regarding my issues with XDC replication.

Not sure why it never occurred to me before, but I finally figured out that I can use the TEMPORAL_CLI_ADDRESS env var to verify gRPC connectivity between clusters from any Temporal pod, since they all have the tctl CLI. The results I got are head-scratching.

From the history pod in east-1 (it could have been any Temporal pod):

bash-5.0# export TEMPORAL_CLI_ADDRESS=temporaltest-frontend-east2.mydomain.com:443
bash-5.0# tctl cl h
temporal.api.workflowservice.v1.WorkflowService: SERVING
bash-5.0# 

And I have verified that tctl is, in fact, connecting to the other cluster by running tctl admin cluster describe and confirming that I’m seeing the IPs of the east-2 cluster pods; I’m just not posting those results here (I’m lazy and don’t want to scrub any more :slight_smile: )

And for good measure, I’ve checked the ALB access logs and can see the tctl commands are actually connecting to the east-2 ALB from the east-1 pod. However, interestingly enough I don’t see any ALB access logs for the history service attempting to connect to the east-2 ALB, which I find strange.

Finally, for thoroughness, I tried configuring the TEMPORAL_CLI_ADDRESS env var as one of the ALB IPs and got the same results as above.

I’m seriously scratching my head over why tctl admin cluster describe can connect to both clusters but the history service gets an i/o timeout.

I’ve still not found a valid reason why my XDC connections are “timing out”. There appear to be 4 concurrent timeouts, with each set of 4 recurring every second.

I found this PR, which seems to indicate that ringpop traffic was not using TLS prior to it. I’m wondering if someone knows whether XDC replication is impacted by this change (i.e. was XDC replication using ringpop communication channels and thus not using mTLS, but with this PR it should now be using the internode mTLS configuration?)

Your networking issues might be better addressed via Slack? Debugging network issues is always a pain, since networking is often so uniquely implemented from company to company. Interesting callout about internode mTLS. That fix shouldn’t affect XDC, though, since XDC replicates through the frontend service (as opposed to, say, a worker calling the history service directly).

Unfortunately my organization does not use slack. I wish it did. We’re an MS Teams shop :slightly_frowning_face:

Can anyone comment on why the tctl commands would work but the temporal-server executable (that the docker image is running) would not be able to connect?

The other thing I can think of that could be impacting things is that I do have the TEMPORAL_CLI_TLS_* env vars defined in the container to control which certs to use and what validation to perform for the tctl command line tool.

From what I can see in the Temporal code, the tctl binary uses these env vars but the temporal-server binary does not appear to… which makes me wonder if mTLS is actually my root cause. However, I’m not sure the env vars are the issue, as I get a different error when I don’t set them:

Error Details: rpc error: code = Unavailable desc = connection error: desc = "transport: authentication handshake failed: x509: certificate signed by unknown authority"
('export TEMPORAL_CLI_SHOW_STACKS=1' to see stack traces)

So, I was finally able to get replication between clusters working, but I had to switch over to an NLB to do it. I still have the ALB, which works fine for SDK clients, and I plan to keep using it for that purpose.

Unfortunately I couldn’t get the direct-to-IP NLB type (aws-load-balancer-type: "nlb-ip") to work and had to revert to the NodePort-based NLB type (aws-load-balancer-type: "nlb"). It’s one extra network hop, but at least it works. Something about NLB direct-to-pod communication is being blocked in my environment, and I’m not sure why.

@RobZienert, did you have to create the namespace on each cluster or does replication handle creating the namespace on each replication target?

It’s been a while since I’ve had to do this, but I believe namespace creation is replicated for you, just like namespace changes are. @Wenquan_Xing would be able to confirm.

So, the namespace is getting created on the other cluster now, but when I fail the namespace over to the other cluster and start my app connecting to the new active cluster with the failed-over namespace, I can’t even start a new workflow; I get a connection timeout. If I connect to the same cluster but use a namespace that’s local to that cluster, it works fine.

io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.999788326s.

Any ideas?

Also, I’ve found that I can’t terminate a workflow that’s been synchronized over to the failover cluster from that cluster’s web interface even though I’ve marked the failover cluster as active. I get the same context deadline exceeded error.

However, I can terminate workflows from either cluster’s web interface if I haven’t failed over the namespace.

When you say the namespace was created on the other cluster, that was done via Temporal, correct? You shouldn’t need to create the namespace on each cluster, otherwise replication will get messed up. It kinda sounds like your ack levels for the namespaces are buggered, but I don’t recall the resolution to this.

Assuming connectivity between the Temporal clusters is working, you should be able to start and terminate from either cluster regardless of which cluster is active for the namespace (assuming you have DC replication enabled - which you absolutely should have).

Namespaces in the non-active cluster are created via the XDC process built into Temporal, yes.

I can definitely stop and start workflows from either cluster so long as I haven’t failed the namespace over. However, as soon as I fail the namespace over to the other cluster, I can no longer stop or start any workflows; those calls time out.

I’m not sure what you mean by ack levels.

One thing I’m questioning is whether I have the right RPC address values for both clusters. Right now I’m using a cluster-local service name (no FQDN) for the local frontend and an FQDN for the RPC address for the other cluster. I’m wondering if I should be using the FQDN RPC address for both in both clusters.

Also, I’ve played around with changing the masterClusterName to see how that changes things. I’ve tested pointing both clusters at a single master cluster, and I’ve also tested having each cluster be its own master, but that change only really seems to impact whether I can directly create new namespaces in the cluster. Replication seems to behave the same regardless of whether there’s only one master or each cluster is its own master. However, I’m not 100% positive, as I’ve really not been able to fail over in either scenario.

Here’s what I have for the cluster config for each cluster right now:

Temporal Config In East 1

enableGlobalNamespace: true
failoverVersionIncrement: 10
currentClusterName: temporaltest-east1
masterClusterName: temporaltest-east1
clusterInformation:
  temporaltest-east1:
    enabled: true
    initialFailoverVersion: 1
    rpcAddress: temporaltest-frontend:443
    rpcName: temporaltest-frontend-east1
  temporaltest-east2:
    enabled: true
    initialFailoverVersion: 2
    rpcAddress: dns:///temporaltest-frontend-east2.mydomain.com:443
    rpcName: temporaltest-frontend-east2

Temporal Config In East 2

enableGlobalNamespace: true
failoverVersionIncrement: 10
currentClusterName: temporaltest-east2
masterClusterName: temporaltest-east2
clusterInformation:
  temporaltest-east2:
    enabled: true
    initialFailoverVersion: 1
    rpcAddress: temporaltest-frontend:443
    rpcName: temporaltest-frontend-east2
  temporaltest-east1:
    enabled: true
    initialFailoverVersion: 2
    rpcAddress: dns:///temporaltest-frontend-east1.mydomain.com:443
    rpcName: temporaltest-frontend-east1

I think I found my problem: the initialFailoverVersion value for each cluster was not the same across clusters. I was actually reversing the numbers in the config yaml from one cluster to the other.
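
In other words, a given cluster must carry the same initialFailoverVersion in every cluster’s config. Matching the East 1 config above, the East 2 clusterInformation block should look like this (only the version numbers change):

clusterInformation:
  temporaltest-east2:
    enabled: true
    initialFailoverVersion: 2   # same value east2 has in the East 1 config
    rpcAddress: temporaltest-frontend:443
    rpcName: temporaltest-frontend-east2
  temporaltest-east1:
    enabled: true
    initialFailoverVersion: 1   # same value east1 has in the East 1 config
    rpcAddress: dns:///temporaltest-frontend-east1.mydomain.com:443
    rpcName: temporaltest-frontend-east1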