Proxying access to Temporal Cloud

At Replay 2023, I participated in a panel about running Temporal as an internal service. In it, I outlined how Netflix made the move from on-prem to TCloud, primarily through a proxy service. There was a lot of interest in this, so this post goes over the whys and hows.

overview

Netflix has a myriad of internal services that make development easier (and safer, etc, etc). When running on-prem, we had a customized version of Temporal Server that integrated these internal services. In adopting TCloud, we needed to preserve these capabilities: we want people to interact with Temporal as if it is just another internal service. To that end, we built a proxy server through which all API & CLI requests are made. Its capabilities are:

  • Authn via Metatron mTLS
  • Authz via Gandalf
  • Payload encryption via Cryptex
  • JIT short-lived certificate minting for TCloud

When a user wants to onboard to Temporal, they create a temporal.infra.netflix.net/Namespace resource that looks like this:

apiVersion: temporal.infra.netflix.net/v1alpha1
kind: Namespace
metadata:
  name: sandbox-test
  namespace: app-58778612-9edb-3686-a9e2-1d14cf73bd2d
spec:
  region: us-west-2
  retentionDays: 3
  owner: "<EMAIL-REDACTED>"
  codecServer:
    endpoint: "<REDACTED>"
    passAccessToken: false
    includeCredentials: false

The temporaloperator Kubernetes controller creates a sandbox-test.{customerId} namespace in TCloud according to our config, and auto-invites all DIRECT members of the owner DL (a closed Google Group), giving each member Write access to the namespace.

The temporalproxy has an informer set up to listen to the Namespace GVR. When a namespace is created, updated, or deleted, we update a ClientProvider interface that generates a client with the specified CA, pointing to the correct TCloud endpoint (e.g. sandbox-test.{customerId}.tmprl.cloud).
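To make the namespace-to-client bookkeeping concrete, here is a minimal sketch of a registry driven by add and delete events. It is not our actual code: canonicalNamespace, endpointHostPort, and the registry type are hypothetical stand-ins, the registry stores an endpoint string where the real proxy would store a gRPC client, and the 7233 port is an assumption.

```go
package main

import (
	"strings"
	"sync"
)

// canonicalNamespace appends the account suffix when it is missing, so that
// "sandbox-test" and "sandbox-test.abc12" resolve to the same key.
// Hypothetical: the real proxy.CanonicalNamespace may behave differently.
func canonicalNamespace(name, customerID string) string {
	suffix := "." + customerID
	if strings.HasSuffix(name, suffix) {
		return name
	}
	return name + suffix
}

// endpointHostPort derives the upstream address from the canonical namespace.
// The host pattern follows the post; the 7233 port is an assumption.
func endpointHostPort(canonical string) string {
	return canonical + ".tmprl.cloud:7233"
}

// registry maps canonical namespaces to upstream endpoints.
type registry struct {
	customerID string

	mu        sync.RWMutex
	endpoints map[string]string
}

func newRegistry(customerID string) *registry {
	return &registry{customerID: customerID, endpoints: map[string]string{}}
}

// onAdd registers the namespace if it is not already known.
func (r *registry) onAdd(name string) {
	key := canonicalNamespace(name, r.customerID)
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.endpoints[key]; !ok {
		r.endpoints[key] = endpointHostPort(key)
	}
}

// onDelete forgets the namespace; later lookups for it will miss.
func (r *registry) onDelete(name string) {
	key := canonicalNamespace(name, r.customerID)
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.endpoints, key)
}

// get returns the endpoint for a namespace and whether it is registered.
func (r *registry) get(name string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ep, ok := r.endpoints[canonicalNamespace(name, r.customerID)]
	return ep, ok
}
```

Taking the write lock for the whole check-and-insert keeps onAdd safe even if two events for the same namespace arrive at once.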

Now, when a user wants to create a worker, they MUST provide an x-temporal-namespace gRPC header on their WorkflowService client; this allows the proxy to know which downstream cloud client to use. If this header is missing on any namespaced client, the request is rejected. Similarly, if someone provides a namespace header that is unknown, the request is also rejected.
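The two rejection rules can be sketched without any gRPC dependency. The function below is illustrative only: resolveBackend, the error variables, and the known map are hypothetical, and the headers argument simply has the same shape as gRPC metadata (a map from header name to a list of values).

```go
package main

import (
	"errors"
	"fmt"
)

// namespaceHeader is the header name given in the post.
const namespaceHeader = "x-temporal-namespace"

var (
	errMissingHeader    = errors.New("namespace header missing or malformed")
	errUnknownNamespace = errors.New("unknown namespace")
)

// resolveBackend picks the upstream for a request based on its headers.
// known maps namespace names to upstream endpoints; in a real proxy the
// values would be clients rather than strings.
func resolveBackend(headers map[string][]string, known map[string]string) (string, error) {
	values := headers[namespaceHeader]
	// Exactly one non-empty value is required; anything else is rejected.
	if len(values) != 1 || values[0] == "" {
		return "", errMissingHeader
	}
	backend, ok := known[values[0]]
	if !ok {
		return "", fmt.Errorf("%w: %q", errUnknownNamespace, values[0])
	}
	return backend, nil
}
```

Callers can tell the two failure modes apart with errors.Is, which is useful when mapping them to different gRPC status codes.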

When a request comes in, we terminate our internal mTLS, do authz, encrypt all payloads before sending to TCloud, and then re-sign the request with the appropriate CA. We use another internal tool to JIT create a short-lived certificate from the CA that we’ve shared with TCloud, and then forward the request. Responses are similarly decrypted and passed back to the client as needed.
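The certificate minting step is done by an internal tool, so the following is only a rough illustration of the idea using the Go standard library: issue a client certificate with a short validity window, signed by a CA whose certificate the other side already trusts. Everything here is an assumption for the sketch (newCA, mintLeaf, verifyLeaf, defaultTTL, the key type, the subject names); in particular, the CA is generated on the spot, whereas in practice it already exists.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"math/big"
	"time"
)

// defaultTTL is an assumed lifetime; the post does not say how short "short-lived" is.
const defaultTTL = 5 * time.Minute

// newCA creates a throwaway self-signed CA for the sketch.
func newCA() (*x509.Certificate, *ecdsa.PrivateKey, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "example-proxy-ca"},
		NotBefore:             time.Now().Add(-time.Minute),
		NotAfter:              time.Now().Add(24 * time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, key, err
}

// mintLeaf issues a client certificate for the namespace that expires after ttl.
func mintLeaf(ca *x509.Certificate, caKey *ecdsa.PrivateKey, namespace string, ttl time.Duration) (*x509.Certificate, *ecdsa.PrivateKey, error) {
	leafKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	serial, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 127))
	if err != nil {
		return nil, nil, err
	}
	now := time.Now()
	tmpl := &x509.Certificate{
		SerialNumber: serial,
		Subject:      pkix.Name{CommonName: namespace},
		NotBefore:    now.Add(-time.Minute), // small allowance for clock skew
		NotAfter:     now.Add(ttl),
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, ca, &leafKey.PublicKey, caKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, leafKey, err
}

// verifyLeaf checks that the leaf chains to the CA for client authentication.
func verifyLeaf(ca, leaf *x509.Certificate) error {
	pool := x509.NewCertPool()
	pool.AddCert(ca)
	_, err := leaf.Verify(x509.VerifyOptions{
		Roots:     pool,
		KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
	})
	return err
}
```

The appeal of short lifetimes is that a leaked leaf certificate stops being useful quickly, and only the CA key needs long-term protection.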

code stuff

Since there’s plenty of Netflix-specific sauce in the proxy, I’ll just enumerate the overall pattern here, rather than open sourcing our codebase.

First, the ClientProvider. There’s nothing particularly special here: it just creates a grpc.ClientConn and a workflowservice.WorkflowServiceClient and makes those available.

package tcloud

// ClientProvider is a registry for TCloud clients.
// Client lifecycle is performed by handling Namespace resource changes.
type ClientProvider interface {
	io.Closer
	cache.ResourceEventHandler

	Get(namespace string) workflowservice.WorkflowServiceClient
}


type clientHolder struct {
	grpcConn        *grpc.ClientConn
	workflowService workflowservice.WorkflowServiceClient
}

// NewClientProvider returns a ClientProvider.
func NewClientProvider(ctx context.Context, config *proxy.ServerConfig) (ClientProvider, error) {
	kpr, err := newKeyPairReloader(ctx, config.UpstreamCertFile, config.UpstreamKeyFile)
	if err != nil {
		return nil, fmt.Errorf("failed creating keypair reloader: %w", err)
	}
	kpr.Start()

	return &clientProvider{
		ctx:          ctx,
		ll:           zlog.Ctx(ctx).Named("clientProvider"),
		registry:     spectatorx.Ctx(ctx),
		certReloader: kpr,
		config:       config,
		clients:      map[string]clientHolder{},
	}, nil
}

var _ ClientProvider = &clientProvider{}

type clientProvider struct {
	ctx          context.Context
	ll           *zap.Logger
	registry     *spectator.Registry
	certReloader *keypairReloader
	config       *proxy.ServerConfig

	mu      sync.RWMutex
	clients map[string]clientHolder
}

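// Get a client by namespace. It returns nil if no client is registered.
func (c *clientProvider) Get(namespace string) workflowservice.WorkflowServiceClient {
	// Informer callbacks mutate the map concurrently, so reads take the read lock.
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, ok := c.clients[proxy.CanonicalNamespace(namespace)]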
	if !ok {
		return nil
	}
	return ch.workflowService
}

// OnAdd creates a new client for the given resource if it doesn't already exist.
func (c *clientProvider) OnAdd(obj interface{}) {
	ns, ok := obj.(*v1alpha1.Namespace)
	if !ok {
		c.ll.Error("failed casting obj to namespace")
		c.registry.Counter("clientprovider.handler.errors", map[string]string{"method": "OnAdd", "namespace": "__unknown__"}).Increment()
		return
	}

	cNS := proxy.CanonicalNamespace(ns.Name)
	ll := c.ll.With(zap.String("namespace", cNS))

	ll.Debug("checking if client exists")
	c.mu.RLock()
	if _, ok := c.clients[cNS]; ok {
		ll.Debug("client for namespace already exists")
		c.mu.RUnlock()
		return
	}
	c.mu.RUnlock()

	ll.Debug("creating client")
	ch, err := c.newClientHolder(cNS)
	if err != nil {
		ll.Error("failed creating client", zap.Error(err))
		c.registry.Counter("clientprovider.handler.errors", map[string]string{"method": "OnAdd", "namespace": cNS}).Increment()
		return
	}

	c.mu.Lock()
	c.clients[cNS] = *ch
	c.mu.Unlock()

	c.registry.Counter("clientprovider.handler.created", map[string]string{"method": "OnAdd", "namespace": cNS}).Increment()
	ll.Debug("created new client")
}

// Omitted: OnUpdate
// Omitted: OnDelete

func (c *clientProvider) newClientHolder(namespace string) (*clientHolder, error) {
	startTime := time.Now()

	cc, err := c.newClientConn(namespace)
	if err != nil {
		return nil, fmt.Errorf("failed creating grpc.ClientConn: %s: %w", namespace, err)
	}

	workflowService := workflowservice.NewWorkflowServiceClient(cc)
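	if verr := c.validateClientConnection(c.ctx, workflowService, namespace); verr != nil {
		c.registry.Timer("clientprovider.init-latency", map[string]string{"namespace": namespace, "result": "failure"}).Record(time.Since(startTime))
		// Close the connection so a failed validation does not leak it.
		if cerr := cc.Close(); cerr != nil {
			c.ll.Warn("failed closing unvalidated connection", zap.Error(cerr))
		}
		return nil, fmt.Errorf("failed validating client connection: %w", verr)
	}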

	return &clientHolder{
		grpcConn:        cc,
		workflowService: workflowService,
	}, nil
}

func (c *clientProvider) newClientConn(namespace string) (*grpc.ClientConn, error) {
	clientTLSConfig, err := c.getTLSConfig(namespace)
	if err != nil {
		// Return the error instead of exiting: one bad namespace should not take down the proxy.
		return nil, fmt.Errorf("failed creating cloud TLS config: %w", err)
	}

	creds := credentials.NewTLS(clientTLSConfig)

	codecs := []converter.PayloadCodec{NewPayloadCodec()}
	codecs = append(codecs, c.config.ClientCodecs...)

	clientInterceptor, err := converter.NewPayloadCodecGRPCClientInterceptor(
		converter.PayloadCodecGRPCClientInterceptorOptions{
			Codecs: codecs,
		},
	)
	if err != nil {
		// zap does not interpret format verbs in the message, so wrap and return the error instead.
		return nil, fmt.Errorf("unable to create interceptor: %w", err)
	}

	return grpc.DialContext(
		c.ctx,
		proxy.EndpointHostPort(namespace),
		grpc.WithTransportCredentials(creds),
		grpc.WithChainUnaryInterceptor(
			otelgrpc.UnaryClientInterceptor(),
			clientInterceptor,
			grpc_spectator.UnaryClientInterceptor(c.registry, grpc_spectator.Tags{"namespace": namespace}),
		),
	)
}

func (c *clientProvider) validateClientConnection(ctx context.Context, workflowClient workflowservice.WorkflowServiceClient, namespace string) error {
	resp, err := workflowClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
		Namespace: namespace,
	})
	if err != nil {
		return fmt.Errorf("unable to validate connection for '%s' namespace: %w", namespace, err)
	}

	c.ll.Debug("backend client connected to Temporal Cloud", zap.String("namespace", resp.NamespaceInfo.Name))

	return nil
}

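// Close closes every upstream connection and stops the certificate reloader.
func (c *clientProvider) Close() error {
	// Hold the lock so informer callbacks cannot mutate the map mid-iteration.
	c.mu.Lock()
	defer c.mu.Unlock()

	var errs []error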
	for _, h := range c.clients {
		if err := h.grpcConn.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	if err := c.certReloader.Close(); err != nil {
		errs = append(errs, err)
	}

	if len(errs) != 0 {
		return multierr.Combine(errs...)
	}

	return nil
}
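The newClientConn function above installs payload codecs through the interceptor, but NewPayloadCodec itself is not shown, and the real one delegates to Cryptex. As a rough idea of what an encrypting codec does, here is a self-contained sketch that uses AES-GCM from the standard library instead. The payload struct is a stand-in for Temporal's Payload message, the Encode/Decode pair only mirrors the shape of the converter.PayloadCodec interface, and the "binary/encrypted" marker and JSON envelope are assumptions made for this example.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/json"
	"errors"
)

// encryptedEncoding marks payloads produced by this codec (assumed value).
const encryptedEncoding = "binary/encrypted"

// payload is a simplified stand-in for Temporal's Payload message.
type payload struct {
	Metadata map[string][]byte `json:"metadata"`
	Data     []byte            `json:"data"`
}

// encryptionCodec encrypts whole payloads with AES-GCM.
type encryptionCodec struct {
	aead cipher.AEAD
}

// newEncryptionCodec builds a codec from a 16, 24, or 32 byte AES key.
func newEncryptionCodec(key []byte) (*encryptionCodec, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &encryptionCodec{aead: aead}, nil
}

// Encode serializes each payload (metadata included) and replaces it with an
// encrypted envelope whose data is nonce || ciphertext.
func (c *encryptionCodec) Encode(in []*payload) ([]*payload, error) {
	out := make([]*payload, len(in))
	for i, p := range in {
		plain, err := json.Marshal(p)
		if err != nil {
			return nil, err
		}
		nonce := make([]byte, c.aead.NonceSize())
		if _, err := rand.Read(nonce); err != nil {
			return nil, err
		}
		out[i] = &payload{
			Metadata: map[string][]byte{"encoding": []byte(encryptedEncoding)},
			Data:     c.aead.Seal(nonce, nonce, plain, nil),
		}
	}
	return out, nil
}

// Decode reverses Encode and passes through payloads it did not produce.
func (c *encryptionCodec) Decode(in []*payload) ([]*payload, error) {
	out := make([]*payload, len(in))
	for i, p := range in {
		if string(p.Metadata["encoding"]) != encryptedEncoding {
			out[i] = p
			continue
		}
		n := c.aead.NonceSize()
		if len(p.Data) < n {
			return nil, errors.New("ciphertext shorter than nonce")
		}
		plain, err := c.aead.Open(nil, p.Data[:n], p.Data[n:], nil)
		if err != nil {
			return nil, err
		}
		var orig payload
		if err := json.Unmarshal(plain, &orig); err != nil {
			return nil, err
		}
		out[i] = &orig
	}
	return out, nil
}
```

Encrypting the serialized payload rather than just its data keeps the original encoding metadata hidden from the upstream and restores it untouched on decode.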

Then, we have the proxy server. It’s pretty straightforward: we expose a health server and a workflow service server. The workflowservice.RegisterWorkflowServiceServer func is provided by the Temporal gRPC generated code.

package server

type Proxy struct {
	logger         *zap.Logger
	clientProvider tcloud.ClientProvider
}

// Omitted: New

func (p *Proxy) Serve(listener net.Listener, opts ...grpc.ServerOption) error {
	p.logger.Debug("creating proxy grpc server")
	srv := grpc.NewServer(opts...)

	workflowservice.RegisterWorkflowServiceServer(srv, newWorkflowServiceProxy(p.clientProvider, p.logger.Named("workflowservice")))
	grpc_health_v1.RegisterHealthServer(srv, newHealthServer())

	p.logger.Info("serving workflow service")
	if err := srv.Serve(listener); err != nil {
		return fmt.Errorf("unable to serve: %w", err)
	}
	p.logger.Warn("proxy is no longer serving")
	return nil
}

func (p *Proxy) Close() error {
	return p.clientProvider.Close()
}

And now the important bit, the workflow service proxy, which implements the WorkflowServiceServer interface and forwards each call to the right upstream client:

package server

func newWorkflowServiceProxy(clientProvider tcloud.ClientProvider, logger *zap.Logger) workflowservice.WorkflowServiceServer {
	return &workflowServiceProxy{clientProvider: clientProvider, logger: logger}
}

var _ workflowservice.WorkflowServiceServer = &workflowServiceProxy{}

type workflowServiceProxy struct {
	clientProvider tcloud.ClientProvider
	logger         *zap.Logger
}

// Omitted: All of the methods, save one to show as an example.

func (w *workflowServiceProxy) RegisterNamespace(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) (*workflowservice.RegisterNamespaceResponse, error) {
	backend, err := w.getBackend(ctx)
	if err != nil {
		// getBackend already returns a gRPC status error, so pass it through unchanged.
		return nil, err
	}
	return backend.RegisterNamespace(ctx, request)
}

// getBackend extracts the requested namespace from the request (via gRPC request headers) and returns its client if
// it exists.
func (w *workflowServiceProxy) getBackend(ctx context.Context) (workflowservice.WorkflowServiceClient, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Errorf(codes.Internal, "failed getting metadata from incoming context")
	}

	header := md.Get(proxy.NamespaceHeader)
	if len(header) != 1 {
		return nil, status.Errorf(codes.NotFound, "%s header missing from request or malformed", proxy.NamespaceHeader)
	}

	namespace := proxy.CanonicalNamespace(header[0])

	client := w.clientProvider.Get(namespace)
	if client == nil {
		return nil, status.Errorf(codes.NotFound, "client for namespace %v has not been initialized", namespace)
	}
	return client, nil
}

Now putting it all together:

package main

func main() {
	flag.Parse()

	ctx, logger := zlog.InitCtx(context.Background())

	// Omitted: A bunch of bootstrapping

	logger.Debug("creating versioned temporal client", zap.String("context", kubeContext))
	temporalClient := versioned.NewForConfigOrDie(proxy.GetKubeConfig(logger, kubeContext, kubeConfig))

	logger.Debug("starting proxy server", zap.Any("proxyConfig", config))

	proxy, clientProvider, err := server.New(ctx, temporalClient, config)
	if err != nil {
		logger.Fatal("failed to start proxy", zap.Error(err))
	}
	// Close the proxy on the way out of main, not immediately in a goroutine.
	defer func() {
		if cerr := proxy.Close(); cerr != nil {
			logger.Error("failed closing proxy", zap.Error(cerr))
		}
	}()

	logger.Debug("configuring listener", zap.String("addr", proxyHostPort))
	listener, err := net.Listen("tcp", proxyHostPort)
	if err != nil {
		logger.Fatal("unable to create listener", zap.Error(err))
	}

	metricsHandler := metricsx.NewSpectatorMetricsHandler(ctx, metrics.ClientConfig{})

	// http server for pprof
	go func() {
		//nolint:gosec
		fmt.Println(http.ListenAndServe("localhost:1788", nil))
	}()

	opts := []grpc.ServerOption{
		server.ObservabilityServerOption(logger, registry, clientProvider),
		server.MetatronTLSOption(logger),
		server.GandalfAuthorizerOption(logger, metricsHandler, gandalfDevServer),
	}

	if err := proxy.Serve(listener, opts...); err != nil {
		logger.Fatal("failed serving proxy", zap.Error(err))
	}
	logger.Info("proxy shutdown")
}

summary

And that’s basically it. When new versions of Temporal are released, we update the dependencies and implement any new methods, then deploy it.

We deploy the proxy to every region that Netflix has a presence in and have latency-based routing set up on the DNS for workers to connect to. All of the server groups have auto scaling policies set up.

I’m happy to answer any questions!
