Using dynamic task queues for traffic routing

Today, I did a little lightning talk at the Community Meetup about what we’ve done at Netflix to extend Temporal for our purposes. One extension that we’ve added to the Java SDK is traffic routing via dynamic task queue names. There was some interest for me to go a little more indepth on this particular feature, so this post will dive into what that looks like and how it works!

Why?

In my eyes, Traffic Routing has two pillars of use:

  1. Development
  2. Operations

For development / debugging, it can be helpful to send a slice of production traffic to your laptop to dig into end-to-end behaviors. Our product, Spinnaker, consists of many services and a huge variety of usages by users. Running everything locally to exercise an end-to-end use cases is not often possible without melting a laptop. Instead, being able to run a single service and route production traffic can make things pretty easy.

For operations, traffic routing rules can be used to dynamically shard workers by whatever rules you like. Let’s say you are writing a multi-cloud control plane. The task queue may be cloud-operations initially, but eventually you want to run each cloud provider on different worker infrastructure. With traffic routing, you could dynamically shard this traffic without making any code changes.

It’s worth noting that having powerful functionality like this is also inherently dangerous: With a bad configuration, you can cause downtime. We believe the flexibility it provides is worth the risk, but you should weigh risks vs rewards for yourself. As for Netflix, we have this particular capability available - for better and worse - across a wide range of infrastructure and use cases separate from Temporal.

High-level

Thankfully, task queues in Temporal are cheap to make. If this were not the case, a feature like traffic routing would be harder or impossible to implement at scale. In our particular use cases, these route task queues are typically somewhat short-lived.

The Netflix Temporal SDK has an activity interface called TaskQueueNamer which forms the basis of this feature. Different implementations are wired up into Spring Boot based on context. We use this TaskQueueNamer everywhere that task queue names are specified. In most cases and by default behavior, a provided name input will result in an equal output.

/**
 * Creates task queue names.
 */
@ActivityInterface(namePrefix = "TaskQueueNamer-")
public interface TaskQueueNamer {

  /**
   * Returns a task queue name given a base {@code name}.
   */
  @Nonnull
  String name(@Nonnull String name);
}

By using this interface everywhere, we’re able to compose conventions on top of task queues names that application developers don’t need to care about while still getting additive features - like traffic routing - for “free”.

A non-traffic routing use case for this is our LaptopTaskQueueNamer, which renames all task queues when running an application locally so that operations running on your laptop do not interfere with applications running in the test environment (all laptops connect by default to our test Temporal clusters, rather than running it locally).

Traffic Routing rules are saved into Fast Properties (FP); a global Platform service for dynamic configuration at Netflix. All Netflix Spring Boot apps have a procedure to regularly update FPs that apply to their scope which we utilize for updating each application’s traffic route rules. These FPs have their own constraints that allow us to set scope at any infrastructure granularity (globally, by-region, AZ, application name, cluster name, individual instance ID, etc).

The value of a FP can be whatever we want, and for our purposes, it’s a YAML document. A traffic route rule consists of a name, strategy and its own application-specific constraints separate from those of the Fast Property. Here’s an example for the property temporal.routing.clouddriver-cloud-operations.rzienert:

strategy: laptop
config:
  user: hi@example.com
constraints:
  include:
    mdc/X-SPINNAKER-USER: someone-else@example.com

Adding this Fast Property will then rename all invocations of clouddriver-cloud-operations to hi@example.com/clouddriver-cloud-operations, which is our naming convention for laptop task queues, but only when the X-SPINNAKER-USER MDC value matches someone-else@example.com. We use a ContextPropagator in the Temporal SDK to pass this information into Workflow & Activity executions.

The property name is meaningful as well, since it includes the original task queue name (clouddriver-cloud-operations) and a namespace (rzienert). Since Fast Properties are non-hierarchical, the rzienert bit just makes it so multiple routes can exist for the same original task queue name. By convention, it’s named after the user who created it, but it could be anything (like a feature name).

Let’s look into how this all works behind the scenes… incoming Kotlin & Spring code!

Implementation

There are two TaskQueueNamer implementations for traffic routing: RoutingLaptopTaskQueueNamer and RoutingTaskQueueNamer. The first is only wired up when spring.profiles.active includes laptop, which is automatically set when running an application on your laptop. The other is the default TaskQueueNamer that is wired up in a deployed application. Both implementations pull configuration from a TrafficRouteRepository, which stores a bunch of TrafficRouteProperty objects.

/**
 * The shape of a traffic route fast property value.
 */
public class TrafficRouteProperty(
  public val strategy: String,
  public val constraints: TrafficRouteConstraints = TrafficRouteConstraints(),
  public val config: Map<String, String> = mapOf()
) {
  public lateinit var originalTaskQueueName: String
}

This should look similar to the shape of the YAML from above: We deserialize the YAML onto this. Each property must have a strategy defined, which informs the TaskQueueNamers on how to rename the task queue. We ship with two built-in strategies: value and laptop, but an application owner could provide their own strategy if they were motivated. The config field is open-ended and used to configure the strategy. For laptop, we just need user so we know how to rename the queue following the laptop task queue naming convention:

/**
 * Renames task queues for work that should be routed to a laptop.
 *
 * Must be compatible with [LaptopTaskQueueNamer] in `temporal-java-spring`.
 */
@Component
public class LaptopTaskQueueRouteStrategy : TaskQueueRouteStrategy {

  private val log by lazy { LoggerFactory.getLogger(javaClass) }

  override val type: String = "laptop"

  override fun resolveTaskQueueName(property: TrafficRouteProperty): String {
    val user = property.config["user"]
    if (user.isNullOrBlank()) {
      log.warn("Misconfigured laptop config: Missing 'user', will not override task queue: $property")
      return property.originalTaskQueueName
    }
    return "$user/${property.originalTaskQueueName}"
  }
}

The RoutingTaskQueueNamer (which is used in deployed applications) then uses the repository of TrafficRouteProperty objects, finding the configured strategy and attempts to find a property that matches:

public const val routingTaskQueueNamerBeanName: String = "routingTaskQueueNamer"

/**
 * Deployed worker traffic routing task queue namer.
 */
@Component(routingTaskQueueNamerBeanName)
public class RoutingTaskQueueNamer(
  private val repository: TrafficRouteRepository,
  private val strategies: List<TaskQueueRouteStrategy>
) : TaskQueueNamer {

  private val log by lazy { LoggerFactory.getLogger(javaClass) }

  public override fun name(name: String): String {
    val property = repository.findMatching(name) ?: return name
    return strategies.of(property.strategy, log)?.resolveTaskQueueName(property) ?: name
  }
}

All of this is good and well, but renaming all task queue interactions isn’t often helpful (I’d say it’s dangerous). To help with scoping, we include constraints on a TrafficRouteProperty:

/**
 * A constraint for the task queue traffic route property, providing the main mechanism for targeting
 * specific tasks for rerouting.
 *
 * All [include] and [exclude] rules must evaluate successfully for the overall constraint to be satisfied.
 */
public data class TrafficRouteConstraints(
  val include: Map<String, String?> = mapOf(),
  val exclude: Map<String, String?> = mapOf()
)

All constraint keys share the same naming convention: {evaluatorName}/{key}. Constraint evaluators are defined as an interface, so application developers can also add their own unique evaluators if the ones that we ship with the SDK don’t fit the bill (MDC and Environment):

/**
 * Evaluates [TrafficRouteConstraints] associated with a [TrafficRouteProperty].
 */
public interface ConstraintEvaluator {

  /**
   * Returns true if all property constraints are satisfied.
   */
  public fun evaluate(constraints: TrafficRouteConstraints): Boolean
}

Calling back to our original property example, there was a constraint named mdc/X-SPINNAKER-USER, which would map to the MdcConstraintEvaluator:

private const val MDC_RULE_PREFIX = "mdc/"

/**
 * Matches constraints based on MDC values.
 *
 * The evaluator will look for constraint rules that have a key that starts with [MDC_RULE_PREFIX].
 * This evaluator is case-sensitive, and accepts `null` values for constraining on the the _absence_
 * of a particular key.
 */
@Component
public class MdcConstraintEvaluator : ConstraintEvaluator {

  override fun evaluate(constraints: TrafficRouteConstraints): Boolean {
    var includeMatch = true
    if (constraints.include.hasMdcRules()) {
      includeMatch = constraints.include.matchesMdc()
    }

    var excludeMatch = false
    if (constraints.exclude.hasMdcRules()) {
      excludeMatch = constraints.exclude.matchesMdc()
    }

    return includeMatch && !excludeMatch
  }

  private fun Map<String, String?>.hasMdcRules(): Boolean {
    return keys.any { it.startsWith(MDC_RULE_PREFIX) }
  }

  private fun Map<String, String?>.matchesMdc(): Boolean {
    return entries
      .filter { it.key.startsWith(MDC_RULE_PREFIX) }
      .all {
        val mdcValue = MDC.get(it.key.substring(4))
        val expectedValue = it.value

        if (mdcValue == null && expectedValue == null) {
          true
        } else {
          mdcValue == expectedValue
        }
      }
  }
}

That’s really all of the major pieces. With an application that is deployed with the traffic routing library, we can route individual workflows and activities anywhere we like! One major feature gap in this implementation is that we have no way of restoring in-flight traffic to the original task queue. It’s possible that we may make a deregistration process that shovels a defunct traffic routed task queue to the original, but I haven’t really figured out if that is possible or even worth the effort - so far we’ve been able to get on fine without it.

OSS Plans?

There was a question about open sourcing our various SDK improvements we’ve built, including this one. Right now, this isn’t a priority for us, primarily because we’re building these SDK enhancements atop closed-source Netflix libraries. If people would like unbuildable code to reference and inspire an OSS version, however, I’d be open to throwing up a GitHub repo for people to poke at.

That said, we’d be happy to explore contributing various features to the SDKs we’ve involved in if there’s enough demand / interest from the Temporal team.

Thanks for tuning in. As always, I’m happy to answer any questions or give more detailed code examples, etc.

5 Likes

@RobZienert great write-up. Longer-term I see us implementing a similar routing system natively by the service. This post could be used as an inspiration. I filed a feature request to get this tracked.

Have you considered using interceptors to perform all the task queue manipulation without changing any workflow code?

1 Like

Thanks! We’ll keep an eye on that ticket!

I hadn’t considered using interceptors for this use case specifically. I really like that since it’d make the whole thing an implementation detail rather than yet another thing for people to remember to use. :eyes: Might have to revisit this feature to see how it’d look done like this.

One curiosity I have with interceptors - since the logic is performed within the Workflow thread context, would resolving the task queue name still be done through an activity? I would presume so, but just verifying.

One curiosity I have with interceptors - since the logic is performed within the Workflow thread context, would resolving the task queue name still be done through an activity? I would presume so, but just verifying.

Changing a task queue name (as well as any of the ActivityOptions) is a backward-compatible change. So there is no need to use activity to look those up in an interceptor. The only caveat is that the resolver should be relatively fast to not block the workflow thread for long which is going to trip the deadlock detector.

1 Like

Would be interesting to see these SDKs, so we can dig into them and see other ways to improve Temporal Java SDK.

2 Likes

Here’s the traffic-routing code. I deleted all of the tests from the repo, as they had a bunch of email address references, etc that I didn’t feel like scrubbing out… but it should be mostly readable: GitHub - robzienert/temporal-sdk-snippets

1 Like