I am trying to execute an async function inside my worker interceptor’s start_activity
method. The function calls an API using the aiohttp
library and sends the activity name and some information.
The method start_activity
is synchronous method. And hence, how I am executing my function is:
loop = asyncio.get_event_loop()
resp= loop.run_until_complete(my_function(some_args))
But asyncio.get_event_loop()
is throwing NotImplementedError
How can we do an API call from an interceptor’s start activity method?
Are you referring to WorkflowOutboundInterceptor.start_activity
? This is called when the workflow invokes start (i.e. not when it actually starts) and it is within the context of a workflow, so you cannot make external calls or do anything non-deterministic. If you want to execute something at the beginning of an activity attempt, you can implement ActivityInboundInterceptor.execute_activity
.
Are you referring to WorkflowOutboundInterceptor.start_activity
?
Yes.
I have to implement the API call inside start_activity
only.
The scenario is that we need to validate something about the activities (e.g., execution permission, permission on some resource, etc.), and if we find something wrong, then we need to cancel scheduling that activity and raise some exception.
If it is not possible by default, is there any workaround?
If you must do something on every workflow call to schedule an activity (not to be confused with actually running the activity), yes, WorkflowOutboundInterceptor.start_activity
is acceptable. But the interceptor is just like if you put the lines of code above user’s call to execute_activity
/start_activity
, it is subject to the same workflow constraints. If you need to do something non-deterministic like make an HTTP call, it needs to be done via an activity too.