Exception with Activity Poller

I have a worker service that communicates with a deployed Temporal server. When I first start the worker service, I’m able to create and complete a couple of workflows. However, if I wait about 5 minutes, the worker service starts logging the error below. Do you have any insight into this error message? It’s confusing, since the service was able to create and complete workflows before that.

In this setup I have one workflow that calls 3 activities. The workflow has its own worker and task queue, and each activity has its own worker and task queue as well.

From the error message, it looks like the worker is still polling the task queues even though the workflow is complete. Are there cleanup calls I need to make that I’m not accounting for?

ERROR [2020-10-27 19:28:45,969] io.temporal.internal.worker.Poller: Failure in thread Activity Poller taskQueue="SampleActivities1", namespace="default": 5
! io.grpc.StatusRuntimeException: UNAUTHENTICATED: processing token failed: invalid token. Errors = [Invalid Base64 encoding: illegal base64 data at input byte 118,, Invalid JWK fields,, Invalid algorithm,, JWT not currently valid,, invalid format for figment token]
! at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244)
! at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225)
! at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142)
! at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.pollActivityTaskQueue(WorkflowServiceGrpc.java:2702)
! at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:95)
! at io.temporal.internal.worker.ActivityPollTask.poll(ActivityPollTask.java:38)
! at io.temporal.internal.worker.Poller$PollExecutionTask.run(Poller.java:273)
! at io.temporal.internal.worker.Poller$PollLoopTask.run(Poller.java:242)
! at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
! at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
! at java.base/java.lang.Thread.run(Thread.java:834)

Can you share more information about how you deployed this? The exception seems to indicate invalid HTTP headers. Do you have any proxies in between the worker process and the cluster that could mess with the HTTP headers?

@SergeyBykov

So I think I found the issue. The token that we attach to the request has a short lifespan and expires after a couple of minutes. This requires a new token to be generated for every call.

When the worker service starts up, it creates the workers once. The client that we pass to the WorkerFactory carries a token that expires after a couple of minutes.

Is there a way to configure the client so that it keeps generating a new token?

Here is the code:

    WorkflowClient client = TemporalClient.getWorkflowClient(config);

    // Register workflow
    WorkerFactory workerFactory = WorkerFactory.newInstance(client);

    final Worker worker = workerFactory.newWorker(TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(DslWorkflowImpl.class);

    for (SampleActivities.SampleActivitiesBase activity : activities) {
      final Worker activityWorker =
          workerFactory.newWorker(activity.getClass().getInterfaces()[0].getSimpleName());
      activityWorker.registerActivitiesImplementations(activity);
    }

    workerFactory.start();

Below is what getWorkflowClient looks like.

        // The token is fetched once here and captured by the lambdas below.
        final String authToken = getAuthToken();
        final ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).build();
        final WorkflowServiceStubs service = new WorkflowServiceStubsImpl(null, WorkflowServiceStubsOptions
                .newBuilder()
                .setChannel(channel)
                .setRpcTimeout(Duration.ofMillis(rpcTimeout))
                .setQueryRpcTimeout(Duration.ofMillis(queryRpcTimeout))
                .setBlockingStubInterceptor(
                        (stub) ->
                        {
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        })
                .setFutureStubInterceptor(
                        (stub) ->
                        {
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        }
                )
                .build());
return WorkflowClient.newInstance(service);

Here, TemporalClient.getWorkflowClient creates a WorkflowClient that injects a token into the headers of the gRPC requests. I’m not sure how you would refresh the token.

My theory is that once the client has been initialized, it never requests a new token. I’m guessing that even though a workflow is complete, the worker keeps polling to see if there are any workflows to process.

Now it makes sense. Yes, the client will stay connected (via a long poll), waiting for other work to appear for the worker, and will keep reconnecting.
Don’t you need a timer here to keep refreshing the token before it expires, e.g. after half of the token validity period passes? If you do that, the code above should just work, I think. Or is there some other problem here that I missed?
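
A minimal sketch of the timer idea, using only the JDK. The class name `RefreshingToken` and the `tokenSource` supplier (standing in for a call such as `getAuthToken()`) are hypothetical and not part of the Temporal SDK. It assumes the token validity period is known up front, and it only helps if whatever attaches the header reads `get()` at request time instead of copying the token once at setup.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Holds the current token and replaces it on a fixed schedule (hypothetical helper). */
final class RefreshingToken implements Supplier<String>, AutoCloseable {
  private final Supplier<String> tokenSource; // assumption: wraps the real token call
  private final ScheduledExecutorService scheduler;
  private volatile String current;

  RefreshingToken(Supplier<String> tokenSource, Duration validity) {
    this.tokenSource = tokenSource;
    this.current = tokenSource.get(); // fetch the first token eagerly
    this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "token-refresher");
      t.setDaemon(true); // do not keep the JVM alive just for refreshing
      return t;
    });
    // Refresh after half of the validity period, then at the same interval.
    long periodMillis = Math.max(1, validity.toMillis() / 2);
    scheduler.scheduleAtFixedRate(this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Fetches a new token; on failure the previous token stays in place. */
  void refresh() {
    try {
      current = tokenSource.get();
    } catch (RuntimeException e) {
      // Swallow so the schedule keeps running: scheduleAtFixedRate cancels
      // all later runs if the task throws.
    }
  }

  /** Returns the most recently fetched token. */
  @Override
  public String get() {
    return current;
  }

  /** Stops the background refresh thread. */
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

A holder like this would be created once at startup, for example `new RefreshingToken(() -> getAuthToken(), Duration.ofMinutes(5))`, where the five minutes is a made-up validity period.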

I think I figured out the issue. I realized that when I inject the token into the gRPC request, I don’t generate a fresh one each time, so I need to implement that. Sorry for taking up your time.

@SergeyBykov

From my understanding, the interceptor should be triggered every time a request is made. So I updated the code below to generate a new token inside the interceptor. However, when I execute the calls, that doesn’t happen.

For example, with the code below I would expect to see "New Token Generated" for each call to the stub, but I’m only seeing it once.

       final WorkflowServiceStubs service = new WorkflowServiceStubsImpl(null, WorkflowServiceStubsOptions
                .newBuilder()
                .setChannel(channel)
                .setBlockingStubInterceptor(
                        (stub) ->
                        {
                            System.out.println("New Token Generated");
                            String authToken = getToken();
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        })
                .setFutureStubInterceptor(
                        (stub) ->
                        {
                           System.out.println("New Token Generated");
                            String authToken = getToken();
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        }
                )
                .build());
    final WorkflowClient client = WorkflowClient.newInstance(service);

    // make first client call
    final WorkflowStub stub =
        client.newWorkflowStub(
            Workflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
    stub.execute(configuration);

    // make second call
    stub.execute(configuration);

I did notice that the WorkflowClientOptions does allow for WorkflowClientInterceptor. Should I be using that to inject the token header?

Normally, I would implement an interceptor like the following, but I’m not sure how this would be done with the workflow API:

ManagedChannel originChannel = ManagedChannelBuilder.forAddress("localhost", 8080)
        .usePlaintext()
        .build();

ClientInterceptor interceptor = new HeaderClientInterceptor();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);

public class HeaderClientInterceptor implements ClientInterceptor {

  static final Metadata.Key<String> CUSTOM_HEADER_KEY =
    Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        /* put custom header */
        System.out.println("Generating token for header");
        headers.put(CUSTOM_HEADER_KEY, "header");
        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {
            /**
             * if you don't need receive header from server,
             * you can use {@link io.grpc.stub.MetadataUtils#attachHeaders}
             * directly to send header
             */
            super.onHeaders(headers);
          }
        }, headers);
      }
    };
  }
}

It does look like setBlockingStubInterceptor and setFutureStubInterceptor are only invoked once, when the connection gets opened. The approach with a client interceptor works better: the interceptor gets invoked for every request.
However, for that you need to initialize a ManagedChannel and inject it with setChannel as your code above does. That means you’d have to configure the target address, TLS, etc., which creates quite a bit of friction. I think we should provide an easier way to inject per-call headers. I already filed a ticket for that last week for triage.
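
The difference can be modeled without gRPC. The sketch below is plain Java and is not Temporal or gRPC API: `frozenAtSetup` mimics a stub customizer that runs once and captures the token it saw, while `resolvedPerCall` mimics a client interceptor that looks the token up on every call. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of "header decided at setup" versus "header decided per request". */
final class HeaderTimingDemo {

  /** Reads the token once, at setup time; every later call reuses that value. */
  static Supplier<String> frozenAtSetup(Supplier<String> tokenSource) {
    String token = tokenSource.get(); // evaluated exactly once
    return () -> token;
  }

  /** Defers the lookup, so the token source is consulted on each call. */
  static Supplier<String> resolvedPerCall(Supplier<String> tokenSource) {
    return () -> tokenSource.get();
  }

  /** Simulates sending requests and records the header value used for each. */
  static List<String> send(Supplier<String> headerForCall, int calls) {
    List<String> sent = new ArrayList<>();
    for (int i = 0; i < calls; i++) {
      sent.add(headerForCall.get());
    }
    return sent;
  }
}
```

With a token source that returns a new value on each invocation, `send(frozenAtSetup(source), 3)` records the same token three times, whereas `send(resolvedPerCall(source), 3)` records three different tokens. This matches the single "New Token Generated" line seen earlier in the thread.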

Can you link me to the ticket so I can follow it? I see two approaches for implementing the client interceptor.

  1. Override the channel like in my example. However, that returns a Channel and not a ManagedChannel, and it looks like WorkflowServiceStubsImpl only takes a ManagedChannel:
WorkflowServiceStubsImpl(null, WorkflowServiceStubsOptions
                .newBuilder()
                .setChannel(channel)
  2. Implement a WorkflowClientInterceptor. Do you have example code that implements that interface? I’m trying to work out what logic I would need to put in the interface methods.

I’m guessing your suggestion is option 1. I’m not sure how you would cast an InterceptorChannel to a ManagedChannel, which is what WorkflowServiceStubsImpl expects:

Caused by: java.lang.ClassCastException: class io.grpc.ClientInterceptors$InterceptorChannel cannot be cast to class io.grpc.ManagedChannel (io.grpc.ClientInterceptors$InterceptorChannel and io.grpc.ManagedChannel are in unnamed module of loader 'app')

If it’s the second option:

public class LcaWorkflowClientInterceptor implements WorkflowClientInterceptor {
    @Override
    public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions options, WorkflowStub next) {
        return null;
    }

    @Override
    public WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType, WorkflowStub next) {
        return null;
    }

    @Override
    public ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next) {
        return null;
    }
}

I guess it’s not clear to me what the implementation would look like.

Any suggestions to get this working until there is a fix?

That ticket is in the internal system. So I can’t give a link to it. If it gets approved, then I’ll add a GitHub issue for it.

Here’s the code that worked for me.

ClientInterceptor interceptor = new HeaderClientInterceptor();
ManagedChannel channel =
    ManagedChannelBuilder.forAddress("localhost", 7233)
        .usePlaintext()
        .intercept(interceptor)
        .build();

public class HeaderClientInterceptor implements ClientInterceptor {

  static final Metadata.Key<String> CUSTOM_HEADER_KEY =
      Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        /* put custom header */
        System.out.println("Generating token for header");
        headers.put(CUSTOM_HEADER_KEY, "xyz123");
        super.start(
            new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                responseListener) {
              @Override
              public void onHeaders(Metadata headers) {
                /**
                 * if you don't need receive header from server, you can use {@link
                 * io.grpc.stub.MetadataUtils#attachHeaders} directly to send header
                 */
                super.onHeaders(headers);
              }
            },
            headers);
      }
    };
  }
}

Thanks, that works. However, I had to remove the following stub interceptors from the WorkflowServiceStubsImpl options:

.setBlockingStubInterceptor(
                        (stub) ->
                        {
                            System.out.println("New Token Generated");
                            String authToken = getToken();
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        })
                .setFutureStubInterceptor(
                        (stub) ->
                        {
                           System.out.println("New Token Generated");
                            String authToken = getToken();
                            Metadata headers = new Metadata();
                            headers.put(Metadata.Key.of(AUTH_HEADER, ASCII_STRING_MARSHALLER), authToken);
                            return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                        }
                )

for it to work; otherwise, I think the logic ignores the channel’s interceptor.

Yes, the channel’s interceptors aren’t needed with this approach, I agree.
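
With a per-request interceptor, the token lookup runs on every RPC, including each poll. If generating a token is expensive, one option is to cache it and regenerate it only once it has reached a chosen age. A minimal JDK-only sketch follows; `CachedToken`, `tokenSource`, and `maxAge` are hypothetical names, and `maxAge` is assumed to be set shorter than the real token lifetime.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Returns a cached token and fetches a new one once the cached copy is too old. */
final class CachedToken implements Supplier<String> {
  private final Supplier<String> tokenSource; // assumption: wraps the real token call
  private final Duration maxAge;              // assumption: shorter than the token lifetime
  private final Clock clock;                  // injectable so the expiry logic can be tested
  private String token;
  private Instant fetchedAt;

  CachedToken(Supplier<String> tokenSource, Duration maxAge, Clock clock) {
    this.tokenSource = tokenSource;
    this.maxAge = maxAge;
    this.clock = clock;
  }

  /** Safe to call from several poller threads; only one of them refreshes at a time. */
  @Override
  public synchronized String get() {
    Instant now = clock.instant();
    boolean stale = token == null || !now.isBefore(fetchedAt.plus(maxAge));
    if (stale) {
      token = tokenSource.get();
      fetchedAt = now;
    }
    return token;
  }
}
```

Inside the interceptor's `start` method, the hard-coded header value would then be replaced by something like `headers.put(CUSTOM_HEADER_KEY, cachedToken.get())`, where `cachedToken` is a shared instance created with `Clock.systemUTC()`.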