Invoking coroutines in an activity (Python SDK)

I have a newbie question about using the Python SDK: how do I invoke coroutines inside an activity method?

My first attempt was to declare the activity method as async so I could use await. That approach failed with a coroutine-cannot-be-serialized-to-JSON exception.

In the second approach, I put the coroutine call in a separate local function and ran it from the activity method on its own event loop. That seemed to work. The code looks like this:

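import asyncio

class MyActivity:
    def my_func(self):
        raise NotImplementedError

class MyActivityImpl(MyActivity):
    def my_func(self):
        async def fn():
            await async_http_client.init_session()  # the poster's asyncio HTTP client
        # Assumed: fn() is run to completion on a fresh event loop, e.g. with asyncio.run()
        asyncio.run(fn())
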
  1. Is there a canonical way to invoke coroutines in an activity method?
  2. While this approach works, it’s not ideal in my case. The coroutines I use maintain a connection pool, so it’s more efficient to initialize it once, keep it around, and reuse it. But running them this way terminates the event loop at the end, which makes those resources inaccessible. I would like to keep the resources accessible for the duration of a workflow, but I’m not sure how to accomplish that.

Hey, I’ll have to loop in @firdaus because I’m genuinely not sure about Python.

In the Temporal SDK, the @activity_method implementation itself can be an async def, so you can await directly within it.
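A minimal sketch of what that looks like, written in plain asyncio so it runs on its own: the activity implementation is an `async def` and awaits the client directly, with no nested helper function and no extra event loop. `FakeHttpClient` is a hypothetical stand-in for `async_http_client`; the SDK's `@activity_method` decorator and worker registration are deliberately left out, and `asyncio.run()` only stands in for the worker's event loop.

```python
import asyncio


class FakeHttpClient:
    """Hypothetical stand-in for an asyncio HTTP client."""

    def __init__(self):
        self.sessions_opened = 0

    async def init_session(self):
        await asyncio.sleep(0)  # pretend to do some async I/O
        self.sessions_opened += 1


class MyActivityImpl:
    def __init__(self, http_client):
        self.http_client = http_client

    # The activity method is itself a coroutine, so it can await directly.
    async def my_func(self):
        await self.http_client.init_session()
        return "session ready"


client = FakeHttpClient()
activity = MyActivityImpl(client)
# In the SDK the worker's loop awaits the activity; asyncio.run() stands in for it here.
print(asyncio.run(activity.my_func()))  # session ready
```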

In fact, I might remove support for non-async activity method implementations altogether, forcing all of them to be async def.

Hi @firdaus. You’re absolutely correct: the Temporal Python SDK’s @activity_method does support async def. I re-checked my code and realized the problem was with Cadence’s Python SDK. After cleaning up the code and migrating everything to Temporal, the await calls work as expected. My apologies for the confusing question; it was definitely a rookie mistake on my end.

Now, regarding my question about keeping resources available to the activities for the duration of the workflow: my use case requires activities to update records in a Cassandra database, and the Cassandra driver recommends setting up one session for the app’s lifetime to avoid expensive reconnects to the Cassandra nodes. I was thinking that the worker might be a good place to house the Cassandra client, but I haven’t figured out a way to access the client from the activities. For prototyping purposes, I am re-initializing the client in the activity method, which is fine for a small number of workflows, but it likely won’t scale. Do you have any recommendations on how to handle resources that need to live for the duration of the workflow? Or are there other ways to accomplish this, and I’m going about it the wrong way?

Can you pass your dependencies to the activity object before registering it with the worker?
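Passing dependencies in amounts to constructor injection: build the dependency first, hand it to the activity object, then register that object with the worker. The sketch below uses a hypothetical `RecordStore` and omits the SDK registration call; it assumes a plain, loop-independent object, which is the case the next reply says works.

```python
import asyncio


class RecordStore:
    """Hypothetical shared dependency (a plain object, not bound to any event loop)."""

    def __init__(self):
        self.records = {}


class MyActivities:
    def __init__(self, store):
        # The dependency is injected once and reused by every activity call.
        self.store = store

    async def save_record(self, key, value):
        self.store.records[key] = value
        return len(self.store.records)


store = RecordStore()
activities = MyActivities(store)  # this instance is what you would register with the worker
print(asyncio.run(activities.save_record("a", 1)))  # 1
```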

Hi @maxim. For plain data structures, passing them to the activity object before registering works. The issue is with asyncio-based clients/libraries (e.g., Cassandra, pg and AMQP clients), which require an event loop. If I initialize those clients before registering the activity object, they are tied to the main thread’s event loop. After activity registration, I believe the Python SDK starts a new thread and runs a separate event loop there to execute activities. Calling the clients from the activities then fails with an error (RuntimeError: Event loop is closed).
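The failure can be reproduced without Cassandra or the SDK. This sketch assumes a hypothetical `LoopBoundClient` that, like many asyncio drivers, remembers the loop it connected on. Connecting on a loop that is later closed and then calling the client from a new thread with its own loop raises the same `RuntimeError`; connecting on the thread’s own loop works.

```python
import asyncio
import threading


class LoopBoundClient:
    """Hypothetical stand-in for an asyncio driver that remembers its loop."""

    def __init__(self):
        self._loop = None

    async def connect(self):
        # Real drivers keep sockets, transports and futures bound to the running loop.
        self._loop = asyncio.get_running_loop()

    async def query(self, value):
        fut = asyncio.get_running_loop().create_future()
        # Work is scheduled on the loop captured at connect() time.
        self._loop.call_soon(fut.set_result, value)
        return await fut


def run_in_thread(coro_fn):
    """Run coro_fn() on a fresh loop in a new thread, like an activity worker thread."""
    outcome = {}

    def target():
        try:
            outcome["result"] = asyncio.run(coro_fn())
        except RuntimeError as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return outcome


# 1) Connected on the "main" loop, which asyncio.run() closes when it returns.
client = LoopBoundClient()
asyncio.run(client.connect())
bad = run_in_thread(lambda: client.query(42))
print(bad)  # {'error': RuntimeError('Event loop is closed')}


# 2) Connected and used on the same (activity-thread) loop.
async def init_and_query():
    local = LoopBoundClient()
    await local.connect()
    return await local.query(42)


good = run_in_thread(init_and_query)
print(good)  # {'result': 42}
```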

My current workaround is to add an init activity method that initializes all the asyncio-based clients, and to call that init activity at the beginning of the workflow. That way the initialization runs on the activity thread.

async def my_workflow(self, param):
    await self.activities.init()
    await self.activities.dosomething(param)
    await self.activities.close()

This approach is inefficient because the clients are re-initialized on every workflow execution. Ideally, the clients should be initialized only once for the whole lifetime of the worker.
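A self-contained sketch of that cost, assuming a hypothetical `FakeSession` in place of a real Cassandra/pg/AMQP client and a plain coroutine in place of the SDK workflow: because `init()` and `close()` bracket every execution, the number of connections opened grows with the number of workflow runs rather than staying at one per worker.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an expensive asyncio database session."""

    opened = 0  # class-level count of connections opened

    async def connect(self):
        await asyncio.sleep(0)
        FakeSession.opened += 1

    async def execute(self, param):
        await asyncio.sleep(0)
        return f"updated {param}"

    async def close(self):
        await asyncio.sleep(0)


class MyActivities:
    def __init__(self):
        self.session = None

    async def init(self):
        self.session = FakeSession()
        await self.session.connect()

    async def dosomething(self, param):
        return await self.session.execute(param)

    async def close(self):
        await self.session.close()
        self.session = None


async def my_workflow(activities, param):
    # Same shape as the workaround: init -> work -> close on every execution.
    await activities.init()
    result = await activities.dosomething(param)
    await activities.close()
    return result


async def main():
    activities = MyActivities()
    for i in range(3):
        await my_workflow(activities, i)


asyncio.run(main())
print(FakeSession.opened)  # 3: one connection per workflow run
```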

I was wondering if the SDK could instead use the main asyncio event loop in the activity thread. I have seen a couple of examples demonstrating that technique, but I haven’t tried it myself, so I’m not sure it would really work.

Correct, the Python SDK starts a new thread and event loop per activity worker, so you cannot initialize awaitable objects in the constructor: they would be attached to the “main” event loop rather than the activity thread’s event loop.

A few possible solutions:

1.) Within the activity method, you can check whether the resources are initialized and initialize them if they aren’t. Since the activity instance is a singleton, future calls to the activity will reuse the resources, e.g.:

class MyActivities:
    def __init__(self):
        self.connection = None

    def my_activity_function(self):
        # Lazily create the connection on first use; later calls reuse it.
        if not self.connection:
            self.connection = create_connection(...........)
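A runnable asyncio variant of option 1, assuming the activity method is `async def`. `create_connection` here is a hypothetical coroutine standing in for the real client factory. The `asyncio.Lock` is also created lazily, so it belongs to the activity thread’s loop rather than whatever loop exists when `__init__` runs, and it stops concurrent activity calls from each opening a connection.

```python
import asyncio

connections_created = 0


async def create_connection():
    """Hypothetical stand-in for an expensive async connect call."""
    global connections_created
    await asyncio.sleep(0.01)  # simulate network latency
    connections_created += 1
    return object()


class MyActivities:
    def __init__(self):
        self.connection = None
        self._lock = None  # created on first use, on the activity thread's loop

    async def _get_connection(self):
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            # Re-check inside the lock: another call may have connected while we waited.
            if self.connection is None:
                self.connection = await create_connection()
        return self.connection

    async def my_activity_function(self):
        return await self._get_connection()


async def main():
    activities = MyActivities()  # one shared instance, like the SDK's singleton
    return await asyncio.gather(*(activities.my_activity_function() for _ in range(5)))


conns = asyncio.run(main())
print(connections_created)  # 1
```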

2.) Run workflows and activities in the main event loop, as you propose. This is theoretically possible. I think I opted against it at first because I wanted it to be possible to write non-async activity methods, but that requirement might be a thing of the past. The idea was that if the activity thread owns its event loop, it’s free to block as it wishes without preventing other code from running.
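Option 2 in miniature: if the worker, workflows and activities all share one event loop, a client initialized once at startup can be awaited from any of them. This is plain asyncio with a hypothetical `SharedClient`; `asyncio.gather` stands in for the worker running several activity coroutines concurrently on that one loop.

```python
import asyncio


class SharedClient:
    """Hypothetical asyncio client, connected once per worker process."""

    def __init__(self):
        self.connects = 0
        self.loop = None

    async def connect(self):
        self.loop = asyncio.get_running_loop()
        self.connects += 1

    async def update(self, key):
        # Safe only because every caller runs on the loop the client connected on.
        assert asyncio.get_running_loop() is self.loop
        await asyncio.sleep(0)
        return f"updated {key}"


async def activity(client, key):
    return await client.update(key)


async def worker_main():
    client = SharedClient()
    await client.connect()  # initialized once for the lifetime of the worker
    results = await asyncio.gather(*(activity(client, k) for k in ("a", "b", "c")))
    return client, results


client, results = asyncio.run(worker_main())
print(client.connects, results)  # 1 ['updated a', 'updated b', 'updated c']
```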

3.) Modify the Python SDK to invoke a constructor-like method on the activity instance when the activity thread is started, e.g. post_init(), init_activity_instance(), init_async() or on_attached(). I wouldn’t mind making this change to the library if I can settle on a method name.
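To make option 3 concrete, here is a hypothetical sketch of a per-thread activity worker that awaits an async hook once when its event loop starts, and a matching cleanup hook when it stops. The names `init_async` and `close_async` are placeholders (no name had been settled on), and `ActivityThread` is illustrative, not part of the SDK.

```python
import asyncio
import threading


class MyActivities:
    def __init__(self):
        self.session = None
        self.events = []

    async def init_async(self):
        # Hypothetical hook: runs on the activity thread's own loop, before any activity.
        self.session = "session"
        self.events.append("init")

    async def close_async(self):
        # Hypothetical cleanup hook: runs when the activity thread shuts down.
        self.events.append("close")
        self.session = None

    async def dosomething(self, param):
        return f"{param} via {self.session}"


class ActivityThread(threading.Thread):
    """Hypothetical worker thread: one event loop, hooks around the activity calls."""

    def __init__(self, instance, calls):
        super().__init__()
        self.instance = instance
        self.calls = calls
        self.results = []

    def run(self):
        asyncio.run(self._main())

    async def _main(self):
        await self.instance.init_async()
        try:
            for param in self.calls:
                self.results.append(await self.instance.dosomething(param))
        finally:
            await self.instance.close_async()


acts = MyActivities()
t = ActivityThread(acts, ["x", "y"])
t.start()
t.join()
print(acts.events, t.results)  # ['init', 'close'] ['x via session', 'y via session']
```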

4.) Introduce a method in the Worker that lets us pass arguments for the activity class’s constructor and have the Worker instantiate the activity class on the activity worker’s thread, e.g.

worker.register_activity_class(MyActivities, arg1, arg2, arg3)

The Python SDK would then invoke the constructor MyActivities(arg1, arg2, arg3) on the activity thread.
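A hypothetical sketch of option 4: the worker stores the class and constructor arguments and builds the instance inside the activity thread, after that thread’s loop is running, so anything the constructor creates belongs to that loop. `ToyWorker` and its `register_activity_class` mirror the proposal above and are illustrative stand-ins, not an existing SDK API.

```python
import asyncio
import threading


class MyActivities:
    def __init__(self, arg1, arg2, arg3):
        self.args = (arg1, arg2, arg3)
        # Anything loop-bound created here attaches to the activity thread's loop.
        self.loop = asyncio.get_running_loop()
        self.thread_id = threading.get_ident()

    async def dosomething(self):
        assert asyncio.get_running_loop() is self.loop
        return sum(self.args)


class ToyWorker:
    """Hypothetical worker that defers activity construction to its own thread."""

    def __init__(self):
        self._registrations = []
        self.instances = []
        self.results = []

    def register_activity_class(self, cls, *args):
        self._registrations.append((cls, args))

    def run_once(self):
        thread = threading.Thread(target=lambda: asyncio.run(self._main()))
        thread.start()
        thread.join()

    async def _main(self):
        for cls, args in self._registrations:
            instance = cls(*args)  # constructed on the activity thread, inside its loop
            self.instances.append(instance)
            self.results.append(await instance.dosomething())


worker = ToyWorker()
worker.register_activity_class(MyActivities, 1, 2, 3)
worker.run_once()
print(worker.results)  # [6]
```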

@Chen_Wan let me know what you think.

I’m actually struggling with an almost identical issue. I’m implementing DataConverter, and in the Java library the DataConverter is passed to the WorkflowClient, which is then passed to the Worker.

The problem here is that in the Python SDK, WorkflowClient eagerly initializes the WorkflowServiceStub, so we cannot pass the WorkflowServiceStub from the WorkflowClient to the activity and workflow threads (at the moment there is no relationship between WorkflowClient and Workers).

Thinking out loud here, 2.) would actually solve both problems, with the trade-off that activity methods would have to be async. Using promises and threads we could work around that limitation as well, and implement a layer on top that supports non-async activities.

One problem with using a single event loop is that badly written activities could starve the workflow method and prevent it from meeting its execution-time requirements.

Edit Dec 14: 2.) is looking like the best option to me.

Hi @firdaus. Thanks for the detailed analysis.
Also, thinking out loud:

  1. How do other SDKs (Java, Go) handle this scenario? Perhaps they don’t have the same problem because they don’t deal with the event loop. As long as the resource/client is thread-safe, they can be passed around between threads?
  2. Option 2 makes sense. With this design, does the activity even need to run on a separate thread? You’re right that a badly written activity could block for too long and cause problems.
  3. Option 3 makes sense too, as long as I can initialize the resources on the activity thread at startup and perform cleanup when the activity thread goes away.

Java and Go use threads to execute activities and don’t employ event loops, so any shared resources are expected to be thread-safe.

When I ported the Python client from Cadence to Temporal, the biggest difference was that the Thrift/TChannel client uses blocking IO, whereas the gRPC client I chose (python-betterproto) uses asyncio. So I was thinking about how to map the initial design of cadence-python onto asyncio, and my idea was to keep one thread per worker and simply introduce a separate event loop per thread.

So now I’m running into the limitations of that solution (both the issue you’re experiencing and my DataConverter issue), and I think it’s time to settle on something more long-term, which I believe is option 2. In retrospect, that’s probably what I should have done initially.

I’ll start working on this on my end and let you know when it’s done, maybe in a couple of weeks.

@maxim & @firdaus, thanks again for your quick replies. Your timely and concise responses are much appreciated.

@firdaus, with option 2, I assume that if I put time.sleep(10) in the activity code, it will block the whole worker for 10 seconds, and even workflow execution would be blocked. I guess that’s the trade-off of asyncio’s cooperative model that developers should be aware of.

Yup, that is correct: you need to use asyncio.sleep() in activities.
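A small plain-asyncio experiment showing that trade-off. A ticker task stands in for workflow code sharing the loop (an assumption for illustration, not SDK code). While an activity calls `time.sleep()`, the loop is blocked and the ticker never runs; with `await asyncio.sleep()` the ticker keeps making progress.

```python
import asyncio
import time


async def ticker(counter, stop):
    """Stands in for workflow code that needs the shared loop to make progress."""
    while not stop.is_set():
        counter["ticks"] += 1
        await asyncio.sleep(0.01)


async def blocking_activity():
    time.sleep(0.2)  # blocks the whole event loop


async def cooperative_activity():
    await asyncio.sleep(0.2)  # yields to other tasks while waiting


async def measure(activity):
    counter = {"ticks": 0}
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(counter, stop))
    await activity()
    stop.set()
    await task
    return counter["ticks"]


blocked = asyncio.run(measure(blocking_activity))
cooperative = asyncio.run(measure(cooperative_activity))
# blocked is 0; cooperative is positive (the exact count depends on timer resolution)
print(blocked, cooperative)
```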

In the future, we can consider an API for writing activities that use blocking IO.

@firdaus. Understood, thanks.
I look forward to the new changes :slightly_smiling_face:

Hi @firdaus,
Is it possible to provide an update on the progress of the refactoring (option 2)? Thanks.

Hello @Chen_Wan, not much progress at the moment. I’m trying to carve out some free afternoons for this, but it’s the usual beginning-of-the-year craze for me right now.

@firdaus. I completely understand. Your work and efforts are highly appreciated.
If there is anything I can help with (e.g., testing), please don’t hesitate to let me know.

Could you try the latest commit?

@Chen_Wan Have a look at the HelloWorld example in the README. The contract for establishing a connection changed a bit.