XDC Replication in Practice

I started a thread in Slack trying to find other users of XDC who exercise failovers regularly. While the feature is experimental, it’s crucial for us at Netflix to have a multi-region presence (especially for a Platform service like this), so I’m seeking out others who have been playing with it or who actively depend on it like we do.

This is a post about what we know, what we’re doing, and what our experience with the feature has been so far. I’m eager to learn where we might improve, and also to provide whatever additional info might be helpful for future users! :slightly_smiling_face:

Background

We currently operate Temporal out of AWS us-west-2 and us-east-1. We’ve elected to name each cluster after its region to make things easier to reason about: the Temporal doc examples show clusters named “active” and “passive”, but that gets confusing once you fail over and the cluster named “active” is no longer active. It’s important to note that a cluster cannot be renamed.

We’ve not experienced many problems with XDC on a day-to-day basis. Replication task lag is normally around 170ms. My current understanding (based on intuition) is that this metric measures how long the replication worker takes to actually process a task, rather than the end-to-end replication lag to deliver the task to passive clusters.

Our cross-regional latencies are quite low, so we’ve not observed any substantial issues here, although we’ve not yet attempted replicating across oceans.

Internal Temporal SDK

We offer a Netflix Temporal SDK, which layers additional functionality for our customers atop the Java Temporal SDK. It includes integration with Spring Boot and a variety of Platform features and services, along with a handful of conventions and higher-level abstractions that customers can use to be safer and more productive. One such abstraction is managed handling of Temporal in an XDC topology.

Following guidance from other posts in the community, an application creates a WorkerFactory for each cluster. Here’s a (partial) example of an application config (note: the URIs are scrubbed; we use FQDNs in reality):

temporal:
  enabled: true
  defaultNamespace: spinnaker
  clusters:
    uswest2:
      uiTarget: https://temporal-main-uswest2
      workerTarget: temporal-main-uswest2:8980
    useast1:
      uiTarget: https://temporal-main-useast1
      workerTarget: temporal-main-useast1:8980
  workers:
    cloud-operation-runners:
      awaitTerminationTimeout: PT5M
      enabled: true
    titus-cloud-operations:
      awaitTerminationTimeout: PT20M
      enabled: true
    aws-cloud-operations:
      awaitTerminationTimeout: PT20M
      enabled: true

For each cluster, we create a WorkflowServiceStubs and WorkflowClient. Then we iterate over the workers map, creating a WorkerFactory for each worker in each cluster. So, in the above example, we’d end up with 6 WorkerFactory objects in the Spring context. All WorkerFactory objects poll by default, so if we as Temporal operators were to fail over Temporal itself, applications would continue to process work without skipping a beat.
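
To make that wiring concrete, here’s a minimal sketch of what the registration loop could look like. It is not our actual SDK code: the ClusterProperties/WorkerProperties classes and the map key convention are hypothetical stand-ins, though the Temporal SDK calls themselves (WorkflowServiceStubs, WorkflowClient, WorkerFactory) are real.

import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowClientOptions
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.WorkerFactory

// Hypothetical config shapes mirroring the YAML above.
data class ClusterProperties(val workerTarget: String)
data class WorkerProperties(val enabled: Boolean)

fun buildWorkerFactories(
  namespace: String,
  clusters: Map<String, ClusterProperties>, // e.g. "uswest2", "useast1"
  workers: Map<String, WorkerProperties>    // e.g. "cloud-operation-runners"
): Map<String, WorkerFactory> {
  val factories = mutableMapOf<String, WorkerFactory>()
  clusters.forEach { (clusterName, cluster) ->
    // One service stub and one client per cluster.
    val stubs = WorkflowServiceStubs.newInstance(
      WorkflowServiceStubsOptions.newBuilder()
        .setTarget(cluster.workerTarget)
        .build()
    )
    val client = WorkflowClient.newInstance(
      stubs,
      WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
    )
    // One WorkerFactory per enabled worker per cluster: 3 workers x 2 clusters = 6.
    // The key convention here is illustrative only.
    workers.filterValues { it.enabled }.forEach { (workerName, _) ->
      factories["$workerName-$clusterName"] = WorkerFactory.newInstance(client)
    }
  }
  return factories
}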

We’re aiming for Temporal to be as hands-off operationally as possible for our customers. Our SDK comes with an XdcClusterSupport class which regularly polls Temporal for the active cluster of the namespaces the application is connected to. Rather than developers injecting WorkflowServiceStubs or WorkflowClient directly and figuring out which one they should use, we have Provider classes for each (as well as WorkerFactory), which will automatically select the correct object for the namespace’s active cluster. For example:

import io.temporal.worker.WorkerFactory
import org.springframework.stereotype.Component

@Component
public class WorkerFactoryProvider(
  private val workerFactories: Map<String, WorkerFactory>,
  private val xdcClusterSupport: XdcClusterSupport
) {

  /**
   * Get the [WorkerFactory] by its [name] attached to the active cluster
   * (or to a non-active cluster, when [active] is false).
   */
  public fun get(name: String, active: Boolean = true): WorkerFactory {
    return workerFactories
      // Factory map keys lead with the factory name (delimited by '-').
      .filter { it.key.split("-").first() == name }
      .values
      .firstOrNull {
        val workflowClient = it.workflowClient
        if (workflowClient !is ClusterAware) {
          false
        } else {
          // Compare this client's cluster against the namespace's currently active cluster.
          val activeCluster = xdcClusterSupport.getActiveCluster(workflowClient.options.namespace)
          (workflowClient.clusterName == activeCluster) == active
        }
      }
      ?: throw NetflixTemporalException("WorkerFactory for '$name' not found")
  }
}

The XDC integration in the SDK also hooks into our dynamic config service, so we as Temporal operators can signal applications globally (or individually) to disable polling of a specific cluster if we need to take that cluster down for maintenance. This is especially important for us since we use SQL persistence: we can’t simply scale out our persistence to meet new load demands like we could with Cassandra, so we have to take cluster downtime.
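
As a rough illustration of that control point, here’s a minimal sketch assuming a hypothetical DynamicConfig lookup (our real dynamic config integration looks different); suspendPolling and resumePolling are real WorkerFactory methods in the Java SDK:

import io.temporal.worker.WorkerFactory

// Hypothetical dynamic-config lookup; stands in for our real dynamic config service.
interface DynamicConfig {
  fun isPollingDisabled(clusterName: String): Boolean
}

// Periodically reconcile the desired polling state for each cluster's factories.
class PollingGate(
  private val dynamicConfig: DynamicConfig,
  private val factoriesByCluster: Map<String, List<WorkerFactory>>
) {
  fun reconcile() {
    factoriesByCluster.forEach { (clusterName, factories) ->
      val disabled = dynamicConfig.isPollingDisabled(clusterName)
      factories.forEach { factory ->
        // suspendPolling/resumePolling stop and restart task polling without
        // tearing down the workers themselves.
        if (disabled) factory.suspendPolling() else factory.resumePolling()
      }
    }
  }
}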

Existing Issues

This isn’t an exhaustive list, but these are the issues we’ve identified so far in our failover exercises.

io.temporal.client.WorkflowExecutionAlreadyStarted

When using child workflows, parent workflows can fail with this exception during a failover. We hypothesize that it’s a race condition: parent and child workflows can be assigned to different shards, and therefore aren’t necessarily replicated at the same time.

In that case, it’s possible that the parent workflow starts a child workflow and the child workflow’s shard is replicated to the new cluster; we then perform a failover, and the parent workflow’s event history is lagging behind, so it attempts to start the child again.

We’ve only observed this problem once. I’m not really sure what the resolution would be. Can we catch the exception and then populate our own child workflow stub? :man_shrugging: It’s a difficult condition to reproduce and test; we’ll likely need to create a new load test scenario for Maru that exercises child workflows.

SDK metrics don’t include cluster names

Right now, it’s not possible to associate metrics (e.g. polling metrics) with a particular Temporal cluster, as metric tags for the cluster are not included. We’re not totally sure this is a material issue per se, but it would help us confirm whether applications have actually stopped polling the desired cluster. Today, we can infer that a cluster is no longer receiving polling traffic based on the task poll metrics dropping, so that’s fine for now.
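
In the meantime, one workaround we could imagine (not something we’ve shipped) is tagging the metrics Scope we hand to each cluster’s service stubs ourselves; the temporal_cluster tag name here is our own invention, not an SDK convention:

import com.uber.m3.tally.Scope
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions

// Wrap the application's existing tally Scope with a per-cluster tag before handing it
// to the service stubs, so every SDK metric emitted for this cluster carries its name.
fun clusterTaggedStubs(baseScope: Scope, clusterName: String, target: String): WorkflowServiceStubs {
  val taggedScope = baseScope.tagged(mapOf("temporal_cluster" to clusterName))
  return WorkflowServiceStubs.newInstance(
    WorkflowServiceStubsOptions.newBuilder()
      .setTarget(target)
      .setMetricsScope(taggedScope)
      .build()
  )
}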

Future Improvements

We only support the Java SDK right now, but our “Paved Road” supports other languages, which we’ll need feature parity with. In the future, I’d like to explore writing a global routing service that implements the Temporal gRPC API and handles routing to the correct active cluster. Temporal does offer the dcRedirectionPolicy configuration to forward a subset of API traffic from a passive cluster to the active one, but that behavior only works in the happy path when the cluster is actually healthy and online.
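
For reference, enabling that redirection is a small block of server config, roughly like the following (verify the exact shape against your server version):

dcRedirectionPolicy:
  policy: "selected-apis-forwarding"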

@RobZienert, I’m currently working on establishing cross-region replication between two temporal clusters (deployed to EKS using helm) and I’m getting i/o timeout issues in the history logs of one cluster trying to connect to the ALB ingress of the other cluster’s frontend service.

IP 10.33.43.222 represents the ALB ingress that routes traffic to the frontend pod (on port 7233)

transport: Error while dialing dial tcp 10.33.43.222:433: i/o timeout

I’ve verified that a pod (with the grpcurl cli tool installed) in one cluster can access the ALB ingress of the other cluster just fine (either by dns name or by IP address).

I’m not sure why the history pod would get an i/o timeout trying to connect to the frontend service ingress of the other cluster… unless I’m misunderstanding what the rpcAddress value in clusterMetadata needs to point at, or an ALB ingress isn’t supported for XDC and I need to use an NLB ingress address for the rpcAddress value instead.

I’m curious how you have XDC replication across regions working.

Sure! First, we’re not using k8s, but deploying onto EC2 VMs directly. Here’s our clusterMetadata configuration for the useast1 cluster:

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: "uswest2"
  currentClusterName: "useast1"
  clusterInformation:
    uswest2:
      enabled: true
      initialFailoverVersion: 1
      rpcName: "frontend"
      rpcAddress: "temporal-main-uswest2:8980"
    useast1:
      enabled: true
      initialFailoverVersion: 2
      rpcName: "frontend"
      rpcAddress: "temporal-main-useast1:8980"

We’re using NLBs, but not out of requirement; ALBs should work fine. Unfortunately, when it comes to networking in AWS, I’m going to be pretty ineffective at helping, since the Netflix way of networking is quite unlike other AWS customers’ (so I hear). Is your frontend service in a public subnet and exposed to the internet (or does it have a tunnel to the other region)?

I’m using something like this:

    clusterMetadata:
      enableGlobalDomain: true
      failoverVersionIncrement: 10
      masterClusterName: "temporaltest-east1"
      currentClusterName: "temporaltest-east2"
      clusterInformation:
        temporaltest-east1:
          enabled: true
          initialFailoverVersion: 1
          rpcName: "temporaltest-frontend-east1"
          rpcAddress: "dns:///temporaltest-frontend-east1.mydomain.com:433"
        temporaltest-east2:
          enabled: true
          initialFailoverVersion: 2
          rpcName: "temporaltest-frontend-east2"
          rpcAddress: "temporaltest-frontend:7233"

The only differences I see are that you’re not using dns:/// for the other cluster and you’re not including an FQDN in its rpcAddress value. I’m not sure about the lack of dns:///, but I understand why an FQDN would be required in EKS due to the ClusterFirst DNS resolution logic in pods (i.e. pod networking uses kube-dns to traverse the *.cluster.local domain and subdomains to locate the hostname if an FQDN is not provided).

All my ALBs are in private subnets with VPC Peering connecting them. I’ve verified that connectivity between regions works fine. I think I’ll try without the dns:/// to see if that changes anything.

I scrubbed our endpoints, but they’re FQDNs. I’ve updated the original post to set that context correctly. We don’t use dns:/// though, that’s true.

So, removing dns:/// didn’t do anything. Same i/o timeout issue.

I can invoke the ALB-based frontend grpc services in both clusters from custom-deployed pods in either cluster.

East 1 pod → East 2 frontend

bash-5.1$ grpcurl -d '{"service":"temporal.api.workflowservice.v1.WorkflowService"}' --insecure temporaltest-frontend-east2.mydomain.com:443 grpc.health.v1.Health/Check
{
  "status": "SERVING"
}

East 2 pod → East 1 frontend

bash-5.1$ grpcurl -d '{"service":"temporal.api.workflowservice.v1.WorkflowService"}' --insecure temporaltest-frontend-east1.mydomain.com:443 grpc.health.v1.Health/Check
{
  "status": "SERVING"
}

I’m not sure why the history pod would not work exactly like my custom-deployed grpcurl pod, networking-wise.

I guess I could be having an issue with TLS due to using self-signed certs. I get by that in my grpcurl CLI tests by adding --insecure. I’ve added the self-signed cert and key as the certFile/keyFile/clientCaFiles values in the tls section, and set the tls.internode.client.disableHostVerification and tls.frontend.client.disableHostVerification values to true, which I’m assuming will configure the history replication process to ignore hostname verification issues when making client connections to the other cluster.
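
For anyone following along, the tls values I’m referring to look roughly like this (cert paths are placeholders from my setup; double-check the nesting against your helm chart and server version):

global:
  tls:
    internode:
      server:
        certFile: /etc/temporal/certs/cluster.pem
        keyFile: /etc/temporal/certs/cluster.key
        clientCaFiles:
          - /etc/temporal/certs/cluster.pem
      client:
        disableHostVerification: true
    frontend:
      server:
        certFile: /etc/temporal/certs/cluster.pem
        keyFile: /etc/temporal/certs/cluster.key
        clientCaFiles:
          - /etc/temporal/certs/cluster.pem
      client:
        disableHostVerification: true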

Just wanted to chime in that we have a very similar setup at DigitalOcean for some of our nascent Temporal use cases (I think we had previously hopped on a call to do some knowledge sharing a while back, when we had just started).

While our actual deployment topology and language stack (we use the Go SDK) are different, we take a very similar approach in providing an internal temporal package that is XDC-aware. Prior to this, we had explored whether we could rely on the DC request redirection features to achieve this transparently, but we found that it introduced a number of edge cases around cluster state/health and replication status/delay/consistency. Your idea of a routing service is something we’ve also thought about but haven’t looked into further, since the XDC-aware client currently serves our limited needs; as adoption and use cases increase, I can certainly see how it could be very valuable.

One question I had for you, around your experience, is about Temporal version upgrades and how you manage them operationally. We’ve tried a few different approaches, such as:

  • upgrading the clusters in place
  • upgrading the non-active cluster, failing over if it looks healthy, and then upgrading the previously active cluster

But our workloads are limited enough at the moment that in-place upgrades aren’t really disruptive. I’m curious what your strategy has been and any learnings you’ve had exercising it.

Yes! Our call was very helpful in forming our XDC strategy!

Our upgrade process is somewhat straightforward (I think?). We use a deploy strategy called Rolling Red Black. Here’s a screenshot of our main environment delivery pipeline (in this case we skipped canaries, since this was part of the 1.11.x upgrade, which had us redeploying a bunch for the advanced visibility migration).

In our configuration, we roll out each cluster[1] concurrently, 10% at a time: we add 10% capacity, wait for UP status in Eureka (service discovery), then disable 10% of the old cluster, and rinse and repeat until we’ve reached 100% traffic on the new version. Once we’ve deployed to us-west-2, we deploy to us-east-1 following the same procedure. Since our Eureka registration process is a wee bit slow (by my own design), we don’t have a wait in between rollout steps, although such a wait was recommended to us.

[1]: “cluster” in the Spinnaker taxonomy, e.g. temporal-main-frontend, temporal-main-history, temporal-main-matching.

We’ve not observed any issues with this deploy process from a metrics perspective, although we’ve predominantly stopped paying any attention to logs - there’s a lot of error noise in there that doesn’t really indicate any problem. Service metrics are unaffected during a rollout.

While Temporal supports failing over individual namespaces to different clusters, we don’t currently do this since we’re on SQL: all namespaces are active in a single cluster, while the other sits in standby and does basically nothing. So, for schema upgrades, we upgrade & deploy the standby cluster first, then fail over all traffic to the standby and upgrade the originally active cluster. Once that deploy has finished, we fail back to the originally active cluster.
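
The failover step itself is just flipping each namespace’s active cluster. With tctl that looks something like this (using our spinnaker namespace as the example):

# Point all traffic for the namespace at useast1 (repeated per namespace).
tctl --namespace spinnaker namespace update --active_cluster useast1

# Confirm which cluster is now active for the namespace.
tctl --namespace spinnaker namespace describe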

Our test environment is only doing ~30k workflows per day, so in-place schema migrations have been fine. I haven’t tried in-place for staging (~50k/day) or main (~500k/day) because I’m cautious of perf issues in the case of blocking DDL changes (e.g. DEFAULT on newly added columns, which I’ve seen frequently). I’m not a database expert by any means, so I may be unnecessarily cautious, but that’s an expense I’m willing to pay for peace of mind.

ed: Fixed some metric numbers from an incorrect time range.