Correct, the Python SDK starts a new thread and event loop per activity worker, so you cannot initialize await-able objects in the constructor: they would be attached to the “main” event loop rather than the activity thread’s event loop.
A few possible solutions:
1.) Within the activity method, you can check whether the resources are initialized and initialize them if they aren’t. Since the activity instance is a singleton, future calls to the activity will reuse the resources, e.g.
class MyActivities:
    def __init__(self):
        self.connection = None

    def my_activity_function(self):
        if not self.connection:
            self.connection = create_connection(...........)
2.) Run workflows and activities in the main event loop, as you propose. This is theoretically possible. I think I opted against it at first because I wanted it to be possible to write non-async activity methods, but that requirement might be a thing of the past. The idea was that if the activity thread owns the event loop, then it’s free to block as it wishes without preventing other code from running.
3.) Modify the Python SDK to invoke a constructor-like method of the activity instance when the activity thread is started, e.g. post_init(), init_activity_instance(), init_async() or on_attached(). I wouldn’t mind making this change to the library if I can settle on the method name to use.
4.) Introduce a method in the Worker that allows us to pass the arguments to the activity class’s constructor and have the Worker instantiate the activity class on the activity worker’s thread, e.g. worker.register_activity_class(MyActivities, arg1, arg2, arg3). Then the Python SDK will invoke the constructor MyActivities(arg1, arg2, arg3) on the activity thread.
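To make 4.) a bit more concrete, here is a rough sketch of the idea using only the standard library. SketchWorker is a hypothetical stand-in, not the SDK's Worker, and register_activity_class is only the name proposed above, so treat all of it as an assumption rather than existing API. The worker stores the class and its arguments, then calls the constructor from inside the activity thread's own event loop.

```python
import asyncio
import threading


class SketchWorker:
    """Hypothetical worker for illustration; not the SDK's real Worker class."""

    def __init__(self):
        self._registrations = []
        self.instances = []

    def register_activity_class(self, cls, *args, **kwargs):
        # Store the recipe only; nothing is constructed on the caller's thread.
        self._registrations.append((cls, args, kwargs))

    def _activity_thread(self):
        # The activity thread owns this loop.
        loop = asyncio.new_event_loop()

        async def build():
            # Constructors run inside the loop, so get_running_loop() works.
            return [cls(*a, **kw) for cls, a, kw in self._registrations]

        try:
            self.instances = loop.run_until_complete(build())
        finally:
            loop.close()

    def start(self):
        thread = threading.Thread(target=self._activity_thread)
        thread.start()
        thread.join()  # a real worker would keep polling; the sketch just exits


class MyActivities:
    def __init__(self, dsn):
        self.dsn = dsn
        # Record which loop and thread the instance was created on.
        self.loop = asyncio.get_running_loop()
        self.created_on = threading.get_ident()


worker = SketchWorker()
worker.register_activity_class(MyActivities, "db://example")
worker.start()
```

After start() returns, worker.instances[0] was built on the activity thread, so anything it captured in the constructor belongs to that thread's loop and not to the main one.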
@Chen_Wan let me know what you think.
I’m actually struggling with an almost identical issue. I’m implementing DataConverter, and the way it is done in the Java library is that the DataConverter is passed to WorkflowClient, which is then passed to the Worker.
So the problem here is that in the Python SDK, WorkflowClient eagerly initializes the WorkflowServiceStub, so we cannot pass the WorkflowServiceStub from WorkflowClient to the activity and workflow threads (at the moment there is no relationship between WorkflowClient and Workers).
Thinking out loud here, 2.) would actually solve both problems, with the trade-off that activity methods would have to be async. Using promises and threads, we can actually work around that limitation as well and implement a layer on top of this that supports non-async activities.
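One way such a layer for non-async activities might look, as a minimal sketch: wrap the blocking function so it runs on a thread pool while the event loop awaits the result. The decorator name as_async_activity and the sample legacy_activity are made up for illustration; only asyncio's run_in_executor is real API here.

```python
import asyncio
import functools
import threading
import time


def as_async_activity(fn):
    """Wrap a blocking function so the event loop can await it without stalling."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(fn, *args, **kwargs)
        # None selects the loop's default ThreadPoolExecutor.
        return await loop.run_in_executor(None, call)

    return wrapper


@as_async_activity
def legacy_activity(name):
    # Hypothetical blocking activity; time.sleep stands in for blocking I/O.
    time.sleep(0.05)
    return f"hello {name}", threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    greeting, worker_thread = await legacy_activity("world")
    # The blocking body ran on a pool thread, not on the loop's thread.
    return greeting, worker_thread != loop_thread
```

The loop stays free to run other coroutines while the blocking call sits on the pool thread, which is the property we would need if everything shared one event loop.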
One problem with using a single event loop is that badly written activities could starve the workflow method, preventing it from executing and meeting its execution time requirements.
Edit Dec 14: 2.) is looking like the best option to me.
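The starvation concern is easy to reproduce with plain asyncio. In this small sketch (all names are illustrative, nothing here is SDK code), a stand-in "workflow step" shares a loop first with an activity that blocks via time.sleep and then with one that yields via asyncio.sleep, and we measure how long the step takes to finish in each case.

```python
import asyncio
import time


async def blocking_activity():
    # Badly written: blocks the whole event loop for 0.2 s.
    time.sleep(0.2)


async def cooperative_activity():
    # Well behaved: yields to the loop while waiting.
    await asyncio.sleep(0.2)


async def workflow_step(started_at):
    # Stand-in for workflow code that only needs about 10 ms.
    await asyncio.sleep(0.01)
    return time.monotonic() - started_at


async def measure(activity):
    started_at = time.monotonic()
    # The activity task is scheduled first, so it runs before the step.
    _, latency = await asyncio.gather(activity(), workflow_step(started_at))
    return latency


def compare():
    blocked = asyncio.run(measure(blocking_activity))
    cooperative = asyncio.run(measure(cooperative_activity))
    return blocked, cooperative
```

With the blocking activity the step cannot even start until the 0.2 s sleep ends, so its latency is dominated by the activity; with the cooperative one the two interleave and the step finishes on its own schedule.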