Hi. I want to add some activities implemented in Python so that I can use them from my TypeScript workflows. I don’t have much experience with Python, so I have some questions:
Most of my activities are synchronous, and they’re registered as instance methods because they have dependencies that need to be injected at construction time. So, I must pass an activity_executor to my worker.
With ThreadPoolExecutor, is there any way to pass a different instance to each thread, or is it assumed that activities must be thread-safe and share no internal state? I have some third-party classes that are injected into the classes that define the activities, and they aren’t thread-safe, so they break when called from multiple threads. Should I either synchronize access to them or limit concurrency to a single activity at a time?
My concurrency issues seem to be gone when using a ProcessPoolExecutor. I believe my instances’ state is being deep-copied using pickle and then delivered to each spawned process, so the instances are now independent and share no state. That seems to achieve what I need. However, the documentation says that threads are preferred for several reasons, and I’m not sure I understand this:
Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise on cancellation.
They should be thread safe, but they can share thread-safe state.
Whether you limit to a single activity at a time across the entire worker is often more dependent on what your worker can handle resource-wise. I would recommend synchronizing access.
If your objects support copying for each use, why not do that as the first step in your threaded activities? For most non-thread-safe objects, if copy-via-pickle magically makes them appear thread safe, there’s likely undefined dangerous behavior that will happen within. I’d have to understand what kind of objects they are to give better guidance. If they’re third party, don’t assume copy-via-pickle makes them thread safe unless the docs strangely say that is an accepted approach.
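As a rough illustration of the copy-per-use idea, here is a minimal sketch using only the standard library. The `Tokenizer` and `TextActivities` classes are hypothetical stand-ins, not part of any real library, and the sketch assumes the dependency can be safely copied with `copy.deepcopy`, which should be checked against the third-party library's documentation first.

```python
import copy
from concurrent.futures import ThreadPoolExecutor


class Tokenizer:
    """Hypothetical stand-in for a non-thread-safe third-party class."""

    def __init__(self):
        self._buffer = []  # mutable scratch state, unsafe to share across threads

    def tokenize(self, text):
        self._buffer.clear()
        for word in text.split():
            self._buffer.append(word.lower())
        return list(self._buffer)


class TextActivities:
    """Hypothetical activity class with a dependency injected at construction."""

    def __init__(self, tokenizer):
        # Kept only as a prototype; it is never mutated after construction.
        self._prototype = tokenizer

    # In a real worker this method would carry the activity decorator.
    def tokenize(self, text):
        # Assumption: the dependency supports deep copying.
        local = copy.deepcopy(self._prototype)  # private copy for this call
        return local.tokenize(text)


activities = TextActivities(Tokenizer())
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(activities.tokenize, ["Hello World", "Foo Bar Baz"]))
```

Each call works on its own copy, so no mutable state is shared between threads. The cost is one copy per activity invocation, which may matter for heavyweight objects.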
From the README:
By default, cancellation of a synchronous multithreaded activity is done via a
temporalio.exceptions.CancelledError thrown into the activity thread.
So we can “inject” an exception on cancellation for threaded activities, but we can’t for multiprocess, so you’ll have to manually watch for cancellation via some methods on the activity module. And in general multiprocess is heavier than multithreaded and most of the time unnecessary. Don’t forget to heartbeat and set a heartbeat timeout unless it’s essentially an immediate-completion activity.
These are existing service classes that have dependencies injected via a DI framework. Now I want to use them with Temporal, so I’m just adding a decorator to register them.
I’ve noticed some activities failing due to an issue with nltk lazy-loading corpora, but I managed to solve it by preloading them before I create the worker. Still, I’m not sure about the thread safety of nltk and the other libraries I’m using, so I’ll have to continue testing.
I will rule out multiprocessing and go the multithreading route as recommended. I think I can inject thread-safe variants with locks in case some concurrency issue comes up. I’m not really concerned about performance for these activities, and in case it’s an issue I think I can just spin up more worker instances that consume from the same task queue.
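A thread-safe variant of the kind described here could look roughly like the sketch below. It uses only the standard library; `Counter`, `LockedCounter`, and `CounterActivities` are hypothetical names for illustration, and the wrapper assumes that serializing every call through one lock is acceptable for the workload.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Counter:
    """Hypothetical non-thread-safe service (read-modify-write on shared state)."""

    def __init__(self):
        self.total = 0

    def add(self, n):
        current = self.total
        self.total = current + n
        return self.total


class LockedCounter:
    """Same interface as Counter, but every call is serialized with a lock."""

    def __init__(self, inner):
        self._inner = inner
        self._lock = threading.Lock()

    def add(self, n):
        with self._lock:  # only one thread at a time touches the inner object
            return self._inner.add(n)


class CounterActivities:
    """Hypothetical activity class; the DI container would inject the variant."""

    def __init__(self, counter):
        self._counter = counter

    # In a real worker this method would carry the activity decorator.
    def increment(self, n):
        return self._counter.add(n)


# Inject the locked variant instead of the raw service.
activities = CounterActivities(LockedCounter(Counter()))
with ThreadPoolExecutor(max_workers=8) as pool:
    totals = list(pool.map(activities.increment, [1] * 1000))
final_total = max(totals)
```

Because the wrapper keeps the same interface, the activity class does not change; only the object the container injects does. Calls into the wrapped service run one at a time, so throughput for that dependency is limited to a single thread.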
Thank you so much for your help.