Hi Team,
I have written a client interceptor that extends OutboundInterceptor and overrides its start_workflow method. In this method I set some headers on input.headers (StartWorkflowInput) before the workflow starts executing.
I have also written a workflow outbound interceptor that extends WorkflowOutboundInterceptor and overrides its start_activity method. There I want to read the headers set by the client interceptor above and append some new headers to them.
The problem I am facing is that, in the start_activity method, input.headers is empty ({}).
Client Interceptor:
async def start_workflow(
    self, input: StartWorkflowInput
) -> WorkflowHandle[Any, Any]:
    header_input = "some_value"
    header_payload = self.payload_converter.to_payloads(
        [header_input]
    )[0]
    input.headers = {
        **input.headers,
        "myKey": header_payload,
    }
    return await super().start_workflow(input)
Workflow Outbound Interceptor:
def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
    interceptor_traces.append(("workflow.start_activity", input))
    # Error on the next line, because input.headers is {}
    my_data = PayloadConverter.default.from_payloads([input.headers.get("myKey")])[0]
    response = super().start_activity(input)
    return response
The headers would appear on the WorkflowInboundInterceptor in execute_workflow. It’s up to you to take them from there and put them somewhere (e.g. contextvars) if you want to put them on other outbound headers. You set headers on outbound and receive them on inbound, but we don’t automatically forward them from inbound to outbound; you have to choose to do that.
Thank you @Chad_Retz.
The headers set in the outbound interceptor (start_workflow) were available in the inbound interceptor (execute_workflow). To use these headers inside the WorkflowOutboundInterceptor (start_activity), I used contextvars as you suggested above. I stored the headers in a context variable inside the execute_workflow method and read them back inside start_activity.
I hope this will not cause any problems while executing multiple activities and child workflows.
This should cause no problems. This is exactly how OTel (and our OTel interceptor) works.
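One general reason context variables tend to be safe for this: in asyncio, each task gets its own copy of the context when it is created, so a value set inside one task is not visible to its siblings. The sketch below uses plain asyncio with hypothetical names (current_headers, fake_workflow); it illustrates standard Python behaviour only and makes no claim about how Temporal schedules workflows internally.

```python
import asyncio
import contextvars

current_headers = contextvars.ContextVar("current_headers")


async def fake_workflow(name: str, results: dict) -> None:
    # Each task sets its own value in its own copy of the context.
    current_headers.set({"myKey": name})
    # Yield to the event loop so the other tasks run in between.
    await asyncio.sleep(0)
    # The value read back is still the one this task set.
    results[name] = current_headers.get()["myKey"]


async def main() -> dict:
    results: dict = {}
    # gather() wraps each coroutine in its own task (and context copy).
    await asyncio.gather(
        *(fake_workflow(n, results) for n in ("w1", "w2", "w3"))
    )
    return results


results = asyncio.run(main())
```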
Hi @Chad_Retz ,
I have written a WorkflowOutboundInterceptor that overrides the start_activity and start_child_workflow methods.
def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
    decoded_header = PayloadConverter.default.from_payloads(
        [input.headers["allData"]]
    )[0]
    value = decoded_header["key1"]
I have also written a WorkflowInboundInterceptor that overrides execute_workflow.
The start_activity method expects certain input headers. I would like to know how to write unit tests for this piece of code.
I have written a sample workflow to test this functionality, but I don’t know how to pass headers to an activity while executing the workflow.
Please provide me with a way to pass headers to an activity.
You should mutate the StartActivityInput.headers field before passing the input to the next interceptor (or make a new StartActivityInput dataclass via dataclasses.replace with the additional headers appended, or however you want to do it). We don’t allow this directly from a workflow because it gets abused, but what many do is use contextvars in the workflow that are then visible in this interceptor. That is how our OpenTelemetry integration works.
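As a rough illustration of the dataclasses.replace variant combined with a context variable set by workflow code, consider the sketch below. It is stdlib-only and built on assumptions: FakeStartActivityInput, RecordingNext, AppendHeadersOutbound and extra_headers are hypothetical names standing in for the real SDK types, and the header value is a plain dict, whereas real Temporal headers would need to be encoded as payloads (for example with a payload converter, as in the question).

```python
import contextvars
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict

# Set by "workflow" code, read by the outbound interceptor.
extra_headers = contextvars.ContextVar("extra_headers")


@dataclass
class FakeStartActivityInput:  # hypothetical stand-in for StartActivityInput
    activity: str
    headers: Dict[str, Any] = field(default_factory=dict)


class RecordingNext:
    """Stand-in for the next interceptor; records the inputs it is given."""

    def __init__(self) -> None:
        self.inputs = []

    def start_activity(self, input: FakeStartActivityInput) -> str:
        self.inputs.append(input)
        return "handle"


class AppendHeadersOutbound:
    def __init__(self, next: RecordingNext) -> None:
        self.next = next

    def start_activity(self, input: FakeStartActivityInput) -> str:
        # Merge existing headers with whatever the workflow put in the
        # context variable; context values win on key clashes.
        merged = {**input.headers, **extra_headers.get({})}
        # replace() builds a new input and leaves the caller's object alone.
        return self.next.start_activity(
            dataclasses.replace(input, headers=merged)
        )


# "Workflow" side: publish the data that should travel as a header.
extra_headers.set({"allData": {"key1": "value1"}})

original = FakeStartActivityInput(activity="a1", headers={"existing": "x"})
recorder = RecordingNext()
AppendHeadersOutbound(recorder).start_activity(original)
forwarded = recorder.inputs[0]
```

Calling the interceptor directly with a recording stub like this is also one possible way to unit-test the header logic in isolation, without starting a worker.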
Could you please provide me with an example to understand this better? I have a workflow w1 and an activity a1. While executing the workflow, I want to pass input along with some headers.
We do not have a sample for this specifically, but I have opened an issue to add one. In the meantime, the OpenTelemetry integration is an advanced example of using headers, and it uses contextvars inside the OTel library (so it’s not explicit here, but the Python docs on contextvars should make it clear how to use them).