How to use Temporal for Machine Learning Workflows

Is there a best practice for using Temporal with ML workflows? If I have an ML model that should be instantiated only once, because instantiation takes a long time, and I need that instance inside an activity, how should I go about it? Should I also use non-async activities if the model runs on the CPU? What about the GPU?

Also, any ML-related workflow/activity examples for Python would be greatly appreciated. Thanks!

Temporal is definitely a good choice for orchestrating ML jobs.

We don’t have a sample specifically for limiting resource-intensive ML work yet, but we do have the activity sticky queues sample, which shows how to route activities to host-specific task queues; that is useful in this situation.

To limit work, in addition to the max-concurrent-activities setting on a worker (max_concurrent_activities in the Python SDK), you can also use the fact that only one running workflow can exist per workflow ID: you could have exactly one workflow per expensive resource that delegates work for that resource.

We have plans to make more samples showing these use cases, but they are not available yet.

To add to this: yes, you can use non-async activities, or you can use async activities and call something like run_in_executor yourself. It mostly depends on where you will be heartbeating.

Thanks a lot for all the information and answers, and I’m definitely looking forward to seeing more ML examples for Python! Another question: how do I create an instance of an ML model that a Temporal activity can reuse? The workaround I’m currently using looks something like this:

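from __future__ import annotations

import threading

from temporalio import activity

# MLModel, ActivityInput and ActivityOutput are defined elsewhere in your code.
ml_model: MLModel | None = None
_ml_model_lock = threading.Lock()

@activity.defn
def infer(data: ActivityInput) -> ActivityOutput:
    global ml_model

    # Create the model once per process; the lock stops two threads from
    # loading it at the same time if the activity runs on a thread pool.
    with _ml_model_lock:
        if ml_model is None:
            ml_model = MLModel()

    output: ActivityOutput = ml_model.infer(data)

    return output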

Here the MLModel class does some fairly slow things during instantiation, such as downloading files and loading the model into CPU or GPU memory, which would take far too long if a new instance were created every time the activity is called.

I’m also using a ProcessPoolExecutor to run this activity. Thanks again, and more power to you and the Temporal team!

P.S. I don’t think the pseudocode is correct or optimal, so I apologize in advance!

You don’t have to make it global; you can define activities as methods, like:

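from temporalio import activity

class MyActivities:
    def __init__(self) -> None:
        # Created once, when the worker builds MyActivities(), not per call
        self.model = MLModel()

    @activity.defn
    def infer(self, data: ActivityInput) -> ActivityOutput:
        # Use self.model here
        ...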

Then, when creating the worker, create an instance with my_activities = MyActivities(), pass it in the worker options as activities=[my_activities.infer], and call it from a workflow with await workflow.execute_activity_method(MyActivities.infer, data, start_to_close_timeout=timedelta(hours=1), heartbeat_timeout=timedelta(minutes=1)). Because infer is a non-async method, the worker also needs an activity_executor such as a ThreadPoolExecutor.

Granted, that assumes you are willing to load the model eagerly, which you might as well do if you know the activity will be called on this worker. You can change it to load lazily if you want, but if the activity does not complete almost immediately, make sure it heartbeats regularly at an interval within the heartbeat timeout. If you want a sync activity with lazy model instantiation, make sure the loading code calls activity.heartbeat regularly while it works. If that code cannot heartbeat from within, consider an async def activity that heartbeats in one asyncio task and runs your synchronous code with run_in_executor.

Note that while we support multiprocess activities, we may not support every scenario; one example may be this one, where MyActivities is not picklable. Only reach for multiprocessing if you need it, and even then you might want to control the ProcessPoolExecutor calls yourself. The only benefit of letting Temporal do it is that we have the infrastructure to support cross-process heartbeating and cancellation notification (though you still have to listen for cancellation). To do anything more advanced, you may just want a normal activity that makes the multiprocess calls itself.

Thanks for the detailed explanation! In our current setup we pass a string to start_activity because the workflow and activity code bases are separate. Is it possible to decorate the method we want to expose with @activity.defn(name="activity-name") and just call the activity by that name from the workflow (which lives elsewhere and is started in a different worker/process)?

from temporalio import activity

class MyActivities:
    def __init__(self) -> None:
        self.model = MLModel()

    @activity.defn(name="activity-name")
    def infer(self, data: ActivityInput) -> ActivityOutput:
        # Use self.model here
        ...

We like that we can use the name of the activity in the workflow (using start_activity or execute_activity) instead of importing the activity itself in the workflow. Thanks again for the answers!

Sure. You can use the string name when invoking an activity; we just offer the typed form for those who like static type checking. You can even have an empty stub of the activity on the caller side that matches the signature on the implementation side, and use the stub instead of a string name.

So on your workflow side you could have:

from temporalio import activity, workflow

@activity.defn(name="activity-name")
def infer(data: ActivityInput) -> ActivityOutput:
    # This is a never-called stub; only its name and signature are used
    ...

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        await workflow.execute_activity(infer, ActivityInput(...), start_to_close_timeout=...)

Thanks again for the clarification and for the support! Just curious, do you have a timeline for when the ML examples will be published? Thanks!

I do not, but we’d welcome sample contributions too. If you have a favorite ML framework, feel free to add one. All the Python ML samples would really show is that you can run your own code in activities (like any other code); there is nothing ML-specific about Temporal.
