Hi!
I’m trying to add some metadata logs that will be attached to every log in workflow/activities.
Implemented this interceptor:
export const extraLogAttributes: Record<string, any> = {};
export class LogAttributesWorkflowInterceptor
implements WorkflowOutboundCallsInterceptor
{
getLogAttributes(): Record<string, any> {
const { workflowId, workflowType, taskQueue, namespace } = workflowInfo();
return {
workflowId,
workflowType,
taskQueue,
namespace,
...extraLogAttributes,
};
}
async scheduleActivity(input, next) {
return await next({
...input,
headers: {
MDC: defaultPayloadConverter.toPayload(this.getLogAttributes()),
},
});
}
}
This allows me to add to extraLogAttributes
object at workflow runtime (each workflow has its own sandbox, right?)
And every log inside the workflow has the added log attributes.
Tried to do the same for activities, and tried to add headers as suggested first:
export class ActivityInboundIntercept implements ActivityInboundCallsInterceptor {
async execute(
input: ActivityExecuteInput,
next: Next<ActivityInboundCallsInterceptor, 'execute'>,
): Promise<unknown> {
const encoded = Object.keys(input.headers).reduce((acc, key) => {
acc[key] = defaultPayloadConverter.fromPayload(input.headers[key]);
return acc;
}, {});
return await next({
...input,
args: [...input.args, encoded],
});
}
}
But I can’t “catch” / add the headers inside the getLogAttribute of ActivityOutboundCallsInterceptor:
export class LogAttributesActivityInterceptor
implements ActivityOutboundCallsInterceptor
{
constructor(public readonly context: Context) {}
getLogAttributes(
input: GetLogAttributesInput,
next: Next<this, 'getLogAttributes'>,
) {
console.log({ input }); // does not include headers
return {}; // want here to return the same log attributes as in the workflow
}
}
This is how I added those interceptors to my worker:
interceptors: {
activity: [
() => ({
inbound: new ActivityInboundIntercept(),
}),
(ctx) => ({
outbound: new LogAttributesActivityInterceptor(ctx),
}),
],
workflowModules: [require.resolve('./interceptors')], // workflow interceptors
}
So my question is: how to do it?