Hi team, we are using Grafana for observability.
Our worker pods not only pick up load from the Temporal task queues but also receive HTTP traffic.
That means the client and the consumer (worker) are the same application.
All the logs for HTTP calls on the client side have the trace ID as well as the span ID added to them,
but the logs written inside the workflows/activities are missing trace_id and span_id.
Quick reference of the setup:
/**
 * Creates the {@link WorkflowClient} bean.
 *
 * <p>The client is configured with the namespace read from {@code temporalServiceConfig},
 * an {@link OpenTracingClientInterceptor} backed by an OpenTracing shim over the supplied
 * {@link OpenTelemetry} instance, and a single {@link MDCContextPropagator}.
 *
 * @param workflowServiceStubs service stubs the client is created on top of
 * @param openTelemetry OpenTelemetry instance wrapped by the OpenTracing shim
 * @return the configured workflow client
 */
@Bean
public WorkflowClient workflowClient(
        final WorkflowServiceStubs workflowServiceStubs,
        final OpenTelemetry openTelemetry) {
    // Wrap the OpenTelemetry instance in an OpenTracing tracer shim for the interceptor options.
    final Tracer shimTracer = OpenTracingShim.createTracerShim(openTelemetry);
    final OpenTracingOptions tracingOptions =
            OpenTracingOptions.newBuilder().setTracer(shimTracer).build();

    final WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder()
            .setNamespace(temporalServiceConfig.getTemporalNamespace())
            .setInterceptors(new OpenTracingClientInterceptor(tracingOptions))
            .setContextPropagators(List.of(new MDCContextPropagator()))
            .build();

    return WorkflowClient.newInstance(workflowServiceStubs, clientOptions);
}
/**
 * Creates the {@link WorkerFactory} bean on top of the given workflow client.
 *
 * <p>NOTE(review): the factory is created from the client alone; no factory options or
 * worker-side interceptors are passed in this method. If worker-side tracing is expected
 * (e.g. an OpenTracing worker interceptor supplied through worker factory options),
 * it is not configured here - confirm whether that is intended.
 *
 * @param workflowClient client the factory is bound to
 * @return a new worker factory
 */
@Bean
public WorkerFactory workerFactory(
        final WorkflowClient workflowClient) {
    final WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
    return factory;
}
/**
 * Creates the worker bean named "AccountPropagationTemporalWorker".
 *
 * <p>The worker is obtained from the given {@link WorkerFactory} for the account-propagation
 * task queue configured in {@code temporalServiceConfig}. The remainder of the method body is
 * elided in this excerpt, so how the other injected collaborators are used is not shown here.
 */
@Bean
@Named("AccountPropagationTemporalWorker")
@Order(1)
public Worker accountPropagationTemporalWorker(
final WorkerFactory workerFactory, DbServiceAccessor dbServiceAccessor,
BcifServiceAccessor bcifServiceAccessor, FinacleServiceAccessor finacleServiceAccessor, AadhaarServiceAccessor aadhaarServiceAccessor,
ActivityFactory activityFactory, SalesforceServiceAccessor salesforceServiceAccessor, S3Accessor s3Accessor,
MasterServiceAccessor masterServiceAccessor, SmsNotificationFactory smsNotificationFactory, CrnActivationHelperService crnActivationHelperService,
DocumentServiceAccessor documentServiceAccessor, WorkflowAccessor workflowAccessor, TemporalServiceConfig temporalServiceConfig,ExternalConfig externalConfig,BcifEnableBankingErrorMessagesConfig bcifEnableBankingErrorMessagesConfig) {
// Create a worker that polls the account-propagation task queue named in the configuration.
Worker workerAccountPropagation = workerFactory.newWorker(temporalServiceConfig.getTaskQueues().getTemporalAccountPropagationTaskQueue());
// (rest of the method omitted in the original post)
......}
/**
 * {@link ContextPropagator} implementation that carries the logging MDC map.
 *
 * <p>The MDC map is serialized to JSON bytes and wrapped in a payload stored under the
 * header key {@code "mdc"}; on the receiving side it is decoded and installed back into the MDC.
 */
public class MDCContextPropagator implements ContextPropagator {
private static final Logger log = LoggerFactory.getLogger(MDCContextPropagator.class);
// Shared JSON mapper used to encode/decode the MDC map.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// Converter used to wrap the JSON bytes in a payload and unwrap them again.
private static final DefaultDataConverter DATA_CONVERTER =
(DefaultDataConverter) DefaultDataConverter.STANDARD_INSTANCE;
/**
 * Returns the fixed name under which this propagator is identified.
 *
 * @return the literal {@code "MDCContextPropagator"}
 */
@Override
public String getName() {
    final String propagatorName = "MDCContextPropagator";
    return propagatorName;
}
/**
 * Encodes the given context as a single header entry keyed {@code "mdc"}.
 *
 * <p>The context is expected to be the MDC map; it is written as JSON bytes and wrapped
 * in a payload. Anything that is not a {@link Map} yields an empty header map.
 *
 * @param context the context object to serialize
 * @return a one-entry header map, or an empty map when {@code context} is not a map
 * @throws RuntimeException if the map cannot be written as JSON
 */
@Override
public Map<String, Payload> serializeContext(Object context) {
    if (context instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, String> entries = (Map<String, String>) context;
        try {
            final byte[] encoded = OBJECT_MAPPER.writeValueAsBytes(entries);
            final Payload payload = DATA_CONVERTER.toPayload(encoded).orElseThrow();
            return Map.of("mdc", payload);
        } catch (JsonProcessingException e) {
            log.error("[serializeContext] Failed to serialize MDC context", e);
            throw new RuntimeException("Failed to serialize MDC context", e);
        }
    }

    log.debug("[serializeContext] No MDC context found.");
    return Map.of();
}
/**
 * Rebuilds the MDC map from the propagated headers.
 *
 * <p>Reads the payload stored under the {@code "mdc"} key, unwraps it to JSON bytes and
 * parses those bytes into a string-to-string map.
 *
 * @param header propagated headers; may be {@code null}
 * @return the decoded MDC map, or an empty map when no {@code "mdc"} header is present
 * @throws RuntimeException if the payload cannot be parsed as JSON
 */
@Override
public Object deserializeContext(Map<String, Payload> header) {
    if (header == null || !header.containsKey("mdc")) {
        log.debug("[deserializeContext] No MDC header found.");
        return Map.of();
    }
    try {
        // The payload holds raw bytes, so both the value class and the generic type are byte[].
        // (byte[].class.getGenericSuperclass() would evaluate to Object.class, which is not the
        // type being requested.)
        byte[] jsonBytes = DATA_CONVERTER.fromPayload(
                header.get("mdc"), byte[].class, byte[].class);
        // Read through a fully typed map type so keys and values are bound as Strings instead
        // of relying on an unchecked raw-Map assignment.
        Map<String, String> mdcContext = OBJECT_MAPPER.readValue(
                jsonBytes,
                OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, String.class, String.class));
        return mdcContext;
    } catch (IOException e) {
        log.error("[deserializeContext] Failed to deserialize MDC context", e);
        throw new RuntimeException("Failed to deserialize MDC context", e);
    }
}
/**
 * Captures the MDC of the calling thread.
 *
 * @return a new {@link HashMap} holding the current MDC entries, or an empty map when the
 *     MDC reports no context map
 */
@Override
public Object getCurrentContext() {
    final Map<String, String> snapshot = MDC.getCopyOfContextMap();
    if (snapshot == null) {
        return Map.of();
    }
    return new HashMap<>(snapshot);
}
/**
 * Replaces the MDC of the calling thread with the given context.
 *
 * <p>The MDC is always cleared first; it is then repopulated only when {@code context}
 * is a {@link Map}.
 *
 * <p>NOTE(review): because the MDC is cleared unconditionally, any entries already present
 * on the thread when this runs are dropped - confirm this does not remove values (such as
 * tracing identifiers) that other components put there.
 *
 * @param context the context to install, expected to be the MDC map
 */
@Override
public void setCurrentContext(Object context) {
    MDC.clear();

    if (!(context instanceof Map)) {
        log.debug("[setCurrentContext] No MDC context to restore.");
        return;
    }

    // instanceof already guarantees a non-null map here.
    @SuppressWarnings("unchecked")
    Map<String, String> restored = (Map<String, String>) context;
    MDC.setContextMap(restored);
}
Can someone help me with the missing piece?