Hello.
MDC entries that are put before a promise handler is registered do not appear on the log statement emitted inside the handler. They still appear on log statements in the workflow method outside the handler. Below are a demo test case and the relevant logs.
Demo
package mdcPromiseCase;
import static java.util.stream.Collectors.toMap;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.Payload;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import lombok.CustomLog;
import org.slf4j.MDC;
@CustomLog
public class HelloAsync {
// Task queue name shared by the worker created in main() and the workflow stub options.
static final String TASK_QUEUE = "HelloAsyncActivityTaskQueue";
// Workflow id passed to WorkflowOptions when the workflow stub is created in main().
static final String WORKFLOW_ID = "HelloAsyncActivityWorkflow";
/** Workflow contract used by the demo; it exposes a single workflow method. */
@WorkflowInterface
public interface GreetingWorkflow {
/**
 * Workflow entry point.
 *
 * @param name the name to greet
 * @return the greeting text produced by the workflow
 */
@WorkflowMethod
String getGreeting(String name);
}
/** Activity contract invoked by the workflow to build one greeting line. */
@ActivityInterface
public interface GreetingActivities {
/**
 * Builds a greeting line from a salutation and a name.
 *
 * @param greeting the salutation, e.g. "Hello" or "Bye" as passed by the workflow
 * @param name the name to greet
 * @return the composed greeting line
 */
String composeGreeting(String greeting, String name);
}
public static class GreetingWorkflowImpl implements GreetingWorkflow {
// Activity stub obtained from Workflow.newActivityStub, configured with a
// start-to-close timeout of 10 seconds.
private final GreetingActivities activities = Workflow.newActivityStub(
GreetingActivities.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
Handler section
/**
 * Reproduction of the reported behavior: MDC entries are put before, inside and after a
 * {@code Promise.handle} callback, and a log statement is emitted at each point so the MDC
 * content attached to each log record can be compared.
 *
 * @param name the name passed to both {@code composeGreeting} activity calls
 * @return the "Hello" result and the "Bye" result, each followed by a newline
 */
@Override
public String getGreeting(String name) {
// key1 is put before the handler is registered.
// NOTE(review): the string literal below is split across two lines in this paste (the same
// break shows up in the logged value); this looks like an extraction artifact of a non-ASCII
// character and will not compile as-is. Confirm against the original source.
MDC.put("INSIDE getGreeting(key1)", "before promise handler β
");
log.info("Hello from getGreeting(before handler) ");
// Request both activity calls through Async.function; each call yields a Promise.
Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);
Promise<String> bye = Async.function(activities::composeGreeting, "Bye", name);
// Register a handler on the "Hello" promise. The Promise returned by handle(...) is discarded;
// the handler only puts key2, logs, and passes the activity result through unchanged.
// In the logs attached to this report, this log record carries key2 but not key1, and its
// thread is "activity completion callback" rather than the workflow-method thread.
hello.handle(
(string, exception) -> {
MDC.put("INSIDE getGreeting(key2)", "inside promise handler β");
log.info("Hello from handler ");
return string;
});
// key3 is put after the handler is registered; in the attached logs this record carries both
// key1 and key3.
MDC.put("INSIDE getGreeting(key3)", "after promise handler β_β");
log.info("Hello from getGreeting(after handler) ");
// Wait for both promises and join their results, one per line.
return hello.get() + "\n" + bye.get() + "\n";
}
}
/**
 * Activity implementation for the demo: composes the greeting text and appends the
 * Base64-encoded task token of the current activity execution.
 */
static class GreetingActivitiesImpl implements GreetingActivities {

  /**
   * Produces {@code "<greeting> <name>! <base64 task token>"}.
   *
   * @param greeting the salutation placed first
   * @param name the name placed after the salutation
   * @return the composed greeting line
   */
  @Override
  public String composeGreeting(String greeting, String name) {
    final byte[] taskToken = Activity.getExecutionContext().getTaskToken();
    final String encodedToken = Base64.getEncoder().encodeToString(taskToken);
    return String.format("%s %s! %s", greeting, name, encodedToken);
  }
}
/**
 * Runs the demo end to end: creates local service stubs and a client, starts a worker that
 * hosts the workflow and activity implementations, executes the workflow through a stub that
 * has the MDC context propagator configured, prints the result, and exits.
 *
 * @param args unused
 */
public static void main(String[] args) {
  final WorkflowServiceStubs serviceStubs = WorkflowServiceStubs.newLocalServiceStubs();
  final WorkflowClient workflowClient = WorkflowClient.newInstance(serviceStubs);

  // Worker side: register the implementations, then start polling.
  final WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient);
  final Worker greetingWorker = workerFactory.newWorker(TASK_QUEUE);
  greetingWorker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
  greetingWorker.registerActivitiesImplementations(new GreetingActivitiesImpl());
  workerFactory.start();

  // Client side: build the stub options, including the MDC context propagator.
  final WorkflowOptions stubOptions =
      WorkflowOptions.newBuilder()
          .setWorkflowId(WORKFLOW_ID)
          .setTaskQueue(TASK_QUEUE)
          .setContextPropagators(List.of(new MDCContextPropagator()))
          .build();
  final GreetingWorkflow greetingStub =
      workflowClient.newWorkflowStub(GreetingWorkflow.class, stubOptions);

  final String result = greetingStub.getGreeting("World ");
  // Display workflow execution results
  System.out.println(result);
  System.exit(0);
}
}
/**
 * {@link ContextPropagator} that carries the SLF4J {@link MDC} map as a
 * {@code Map<String, String>}, converting each value to and from a {@link Payload} with the
 * default {@link DataConverter}.
 */
final class MDCContextPropagator implements ContextPropagator {
  private final transient DataConverter converter = DataConverter.getDefaultInstance();
  private final transient Supplier<Map<String, String>> supplier;

  /** Creates a propagator that reads the context from a copy of the calling thread's MDC map. */
  public MDCContextPropagator() {
    this.supplier = MDC::getCopyOfContextMap;
  }

  /**
   * Creates a propagator that always reports the given map as the current context.
   *
   * @param map the fixed context map; may be {@code null}, in which case
   *     {@link #getCurrentContext()} returns {@code null}
   */
  public MDCContextPropagator(final Map<String, String> map) {
    this.supplier = () -> map;
  }

  /** @return the fully qualified class name of this propagator */
  @Override
  public String getName() {
    return getClass().getName();
  }

  /**
   * Returns the current context as a new map without null-valued entries.
   *
   * @return a {@code Map<String, String>} of the non-null entries, or {@code null} when the
   *     supplier yields no map
   */
  @Override
  public Object getCurrentContext() {
    // Read the supplier exactly once so the null check and the streamed map are the same
    // instance, and so the MDC map is not copied twice.
    final Map<String, String> current = supplier.get();
    if (current == null) {
      return null;
    }
    // Collectors.toMap rejects null values, so they are filtered out first.
    return current.entrySet().stream()
        .filter(entry -> entry.getValue() != null)
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * Puts every entry of the given context into the MDC. Entries already present in the MDC
   * under other keys are left untouched.
   *
   * @param context expected to be a {@code Map<String, String>}; any other value (including
   *     {@code null}) is ignored
   */
  @Override
  public void setCurrentContext(final Object context) {
    if (!(context instanceof Map)) {
      return;
    }
    @SuppressWarnings("unchecked")
    final var contextMap = (Map<String, String>) context;
    contextMap.forEach(MDC::put);
  }

  /**
   * Converts each context value to a {@link Payload}, keeping the keys.
   *
   * @param context expected to be a {@code Map<String, String>}
   * @return the serialized entries, or an empty map when {@code context} is not a map
   */
  @Override
  public Map<String, Payload> serializeContext(final Object context) {
    if (!(context instanceof Map)) {
      return Map.of();
    }
    @SuppressWarnings("unchecked")
    final var contextMap = (Map<String, String>) context;
    return contextMap.entrySet().stream()
        .collect(toMap(Map.Entry::getKey, entry -> toPayload(entry.getValue())));
  }

  /**
   * Converts each {@link Payload} back to a string, keeping the keys.
   *
   * @param context the serialized entries
   * @return a {@code Map<String, String>} with the deserialized values
   */
  @Override
  public Object deserializeContext(final Map<String, Payload> context) {
    return context.entrySet().stream()
        .collect(toMap(Map.Entry::getKey, entry -> fromPayload(entry.getValue())));
  }

  /** Serializes one value with the default converter; throws if no payload is produced. */
  private Payload toPayload(final String value) {
    // orElseThrow() throws the same NoSuchElementException as get() but states the intent.
    return converter.toPayload(value).orElseThrow();
  }

  /** Deserializes one payload to a string with the default converter. */
  private String fromPayload(final Payload payload) {
    return converter.fromPayload(payload, String.class, String.class);
  }
}
Relevant logs
{
"@timestamp": "2022-05-11T10:25:26.768-07:00",
"@version": "1",
"description": "Hello from getGreeting() ",
"logger": "mdcPromiseCase.HelloAsync",
"thread": "workflow-method-HelloAsyncActivityWorkflow-9f0284e6-b6df-411a-8603-095eebfb25a3",
"level": "INFO",
"level_value": 20000,
"WorkflowType": "GreetingWorkflow",
"INSIDE getGreeting(key1)": "before promise handler β
",
"TaskQueue": "HelloAsyncActivityTaskQueue",
"WorkflowId": "HelloAsyncActivityWorkflow",
"RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
"Namespace": "default",
"class": "io.temporal.internal.logging.ReplayAwareLogger",
"method": "info",
"file": "ReplayAwareLogger.java",
"line": 233,
"data_version": 2,
"type": "log"
}
{
"@timestamp": "2022-05-11T10:25:26.785-07:00",
"@version": "1",
"description": "Hello from getGreeting(AH) ",
"logger": "mdcPromiseCase.HelloAsync",
"thread": "workflow-method-HelloAsyncActivityWorkflow-9f0284e6-b6df-411a-8603-095eebfb25a3",
"level": "INFO",
"level_value": 20000,
"INSIDE getGreeting(key3)": "after promise handler β_β",
"WorkflowType": "GreetingWorkflow",
"INSIDE getGreeting(key1)": "before promise handler β
",
"TaskQueue": "HelloAsyncActivityTaskQueue",
"WorkflowId": "HelloAsyncActivityWorkflow",
"RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
"Namespace": "default",
"class": "io.temporal.internal.logging.ReplayAwareLogger",
"method": "info",
"file": "ReplayAwareLogger.java",
"line": 233,
"data_version": 2,
"type": "log"
}
{
"@timestamp": "2022-05-11T10:25:26.894-07:00",
"@version": "1",
"description": "Hello from handler ",
"logger": "mdcPromiseCase.HelloAsync",
"thread": "activity completion callback",
"level": "INFO",
"level_value": 20000,
"INSIDE getGreeting(key2)": "inside promise handler β",
"WorkflowType": "GreetingWorkflow",
"TaskQueue": "HelloAsyncActivityTaskQueue",
"WorkflowId": "HelloAsyncActivityWorkflow",
"RunId": "9f0284e6-b6df-411a-8603-095eebfb25a3",
"Namespace": "default",
"class": "io.temporal.internal.logging.ReplayAwareLogger",
"method": "info",
"file": "ReplayAwareLogger.java",
"line": 233,
"data_version": 2,
"type": "log"
}
Hello World !
Bye World !