Python support for dependency injection (pinject)

To use Temporal in our systems, we need the ability to inject dependencies (gRPC connections, DB connections, Kafka connections, auth data) into Temporal workflows and activities using pinject, scoped at different levels (global, customer-specific).

Here’s an example:
Say you have a Python class that fetches customer-scoped data from Redis:

from pinject import new_object_graph
import redis

class CustomerDataAccess:
    def __init__(self, customer_partition):
        # this is simplified; IRL there would be nested dependencies injected
        self.customer_partition = customer_partition
        self.redis_client = redis.Redis()

    def get_data(self):
        # Access data from the specified customer partition in Redis
        data = self.redis_client.get(self.customer_partition)
        return data

and then you have a workflow and an activity that need this dependency:

# Define the Workflow interface
class CustomerWorkflow:
    @workflow_method(task_queue="customer-data")
    def get_customer_data(self) -> str:
        pass

# Implement the Workflow
class CustomerWorkflowImpl(CustomerWorkflow):
    def __init__(self, customer_data_access: CustomerDataAccess):
        self.data_access = customer_data_access

    def get_customer_data(self) -> str:
        # Access data from a specific customer partition using the Activity
        data = self.data_access.get_data("customer1")
        result = data  # placeholder: do business logic on this data
        return result

Can this be achieved with the current Temporal SDK? This is a very important consideration in our decision to move onto this platform as a company.

You would never access something external from a workflow; you would use an activity.

So you could just add @activity.defn to get_data like so:

from pinject import new_object_graph
import redis
from temporalio import activity

class CustomerDataAccess:
    def __init__(self, customer_partition):
        # this is simplified; IRL there would be nested dependencies injected
        self.customer_partition = customer_partition
        self.redis_client = redis.Redis()

    @activity.defn
    def get_data(self):
        # Access data from the specified customer partition in Redis
        data = self.redis_client.get(self.customer_partition)
        return data

This assumes whatever is in data is JSON serializable by default. You would then instantiate your CustomerDataAccess class however needed when you create your worker, and register the activity with a Worker param like activities=[my_customer_data_access_instance.get_data]. Then your workflow might look like:

from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
    from .my_other_package import CustomerDataAccess

@workflow.defn
class CustomerWorkflow:
    @workflow.run
    async def get_customer_data(self) -> str:
        # Access data from a specific customer partition using the Activity
        data = await workflow.execute_activity_method(
            CustomerDataAccess.get_data,
            start_to_close_timeout=timedelta(minutes=5),
        )
        result = data  # placeholder: do business logic on this data
        return result
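
For completeness, here is a rough sketch of how a client could start this workflow; the server address, workflow ID, and task queue here are assumptions, not values from this thread:

from temporalio.client import Client

async def main():
    # Connect to a Temporal server (address is an assumption)
    client = await Client.connect("localhost:7233")
    # The task queue must match the one the worker polls
    result = await client.execute_workflow(
        CustomerWorkflow.get_customer_data,
        id="customer-data-workflow",
        task_queue="my-task-queue",
    )
    print(result)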

There is no @workflow_method. Make sure you are using the Temporal Python SDK (github.com/temporalio/sdk-python) and see the README there for more details.

@Chad_Retz - Thanks for the response. In your code example, how would customer_partition be passed to CustomerDataAccess when the workflow is executed? In my example, pinject (Google’s dependency injection framework) handles this aspect.

This is done at worker creation for activities and is unrelated to a workflow. If you want something in your activity to be specific to something in a workflow, have the workflow pass that information as activity arguments.
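
For example, if get_data were changed to accept the partition as a parameter (an assumption; the earlier snippet reads it from self), the workflow could pass it like this:

data = await workflow.execute_activity_method(
    CustomerDataAccess.get_data,
    "customer1",  # workflow-provided value arrives as the activity argument
    start_to_close_timeout=timedelta(minutes=5),
)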

I don’t know anything about pinject, but from a quick read you’d have something like this when you create your worker:

import concurrent.futures

import pinject
from temporalio.worker import Worker

obj_graph = pinject.new_object_graph()
# ...
customer_data_access = obj_graph.provide(CustomerDataAccess)
# client is a connected temporalio.client.Client created elsewhere
my_worker = Worker(
    client,
    task_queue="my-task-queue",
    activities=[customer_data_access.get_data],
    workflows=[CustomerWorkflow],
    # get_data is a plain (non-async) def, so an activity executor is required
    activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=10),
)

If you needed this per activity instead, you could create your class with obj_graph passed to __init__ and then call provide from inside the activity to get what you need (possibly using arguments passed from the workflow to constrain what is asked for). Activities are just your code, so do whatever you think is best. Workflows are constrained, isolated pieces of business logic, so they only call activities and child workflows.
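
To illustrate, here is a minimal sketch of that per-activity pattern, assuming a pinject binding spec supplies the customer-scoped value; CustomerBindingSpec and DataAccessActivities are hypothetical names, not part of either library:

import pinject
from temporalio import activity

class CustomerBindingSpec(pinject.BindingSpec):
    # Hypothetical binding spec that supplies customer_partition for one customer
    def __init__(self, customer_partition):
        self._customer_partition = customer_partition

    def configure(self, bind):
        bind("customer_partition", to_instance=self._customer_partition)

class DataAccessActivities:
    def __init__(self):
        # Cache one object graph per customer partition (assumption: graphs
        # are cheap enough to keep around for the worker's lifetime)
        self._graphs = {}

    @activity.defn
    def get_data(self, customer_partition: str):
        graph = self._graphs.get(customer_partition)
        if graph is None:
            graph = pinject.new_object_graph(
                binding_specs=[CustomerBindingSpec(customer_partition)]
            )
            self._graphs[customer_partition] = graph
        # Resolve the customer-scoped dependency at call time, constrained
        # by the argument the workflow passed in
        data_access = graph.provide(CustomerDataAccess)
        return data_access.get_data()

The worker would then register the bound method once, e.g. activities=[DataAccessActivities().get_data], and every customer goes through the same activity.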

I see. Following this approach, in order to inject customer-scoped dependencies, we’d then want to create workers per customer and instantiate them with customer-scoped objects. Does that make sense?

I wouldn’t do it that way; I’d still create a single worker. Can you show how you create customer-scoped objects with pinject today? Do you create an object graph per customer, or use custom scopes? And if it’s a custom scope, how do you choose to pass runtime values to that decorator? Whatever you would do to obtain the CustomerDataAccess instance in a normal script, you should do inside the activity. An activity function is just like any other function/code.

We actually create an object graph per customer in our services today, which is also reflected in our setup of a gRPC servicer (instantiated with this object graph) per customer. I’m thinking of extending this to the notion of one worker per customer.

We have a similar issue. In our case, we are contemplating creating the same activity objects for every customer scope and distinguishing them by name (prefixed with the customer). I don’t know if that’s a solid approach or if there is a better way to do this.

I would usually recommend a single activity (object) defined by what it does, and have the activity method vary its behavior based on its input.
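
A minimal sketch of that shape (the function and variable names are illustrative, not from this thread):

from temporalio import activity
import redis

redis_client = redis.Redis()

@activity.defn
def get_customer_data(customer_partition: str) -> bytes:
    # One activity registered once; the customer scope arrives as input
    # instead of being baked into per-customer activity names
    return redis_client.get(customer_partition)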