At Replay 2023, I participated in a panel about running Temporal as an internal service. In it, I outlined how Netflix made the move from on-prem to TCloud, primarily through a proxy service. There was a lot of interest in this, so this post is to go over the why’s and how’s.
overview
Netflix has a myriad of internal services that make development easier (and safer, etc, etc). When running on-prem, we had a customized version of Temporal Server that integrated these internal services. In adopting TCloud, we needed to preserve these capabilities: We want people to interact with Temporal as-if it is just another internal service. To that end, we built a proxy server through which all API & CLI requests are made. Its capabilities are:
- Authn via Metatron mTLS
- Authz via Gandalf
- Payload encryption via Cryptex
- JIT short-lived certificate minting for TCloud
When a user wants to onboard to Temporal, they create a temporal.infra.netflix.net/Namespace
resource that looks like this:
apiVersion: temporal.infra.netflix.net/v1alpha1
kind: Namespace
metadata:
name: sandbox-test
namespace: app-58778612-9edb-3686-a9e2-1d14cf73bd2d
spec:
region: us-west-2
retentionDays: 3
owner: "<EMAIL-REDACTED>"
codecServer:
endpoint: "<REDACTED>"
passAccessToken: false
includeCredentials: false
The temporaloperator
Kubernetes controller creates a sandbox-test.{customerId}
namespace in TCloud according to our config, and auto-invites all DIRECT members of the owner
closed Google Group DL, giving each member Write access to the namespace.
The temporalproxy
has an informer setup to listen to the Namespace
GVR. When a new namespace is created, updated, or deleted, we will update a ClientProvider
interface that generates a client with the specified CA, pointing to the correct TCloud endpoint (e.g. sndbox-test.{customerId}.tmprl.cloud
.
Now, when a user wants to create a worker, they MUST provide a x-temporal-namespace
gRPC header on their WorkflowService client; this allows the proxy to know which downstream cloud client to utilize. If this header is not available for any namespaced client, the request is rejected. Similarly, if someone provides a namespace header that is unknown, the request is also rejected.
When a request comes in, we terminate our internal mTLS, do authz, encrypt all payloads before sending to TCloud, and then re-sign the request with the appropriate CA. We use another internal tool to JIT create a short-lived certificate from the CA that we’ve shared with TCloud, and then forward the request. Responses are similarly decrypted and passed back to the client as-needed.
code stuff
Since there’s plenty of Netflix-specific sauce in the proxy, I’ll just enumerate the overall pattern here, rather than open sourcing our codebase.
First, the ClientProvider
. There’s nothing particularly special here, it just creates a grpc.ClientConn
and a workflowservice.WorkflowServiceClient
and makes those available.
package tcloud
// ClientProvider is a registry for TCloud clients.
// Client lifecycle is performed by handling Namespace resource changes.
type ClientProvider interface {
io.Closer
cache.ResourceEventHandler
Get(namespace string) workflowservice.WorkflowServiceClient
}
type clientHolder struct {
grpcConn *grpc.ClientConn
workflowService workflowservice.WorkflowServiceClient
}
// NewClientProvider returns a ClientProvider.
func NewClientProvider(ctx context.Context, config *proxy.ServerConfig) (ClientProvider, error) {
kpr, err := newKeyPairReloader(ctx, config.UpstreamCertFile, config.UpstreamKeyFile)
if err != nil {
return nil, fmt.Errorf("failed creating keypair reloader: %w", err)
}
kpr.Start()
return &clientProvider{
ctx: ctx,
ll: zlog.Ctx(ctx).Named("clientProvider"),
registry: spectatorx.Ctx(ctx),
certReloader: kpr,
config: config,
clients: map[string]clientHolder{},
}, nil
}
var _ ClientProvider = &clientProvider{}
type clientProvider struct {
ctx context.Context
ll *zap.Logger
registry *spectator.Registry
certReloader *keypairReloader
config *proxy.ServerConfig
mu sync.RWMutex
clients map[string]clientHolder
}
// Get a client by namespace.
func (c *clientProvider) Get(namespace string) workflowservice.WorkflowServiceClient {
ch, ok := c.clients[proxy.CanonicalNamespace(namespace)]
if !ok {
return nil
}
return ch.workflowService
}
// OnAdd creates a new client for the given resource if it doesn't already exist.
func (c *clientProvider) OnAdd(obj interface{}) {
ns, ok := obj.(*v1alpha1.Namespace)
if !ok {
c.ll.Error("failed casting obj to namespace")
c.registry.Counter("clientprovider.handler.errors", map[string]string{"method": "OnAdd", "namespace": "__unknown__"}).Increment()
return
}
cNS := proxy.CanonicalNamespace(ns.Name)
ll := c.ll.With(zap.String("namespace", cNS))
ll.Debug("checking if client exists")
c.mu.RLock()
if _, ok := c.clients[cNS]; ok {
ll.Debug("client for namespace already exists")
c.mu.RUnlock()
return
}
c.mu.RUnlock()
ll.Debug("creating client")
ch, err := c.newClientHolder(cNS)
if err != nil {
ll.Error("failed creating client", zap.Error(err))
c.registry.Counter("clientprovider.handler.errors", map[string]string{"method": "OnAdd", "namespace": cNS}).Increment()
return
}
c.mu.Lock()
c.clients[cNS] = *ch
c.mu.Unlock()
c.registry.Counter("clientprovider.handler.created", map[string]string{"method": "OnAdd", "namespace": cNS}).Increment()
ll.Debug("created new client")
}
// Omitted: OnUpdate
// Omitted: OnDelete
func (c *clientProvider) newClientHolder(namespace string) (*clientHolder, error) {
startTime := time.Now()
cc, err := c.newClientConn(namespace)
if err != nil {
return nil, fmt.Errorf("failed creating grpc.ClientConn: %s: %w", namespace, err)
}
workflowService := workflowservice.NewWorkflowServiceClient(cc)
if verr := c.validateClientConnection(c.ctx, workflowService, namespace); verr != nil {
c.registry.Timer("clientprovider.init-latency", map[string]string{"namespace": namespace, "result": "failure"}).Record(time.Since(startTime))
return nil, fmt.Errorf("failed validating client connection: %w", verr)
}
return &clientHolder{
grpcConn: cc,
workflowService: workflowService,
}, nil
}
func (c *clientProvider) newClientConn(namespace string) (*grpc.ClientConn, error) {
clientTLSConfig, err := c.getTLSConfig(namespace)
if err != nil {
c.ll.Fatal("failed creating cloud TLS config", zap.Error(err))
}
creds := credentials.NewTLS(clientTLSConfig)
codecs := []converter.PayloadCodec{NewPayloadCodec()}
codecs = append(codecs, c.config.ClientCodecs...)
clientInterceptor, err := converter.NewPayloadCodecGRPCClientInterceptor(
converter.PayloadCodecGRPCClientInterceptorOptions{
Codecs: codecs,
},
)
if err != nil {
c.ll.Fatal("unable to create interceptor: %v", zap.Error(err))
}
return grpc.DialContext(
c.ctx,
proxy.EndpointHostPort(namespace),
grpc.WithTransportCredentials(creds),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(),
clientInterceptor,
grpc_spectator.UnaryClientInterceptor(c.registry, grpc_spectator.Tags{"namespace": namespace}),
),
)
}
func (c *clientProvider) validateClientConnection(ctx context.Context, workflowClient workflowservice.WorkflowServiceClient, namespace string) error {
resp, err := workflowClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
if err != nil {
return fmt.Errorf("unable to validate connection for '%s' namespace: %w", namespace, err)
}
c.ll.Debug("backend client connected to Temporal Cloud", zap.String("namespace", resp.NamespaceInfo.Name))
return nil
}
func (c *clientProvider) Close() error {
var errs []error
for _, h := range c.clients {
if err := h.grpcConn.Close(); err != nil {
errs = append(errs, err)
}
}
if err := c.certReloader.Close(); err != nil {
errs = append(errs, err)
}
if len(errs) != 0 {
return multierr.Combine(errs...)
}
return nil
}
Then, we have the proxy server. Pretty straightforward here, we expose a health server and workflow service server. The workflowservice.RegisterWorkflowServiceServer
func is provided by the Temporal gRPC generated code.
package server
type Proxy struct {
logger *zap.Logger
clientProvider tcloud.ClientProvider
}
// Omitted: New
func (p *Proxy) Serve(listener net.Listener, opts ...grpc.ServerOption) error {
p.logger.Debug("creating proxy grpc server")
srv := grpc.NewServer(opts...)
workflowservice.RegisterWorkflowServiceServer(srv, newWorkflowServiceProxy(p.clientProvider, p.logger.Named("workflowservice")))
grpc_health_v1.RegisterHealthServer(srv, newHealthServer())
p.logger.Info("serving workflow service")
if err := srv.Serve(listener); err != nil {
return fmt.Errorf("unable to serve: %w", err)
}
p.logger.Warn("proxy is no longer serving")
return nil
}
func (p *Proxy) Close() error {
return p.clientProvider.Close()
}
And now the important bit, the proxy workflow service client:
package server
func newWorkflowServiceProxy(clientProvider tcloud.ClientProvider, logger *zap.Logger) workflowservice.WorkflowServiceServer {
return &workflowServiceProxy{clientProvider: clientProvider, logger: logger}
}
var _ workflowservice.WorkflowServiceServer = &workflowServiceProxy{}
type workflowServiceProxy struct {
clientProvider tcloud.ClientProvider
logger *zap.Logger
}
// Omitted: All of the methods, spare one to show as an example.
func (w *workflowServiceProxy) RegisterNamespace(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) (*workflowservice.RegisterNamespaceResponse, error) {
backend, err := w.getBackend(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
return backend.RegisterNamespace(ctx, request)
}
// getBackend extracts the requested namespace from the request (via gRPC request headers) and returns its client if
// it exists.
func (w *workflowServiceProxy) getBackend(ctx context.Context) (workflowservice.WorkflowServiceClient, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.Internal, "failed getting metadata from incoming context")
}
header := md.Get(proxy.NamespaceHeader)
if len(header) != 1 {
return nil, status.Errorf(codes.NotFound, "%s header missing from request or malformed", proxy.NamespaceHeader)
}
namespace := proxy.CanonicalNamespace(header[0])
client := w.clientProvider.Get(namespace)
if client == nil {
return nil, status.Errorf(codes.NotFound, "client for namespace %v has not been initialized", namespace)
}
return client, nil
}
Now putting it all together:
package main
func main() {
flag.Parse()
ctx, logger := zlog.InitCtx(context.Background())
// Omitted: A bunch of bootstrapping
logger.Debug("creating versioned temporal client", zap.String("context", kubeContext))
temporalClient := versioned.NewForConfigOrDie(proxy.GetKubeConfig(logger, kubeContext, kubeConfig))
logger.Debug("starting proxy server", zap.Any("proxyConfig", config))
proxy, clientProvider, err := server.New(ctx, temporalClient, config)
if err != nil {
logger.Fatal("failed to start proxy", zap.Error(err))
}
go func() {
if cerr := proxy.Close(); cerr != nil {
logger.Error("failed closing proxy", zap.Error(cerr))
}
}()
logger.Debug("configuring listener", zap.String("addr", proxyHostPort))
listener, err := net.Listen("tcp", proxyHostPort)
if err != nil {
logger.Fatal("unable to create listener", zap.Error(err))
}
metricsHandler := metricsx.NewSpectatorMetricsHandler(ctx, metrics.ClientConfig{})
// http server for pprof
go func() {
//nolint:gosec
fmt.Println(http.ListenAndServe("localhost:1788", nil))
}()
opts := []grpc.ServerOption{
server.ObservabilityServerOption(logger, registry, clientProvider),
server.MetatronTLSOption(logger),
server.GandalfAuthorizerOption(logger, metricsHandler, gandalfDevServer),
}
if err := proxy.Serve(listener, opts...); err != nil {
logger.Fatal("failed serving proxy", zap.Error(err))
}
logger.Info("proxy shutdown")
}
summary
And that’s basically it. When new versions of Temporal are released, we update the dependencies and implement any new methods, then deploy it.
We deploy the proxy to every region that Netflix has a presence in and have latency-based routing setup on the DNS for workers to connect to. All of the server groups have auto scaling policies setup.
I’m happy to answer any questions!