TypeScript - 1.9.0 - inject context log attributes across workflows and activities

Hi!
I’m trying to add some metadata that will be attached to every log in workflows and activities.
I implemented this interceptor:

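import { defaultPayloadConverter } from '@temporalio/common';
import {
  ActivityInput,
  Next,
  WorkflowOutboundCallsInterceptor,
  workflowInfo,
} from '@temporalio/workflow';

export const extraLogAttributes: Record<string, any> = {};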

export class LogAttributesWorkflowInterceptor
  implements WorkflowOutboundCallsInterceptor
{
  getLogAttributes(): Record<string, any> {
    const { workflowId, workflowType, taskQueue, namespace } = workflowInfo();
    return {
      workflowId,
      workflowType,
      taskQueue,
      namespace,
      ...extraLogAttributes,
    };
  }

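  async scheduleActivity(
    input: ActivityInput,
    next: Next<WorkflowOutboundCallsInterceptor, 'scheduleActivity'>,
  ): Promise<unknown> {
    return await next({
      ...input,
      headers: {
        ...input.headers, // keep headers set by other interceptors
        MDC: defaultPayloadConverter.toPayload(this.getLogAttributes()),
      },
    });
  }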
}

This lets me add entries to the extraLogAttributes object at workflow runtime (each workflow has its own sandbox, right?),
and every log inside the workflow then carries the added attributes.

I tried to do the same for activities, first adding headers as suggested:

export class ActivityInboundIntercept implements ActivityInboundCallsInterceptor {
  async execute(
    input: ActivityExecuteInput,
    next: Next<ActivityInboundCallsInterceptor, 'execute'>,
  ): Promise<unknown> {
    // Decode each header payload back into a plain value.
    const decoded = Object.keys(input.headers).reduce<Record<string, unknown>>((acc, key) => {
      acc[key] = defaultPayloadConverter.fromPayload(input.headers[key]);
      return acc;
    }, {});
    return await next({
      ...input,
      args: [...input.args, decoded],
    });
  }
}

But I can’t “catch” the headers inside getLogAttributes of the ActivityOutboundCallsInterceptor:

export class LogAttributesActivityInterceptor
  implements ActivityOutboundCallsInterceptor
{
  constructor(public readonly context: Context) {}

  getLogAttributes(
    input: GetLogAttributesInput,
    next: Next<this, 'getLogAttributes'>,
  ) {
    console.log({ input }); // does not include headers
    return {}; // want here to return the same log attributes as in the workflow
  }
}

This is how I added those interceptors to my worker:

interceptors: {
  activity: [
    () => ({
      inbound: new ActivityInboundIntercept(),
    }),
    (ctx) => ({
      outbound: new LogAttributesActivityInterceptor(ctx),
    }),
  ],
  workflowModules: [require.resolve('./interceptors')], // workflow interceptors
}

So my question is: how do I do this?

This is the solution I ended up finding. It feels like a bit of a cheat, since I add a headers attribute to the context, which doesn’t have one by default, but it does work:

export class ActivityInboundHeadersInterceptor
  implements ActivityInboundCallsInterceptor
{
  constructor(
    public readonly context: Context & { headers?: Record<string, any> },
  ) {}

  async execute(
    input: ActivityExecuteInput,
    next: Next<ActivityInboundCallsInterceptor, 'execute'>,
  ): Promise<unknown> {
    this.context.headers = Object.keys(input.headers).reduce((acc, key) => {
      acc[key] = defaultPayloadConverter.fromPayload(input.headers[key]);
      return acc;
    }, {});
    return await next(input);
  }
}
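
For completeness, here is a minimal sketch of how the outbound side could read what the inbound interceptor stashed on the context. To keep it runnable on its own, it does not import the SDK: ActivityContextLike, Payload, fromPayload, InboundHeaders, OutboundLogAttributes and activityInterceptorFactory are all hypothetical stand-ins I made up for illustration, and the JSON-based fromPayload only imitates defaultPayloadConverter.fromPayload. It assumes both interceptors are built from the same context object, and that getLogAttributes receives the attributes collected so far and passes them on through next.

```typescript
// Hypothetical stand-ins for SDK types, so the sketch runs without @temporalio packages.
type Attrs = Record<string, unknown>;
interface Payload { data: string }
interface ActivityContextLike { headers?: Attrs }
interface ExecuteInput { args: unknown[]; headers: Record<string, Payload> }

// Stand-in for defaultPayloadConverter.fromPayload: a payload here is just JSON text.
const fromPayload = (payload: Payload): unknown => JSON.parse(payload.data);

// Inbound: decode every header and stash the result on the shared context.
class InboundHeaders {
  constructor(readonly context: ActivityContextLike) {}

  async execute(
    input: ExecuteInput,
    next: (input: ExecuteInput) => Promise<unknown>,
  ): Promise<unknown> {
    const decoded: Attrs = {};
    for (const key of Object.keys(input.headers)) {
      decoded[key] = fromPayload(input.headers[key]);
    }
    this.context.headers = decoded;
    return next(input);
  }
}

// Outbound: merge the attributes sent under the MDC header into the log attributes.
class OutboundLogAttributes {
  constructor(readonly context: ActivityContextLike) {}

  getLogAttributes(input: Attrs, next: (input: Attrs) => Attrs): Attrs {
    const mdc = (this.context.headers?.MDC ?? {}) as Attrs;
    return next({ ...input, ...mdc });
  }
}

// One factory building both halves from the same context object.
const activityInterceptorFactory = (ctx: ActivityContextLike) => ({
  inbound: new InboundHeaders(ctx),
  outbound: new OutboundLogAttributes(ctx),
});
```

With the real SDK types, the same idea would mean returning both inbound and outbound from a single (ctx) => ({ ... }) entry in the worker's activity interceptors list, so the two classes clearly share one context.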

Is it the best solution out there?

I’m working on a complete Context Propagation sample right now. I will share as soon as I’m done.

Sorry, was really eager to solve this :sweat_smile::sweat_smile:
I’ll wait patiently.

No problem. What you have is a perfectly valid approach.

Here’s a rough take on a general-purpose context propagation sample. I only did minimal testing and unfortunately don’t have time to refine it further at the moment, but it seems to work.

Thanks for such a detailed example!
It seems to be working well for me, thank you once again!