How to pass headers set in Workflow Outbound Interceptors to Activity Outbound Interceptors

Hi Team,

I have written a client interceptor that extends OutboundInterceptor and overrides the start_workflow method. In this method I set some headers on input.headers (StartWorkflowInput) before the workflow starts executing.

I have also written a workflow outbound interceptor that extends WorkflowOutboundInterceptor and overrides the start_activity method. There I want to read the headers set in the client interceptor above and append some new headers to them.

The problem I am facing is that in the start_activity method, input.headers is empty ({}).

Client Interceptor:

async def start_workflow(
    self, input: StartWorkflowInput
) -> WorkflowHandle[Any, Any]:
    header_input = "some_value"
    header_payload = self.payload_converter.to_payloads(
        [header_input]
    )[0]
    input.headers = {
        **input.headers,
        "myKey": header_payload,
    }
    return await super().start_workflow(input)

Workflow Outbound Interceptor:

def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
    interceptor_traces.append(("workflow.start_activity", input))
    # Error on the next line because input.headers is {}
    my_data = PayloadConverter.default.from_payloads([input.headers.get("myKey")])[0]
    response = super().start_activity(input)
    return response

The headers will appear on the WorkflowInboundInterceptor in execute_workflow. It’s up to you to take them from there and put them somewhere (e.g. contextvars) if you want to put them on other outbound headers. You set headers on outbound and receive them on inbound, but we don’t automatically forward them from inbound to outbound; you have to choose to do that.
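
As a rough illustration of the forwarding described above, here is a minimal sketch that runs without the Temporal SDK. The Fake* classes and the inbound_headers variable are hypothetical stand-ins for ExecuteWorkflowInput, StartActivityInput and the two workflow interceptors, and the header values are plain strings rather than Payloads. Only the contextvars hand-off from the inbound side to the outbound side is meant to carry over.

```python
import asyncio
import contextvars
from dataclasses import dataclass, field
from typing import Any, Dict


# Hypothetical stand-ins for the SDK input types (not the real classes).
@dataclass
class FakeExecuteWorkflowInput:
    headers: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeStartActivityInput:
    activity: str
    headers: Dict[str, Any] = field(default_factory=dict)


# Holds the headers seen by the inbound side for the current run.
inbound_headers = contextvars.ContextVar("inbound_headers")


class FakeOutbound:
    """Plays the role of the workflow outbound interceptor."""

    def start_activity(self, input: FakeStartActivityInput) -> FakeStartActivityInput:
        # Start from the headers captured on inbound, then append new ones.
        input.headers = {
            **inbound_headers.get({}),
            **input.headers,
            "extraKey": "extra_value",
        }
        return input


class FakeInbound:
    """Plays the role of the workflow inbound interceptor."""

    def __init__(self, outbound: FakeOutbound) -> None:
        self.outbound = outbound

    async def execute_workflow(self, input: FakeExecuteWorkflowInput) -> FakeStartActivityInput:
        # Stash the inbound headers so outbound calls in this run can read them.
        token = inbound_headers.set(dict(input.headers))
        try:
            await asyncio.sleep(0)  # stand-in for the workflow body doing work
            return self.outbound.start_activity(FakeStartActivityInput(activity="a1"))
        finally:
            inbound_headers.reset(token)


result = asyncio.run(
    FakeInbound(FakeOutbound()).execute_workflow(
        FakeExecuteWorkflowInput(headers={"myKey": "some_value"})
    )
)
print(result.headers)
```

In a real interceptor the stored values would be the Payload objects from input.headers, decoded with a payload converter as in the snippets earlier in this thread.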

Thank You @Chad_Retz.
Headers set in the Outbound Interceptor (start_workflow) were available in the Inbound Interceptor (execute_workflow).
To use these headers inside the WorkflowOutboundInterceptor (start_activity), I used contextvars as you suggested above.

I store the headers in a context variable inside the execute_workflow method, and I read them back inside start_activity.

I hope this will not cause any problems while executing multiple activities and child workflows.

This should cause no problems. This is exactly how OTel (and our OTel interceptor) works.
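
To show why concurrent runs should not see each other's values, here is a small sketch using plain asyncio rather than the Temporal workflow event loop (an assumption made so it runs standalone). The names current_headers and fake_workflow_run are hypothetical. Each asyncio task runs in its own copy of the context, so a value set in one task is not visible in another.

```python
import asyncio
import contextvars

# Hypothetical context variable holding per-run headers.
current_headers = contextvars.ContextVar("current_headers")


async def fake_workflow_run(name: str, results: dict) -> None:
    # This set() only affects the context copy owned by the current task.
    current_headers.set({"myKey": name})
    await asyncio.sleep(0)  # yield so the two runs interleave
    # After the interleaving, each run still reads back its own value.
    results[name] = current_headers.get()["myKey"]


async def main() -> dict:
    results: dict = {}
    # gather() wraps each coroutine in its own task, each with a copied context.
    await asyncio.gather(
        fake_workflow_run("run-a", results),
        fake_workflow_run("run-b", results),
    )
    return results


results = asyncio.run(main())
print(results)
```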

Hi @Chad_Retz ,

I have written a WorkflowOutboundInterceptor that overrides the start_activity and start_child_workflow methods.

def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
        decoded_header = PayloadConverter.default.from_payloads(
            [input.headers["allData"]]
        )[0]
        value = decoded_header["key1"]

I have also written a WorkflowInboundInterceptor that overrides execute_workflow.

The start_activity method expects certain input headers. I would like to know how to write unit tests for this piece of code.

I have written a sample workflow to test this functionality, but I don’t know how to pass headers to an activity while executing the workflow.

Please show me a way to pass headers to an activity.

You should mutate the StartActivityInput.headers field before passing the input to the next interceptor (or make a new StartActivityInput dataclass via dataclasses.replace with the additional headers appended, or however you want to do it). We don’t allow this directly from a workflow because it gets abused, but what many do is use contextvars in the workflow that are then visible in this interceptor. That is how our OpenTelemetry integration works.
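
For the dataclasses.replace option mentioned above, a minimal sketch might look like the following. FakeStartActivityInput and with_extra_headers are hypothetical names used so the example runs without the SDK, and the header values are plain Python objects here, whereas real headers would need to be Payloads.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Mapping


# Hypothetical stand-in for StartActivityInput (only the fields used here).
@dataclass
class FakeStartActivityInput:
    activity: str
    headers: Mapping[str, Any] = field(default_factory=dict)


def with_extra_headers(
    input: FakeStartActivityInput, extra: Mapping[str, Any]
) -> FakeStartActivityInput:
    # Build a new input whose headers are the existing ones plus `extra`.
    # The original input object is left unchanged.
    return dataclasses.replace(input, headers={**input.headers, **extra})


original = FakeStartActivityInput(activity="a1", headers={"myKey": "some_value"})
updated = with_extra_headers(original, {"allData": {"key1": "v1"}})
print(updated.headers)
```

Inside an outbound interceptor, the updated input would then be what gets passed to the next interceptor's start_activity.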

Could you please provide me with an example to understand this better? I have a workflow w1 and an activity a1. While executing the workflow, I want to pass input along with some headers.

We do not have a sample for this specifically, but I have opened an issue to add one. In the meantime, the OpenTelemetry integration is an advanced example of using headers. It uses contextvars inside the OTel library (so it’s not explicit here, but the Python docs on contextvars should make it clear how to use them).