How to synchronize workflow resources in the Python SDK

Hi

I am trying to synchronize access to workflow resources, i.e. to create a critical section that only one workflow can enter at a time.

Limiting the worker's max_concurrent_workflow_tasks, max_cached_workflows and max_concurrent_activities doesn’t work; see the example code below.

BTW: I googled the topic and found the samples-go/mutex sample (temporalio/samples-go on GitHub), but it is written in Go. I am not sure whether the Python SDK can do something similar.

Your help is highly appreciated!

import asyncio
import threading
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

lock = asyncio.Lock()

@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    # Run for about 50 seconds in total (5 iterations of 10 seconds each),
    # heartbeating in between (as all long-running activities should),
    # then return the greeting
    for _ in range(5):
        print(f"{input.name} Heartbeating activity on thread {threading.get_ident()}")
        activity.heartbeat()
        await asyncio.sleep(10)
    return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Lock the critical section.
        # Intent: the two activity executions below must not be interleaved
        # with those of another workflow run.
        # However, this asyncio lock does not work.
        async with lock:
            ret = await workflow.execute_activity(
                compose_greeting,
                ComposeGreetingInput("Hello", name),
                start_to_close_timeout=timedelta(seconds=100),
                heartbeat_timeout=timedelta(seconds=20),
            )
            return await workflow.execute_activity(
                compose_greeting,
                ComposeGreetingInput("Hello", name),
                start_to_close_timeout=timedelta(seconds=100),
                heartbeat_timeout=timedelta(seconds=20),
            )

async def main():
    # Start client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    worker = Worker(
        client,
        task_queue="hello-activity-task-queue",
        workflows=[GreetingWorkflow],
        activities=[compose_greeting],
        max_concurrent_workflow_tasks=1,
        max_cached_workflows=0,
        max_concurrent_activities=1,
        max_concurrent_workflow_task_polls=1,
        max_concurrent_activity_task_polls=1,
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

What is the maximum lock/unlock rate for the mutex?

The critical section between lock and unlock is expected to run some activities, and overall should take from one minute to 30 minutes.

Then a workflow-based mutex, as in the Go sample, should work fine.

I don’t think we have a Python sample for this.
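
As a rough illustration of what a workflow-based mutex has to keep track of, here is a minimal sketch of the bookkeeping in plain Python. It is not Temporal code: `MutexState` and the workflow IDs are hypothetical, and the wiring is an assumption. In an actual mutex workflow, `request` and `release` would presumably be called from `@workflow.signal` handlers, and the returned ID would be notified by signaling that workflow through `workflow.get_external_workflow_handle(...)`.

```python
from collections import deque
from typing import Deque, Optional


class MutexState:
    """Deterministic bookkeeping for a mutex workflow (hypothetical helper)."""

    def __init__(self) -> None:
        self.holder: Optional[str] = None   # ID of the workflow holding the lock
        self.waiting: Deque[str] = deque()  # FIFO queue of waiting workflow IDs

    def request(self, requester_id: str) -> Optional[str]:
        """Handle a lock request; return the ID to notify if the lock is granted now."""
        if self.holder is None:
            self.holder = requester_id
            return requester_id
        self.waiting.append(requester_id)
        return None

    def release(self, requester_id: str) -> Optional[str]:
        """Handle a release; return the next ID to notify, if any."""
        if requester_id != self.holder:
            return None  # ignore stale or duplicate releases
        self.holder = self.waiting.popleft() if self.waiting else None
        return self.holder


state = MutexState()
print(state.request("wf-a"))  # wf-a gets the lock immediately
print(state.request("wf-b"))  # None: wf-b has to wait
print(state.release("wf-a"))  # the lock passes to wf-b
print(state.release("wf-b"))  # None: nobody is waiting, the lock is free
```

Keeping this state inside a single workflow is what serializes access: every request and release goes through that one workflow, so only one requester can hold the lock at a time.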

Thank you, Maxim. I will try to rewrite the Go mutex example (samples-go/mutex in temporalio/samples-go on GitHub) in Python. Hopefully it will work. Before that, I just want to estimate the feasibility, since the underlying mechanisms differ between Go and Python: the Go example uses channels and side effects, for which I don't see a direct counterpart in Python, while Python relies on asyncio. Thanks.

The SDKs are feature-compatible; they just use language-specific idioms. Python has asyncio where Go has goroutines and channels.

Don’t initialize the client inside an activity. A Client is a heavyweight object that manages external connections. It should always be instantiated once per process.
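
One way to get a single client per process is a small lazy holder that activities share. The sketch below is only an illustration: `ClientHolder` and `fake_connect` are hypothetical names, and `fake_connect` stands in for a real connection call such as the `Client.connect("localhost:7233")` used in the code above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class ClientHolder:
    """Lazily creates one shared client per process (hypothetical helper)."""

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect
        self._client: Optional[Any] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> Any:
        # Create the lock lazily so it belongs to the running event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            # Only the first caller connects; later callers reuse the client.
            if self._client is None:
                self._client = await self._connect()
        return self._client


connect_calls = 0


async def fake_connect() -> object:
    # Stand-in for a real connection call (assumption for this demo).
    global connect_calls
    connect_calls += 1
    await asyncio.sleep(0)
    return object()


async def demo() -> bool:
    holder = ClientHolder(fake_connect)
    # Two concurrent callers, as two activities on one worker might be.
    first, second = await asyncio.gather(holder.get(), holder.get())
    return first is second


same = asyncio.run(demo())
print(same, connect_calls)  # True 1
```

An activity would then call `await holder.get()` on a module-level holder instead of connecting on every invocation.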

If the signal queue stays empty for some timeout, I would exit the mutex workflow. Another option is to keep it running forever and call continue-as-new periodically.
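
The exit-when-idle option can be modeled in plain asyncio as follows. This is a sketch of the control flow only, not Temporal code: `serve_until_idle` and the queue are hypothetical. Inside a workflow the equivalent wait would presumably be `workflow.wait_condition(..., timeout=...)` on a list filled by a signal handler. The continue-as-new option is not shown.

```python
import asyncio
from typing import List


async def serve_until_idle(requests: "asyncio.Queue[str]", idle_timeout: float) -> List[str]:
    """Serve one requester at a time; return once no request arrives in time."""
    served: List[str] = []
    while True:
        try:
            requester = await asyncio.wait_for(requests.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            # The queue stayed empty: let the mutex "workflow" complete.
            return served
        # A real mutex would notify the requester here and wait for its release.
        served.append(requester)


async def demo() -> List[str]:
    requests: "asyncio.Queue[str]" = asyncio.Queue()
    for wf_id in ("wf-a", "wf-b"):
        requests.put_nowait(wf_id)
    return await serve_until_idle(requests, idle_timeout=0.05)


served = asyncio.run(demo())
print(served)  # ['wf-a', 'wf-b']
```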

Thanks, Maxim. I updated the code with a singleton client manager, and the mutex workflow now exits on timeout.