MDC context logging properties not propagating inside promise.handle

Hello.

The MDC markers placed before a promise handler do not appear alongside the log statement in the handler. They reappear after the handler. Below are a demo test case and relevant logs.

Demo

package mdcPromiseCase;

import static java.util.stream.Collectors.toMap;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.Payload;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import lombok.CustomLog;
import org.slf4j.MDC;

@CustomLog
public class HelloAsync {

  // Task queue name handed to both the worker (factory.newWorker) and the workflow options in main
  static final String TASK_QUEUE = "HelloAsyncActivityTaskQueue";

  // Workflow id set on the workflow options used to create the stub in main
  static final String WORKFLOW_ID = "HelloAsyncActivityWorkflow";

  /** Workflow contract of the demo; implemented by {@code GreetingWorkflowImpl}. */
  @WorkflowInterface
  public interface GreetingWorkflow {

    /**
     * Workflow entry point.
     *
     * @param name name forwarded to the greeting activities
     * @return the text produced by the workflow for {@code name}
     */
    @WorkflowMethod
    String getGreeting(String name);
  }

  /** Activity contract of the demo; implemented by {@code GreetingActivitiesImpl}. */
  @ActivityInterface
  public interface GreetingActivities {

    /**
     * Builds a single greeting line.
     *
     * @param greeting leading word of the line (the workflow passes "Hello" and "Bye")
     * @param name name to greet
     * @return the composed greeting text
     */
    String composeGreeting(String greeting, String name);
  }

  public static class GreetingWorkflowImpl implements GreetingWorkflow {

    // Stub through which the workflow invokes GreetingActivities; activities invoked
    // through it are configured with a 10-second start-to-close timeout.
    private final GreetingActivities activities = Workflow.newActivityStub(
        GreetingActivities.class,
        ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());

Handler section

    /**
     * Reproduces the reported behavior: MDC entries are added before, inside, and after a
     * {@code Promise.handle} callback, with one log statement next to each, so the MDC content
     * visible at each point can be compared in the log output.
     *
     * <p>The statement order is deliberate and is the point of the demo.
     *
     * @param name name passed to both greeting activities
     * @return the "Hello" and "Bye" activity results, each followed by a newline
     */
    @Override
    public String getGreeting(String name) {

      // key1: added on the workflow method thread before any handler is registered.
      MDC.put("INSIDE getGreeting(key1)", "before promise handler βœ…");
      log.info("Hello from getGreeting(before handler) ");

      // Start both activities asynchronously; neither call blocks here.
      Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);
      Promise<String> bye = Async.function(activities::composeGreeting, "Bye", name);

      // Register the callback under test. The promise returned by handle(...) is discarded;
      // the callback only logs and passes the activity result through unchanged.
      // In the logs attached to this report, this log line is emitted from the
      // "activity completion callback" thread and carries key2 but not key1.
      hello.handle(
          (string, exception) -> {
            MDC.put("INSIDE getGreeting(key2)", "inside promise handler ❌");
            log.info("Hello from handler ");
            return string;
          });

      // key3: added right after registering the handler. In the attached logs this line is
      // printed before the handler's line and carries both key1 and key3.
      MDC.put("INSIDE getGreeting(key3)", "after promise handler ❌_❌");
      log.info("Hello from getGreeting(after handler) ");

      // First blocking point of the method: wait for both activity results.
      return hello.get() + "\n" + bye.get() + "\n";
    }
  }
  /** Activity implementation that appends the Base64-encoded task token to the greeting. */
  static class GreetingActivitiesImpl implements GreetingActivities {

    /**
     * Builds {@code "<greeting> <name>! <token>"}, where the token is the Base64 form of the
     * current activity's task token.
     *
     * @param greeting leading word of the line
     * @param name name to greet
     * @return the composed greeting text
     */
    @Override
    public String composeGreeting(String greeting, String name) {
      final byte[] taskToken = Activity.getExecutionContext().getTaskToken();
      final String encodedToken = Base64.getEncoder().encodeToString(taskToken);

      return new StringBuilder()
          .append(greeting)
          .append(" ")
          .append(name)
          .append("! ")
          .append(encodedToken)
          .toString();
    }
  }

  /**
   * Starts a worker for the demo workflow and activities against the local service stubs, runs
   * the workflow once with the MDC context propagator attached, prints the result, and exits.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    // Connection and client.
    final WorkflowServiceStubs serviceStubs = WorkflowServiceStubs.newLocalServiceStubs();
    final WorkflowClient workflowClient = WorkflowClient.newInstance(serviceStubs);

    // Worker hosting both the workflow and the activity implementations.
    final WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient);
    final Worker greetingWorker = workerFactory.newWorker(TASK_QUEUE);
    greetingWorker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
    greetingWorker.registerActivitiesImplementations(new GreetingActivitiesImpl());
    workerFactory.start();

    // Workflow options, including the context propagator under discussion.
    final WorkflowOptions options =
        WorkflowOptions.newBuilder()
            .setWorkflowId(WORKFLOW_ID)
            .setTaskQueue(TASK_QUEUE)
            .setContextPropagators(List.of(new MDCContextPropagator()))
            .build();
    final GreetingWorkflow greetingWorkflow =
        workflowClient.newWorkflowStub(GreetingWorkflow.class, options);

    // Run the workflow synchronously and print what it returned.
    System.out.println(greetingWorkflow.getGreeting("World "));
    System.exit(0);
  }
}

/**
 * {@link ContextPropagator} that transports SLF4J {@link MDC} entries as string payloads.
 *
 * <p>The context object exchanged through this class is a {@code Map<String, String>}; every
 * entry is serialized to its own {@link Payload} using the default {@link DataConverter}.
 */
final class MDCContextPropagator implements ContextPropagator {

  private final transient DataConverter converter = DataConverter.getDefaultInstance();
  private final transient Supplier<Map<String, String>> supplier;

  /** Creates a propagator whose context is a copy of the MDC map at the time it is requested. */
  public MDCContextPropagator() {
    this.supplier = MDC::getCopyOfContextMap;
  }

  /**
   * Creates a propagator that always reports the given map as its current context.
   *
   * @param map map to report from {@link #getCurrentContext()}; may be {@code null}
   */
  public MDCContextPropagator(final Map<String, String> map) {
    this.supplier = () -> map;
  }

  /** Returns the fully-qualified class name, used as this propagator's name. */
  @Override
  public String getName() {
    return getClass().getName();
  }

  /**
   * Returns the entries to propagate, with {@code null}-valued entries removed.
   *
   * @return a new map of the non-null entries, or {@code null} when the supplier yields no map
   */
  @Override
  public Object getCurrentContext() {
    // Read the supplier exactly once: the null check and the stream must see the same map,
    // and the default supplier creates a fresh MDC copy on every call.
    final Map<String, String> current = supplier.get();
    if (current == null) {
      return null;
    }

    // Collectors.toMap rejects null values, hence the filter.
    return current.entrySet().stream()
        .filter(entry -> entry.getValue() != null)
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * Puts every entry of the given context into the MDC; anything that is not a {@link Map}
   * (including {@code null}) is ignored.
   *
   * @param context context object, expected to be a {@code Map<String, String>}
   */
  @Override
  public void setCurrentContext(final Object context) {
    if (!(context instanceof Map)) {
      return;
    }

    @SuppressWarnings("unchecked")
    final var contextMap = (Map<String, String>) context;

    contextMap.forEach(MDC::put);
  }

  /**
   * Converts each context entry into a {@link Payload} keyed by the same name.
   *
   * @param context context object, expected to be a {@code Map<String, String>}
   * @return one payload per entry, or an empty map when {@code context} is not a {@link Map}
   */
  @Override
  public Map<String, Payload> serializeContext(final Object context) {
    if (!(context instanceof Map)) {
      return Map.of();
    }

    @SuppressWarnings("unchecked")
    final var contextMap = (Map<String, String>) context;

    return contextMap.entrySet().stream()
        .collect(toMap(Map.Entry::getKey, entry -> toPayload(entry.getValue())));
  }

  /**
   * Converts each payload back into its string value, keyed by the same name.
   *
   * @param context payloads keyed by entry name
   * @return a {@code Map<String, String>} with one entry per payload
   */
  @Override
  public Object deserializeContext(final Map<String, Payload> context) {
    return context.entrySet().stream()
        .collect(toMap(Map.Entry::getKey, entry -> fromPayload(entry.getValue())));
  }

  /** Serializes a single string value with the default data converter. */
  private Payload toPayload(final String value) {
    return converter.toPayload(value).get();
  }

  /** Deserializes a single payload back into a string with the default data converter. */
  private String fromPayload(final Payload payload) {
    return converter.fromPayload(payload, String.class, String.class);
  }
}

Relevant logs

{
    "@timestamp": "2022-05-11T10:25:26.768-07:00",
    "@version": "1",
    "description": "Hello from getGreeting() ",
    "logger": "mdcPromiseCase.HelloAsync",
    "thread": "workflow-method-HelloAsyncActivityWorkflow-9f0284e6-b6df-411a-8603-095eebfb25a3",
    "level": "INFO",
    "level_value": 20000,
    "WorkflowType": "GreetingWorkflow",
    "INSIDE getGreeting(key1)": "before promise handler ✅",
    "TaskQueue": "HelloAsyncActivityTaskQueue",
    "WorkflowId": "HelloAsyncActivityWorkflow",
    "RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
    "Namespace": "default",
    "class": "io.temporal.internal.logging.ReplayAwareLogger",
    "method": "info",
    "file": "ReplayAwareLogger.java",
    "line": 233,
    "data_version": 2,
    "type": "log"
}
{
    "@timestamp": "2022-05-11T10:25:26.785-07:00",
    "@version": "1",
    "description": "Hello from getGreeting(AH) ",
    "logger": "mdcPromiseCase.HelloAsync",
    "thread": "workflow-method-HelloAsyncActivityWorkflow-9f0284e6-b6df-411a-8603-095eebfb25a3",
    "level": "INFO",
    "level_value": 20000,
    "INSIDE getGreeting(key3)": "after promise handler ❌_❌",
    "WorkflowType": "GreetingWorkflow",
    "INSIDE getGreeting(key1)": "before promise handler ✅",
    "TaskQueue": "HelloAsyncActivityTaskQueue",
    "WorkflowId": "HelloAsyncActivityWorkflow",
    "RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
    "Namespace": "default",
    "class": "io.temporal.internal.logging.ReplayAwareLogger",
    "method": "info",
    "file": "ReplayAwareLogger.java",
    "line": 233,
    "data_version": 2,
    "type": "log"
}
{
    "@timestamp": "2022-05-11T10:25:26.894-07:00",
    "@version": "1",
    "description": "Hello from handler ",
    "logger": "mdcPromiseCase.HelloAsync",
    "thread": "activity completion callback",
    "level": "INFO",
    "level_value": 20000,
    "INSIDE getGreeting(key2)": "inside promise handler ❌",
    "WorkflowType": "GreetingWorkflow",
    "TaskQueue": "HelloAsyncActivityTaskQueue",
    "WorkflowId": "HelloAsyncActivityWorkflow",
    "RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
    "Namespace": "default",
    "class": "io.temporal.internal.logging.ReplayAwareLogger",
    "method": "info",
    "file": "ReplayAwareLogger.java",
    "line": 233,
    "data_version": 2,
    "type": "log"
}
Hello World ! 
Bye World ! 

Hi, is the question why you get
"INSIDE getGreeting(key3)": "after promise handler
log before
"INSIDE getGreeting(key2)"?

I think this is because the blocking call to wait for hello promise, hello.get(), is after getGreeting(key3).
Let me know if I am misunderstanding your question, though.

Hello.
My question is why the marker before the handler

MDC.put("INSIDE getGreeting(key1)", "before promise handler ✅");

doesn’t appear as part of the log inside the handler

    "@timestamp": "2022-05-11T10:25:26.894-07:00",
    "@version": "1",
    "description": "Hello from handler ",
    "logger": "mdcPromiseCase.HelloAsync",
    "thread": "activity completion callback",
    "level": "INFO",
    "level_value": 20000,
    "INSIDE getGreeting(key2)": "inside promise handler ❌",
    "WorkflowType": "GreetingWorkflow",
     ...

Hey Justin.

Right now, propagation works in such a way that child threads of a workflow get the same context that the workflow thread originally received. So changes made to the MDC context on the client side will be propagated, but changes made inside the workflow code will not.

It’s a known peculiarity, and it’s related to the fact that propagation works through headers attached to Temporal network requests, but there are no headers produced when we spawn a new thread of the same workflow.
But feel free to open an issue on GitHub (temporalio/sdk-java: Temporal Java SDK), and I may consider reworking it at some point.

1 Like

Alright, thank you.