Hi
I am trying to synchronize access to workflow resources — that is, to create a critical section that only one workflow can enter at a time.
Limiting the worker's max_concurrent_workflow_tasks, max_cached_workflows and max_concurrent_activities doesn’t work; see the example code at the bottom.
BTW: I googled the topic and found one sample, samples-go/mutex at main · temporalio/samples-go · GitHub, but it is written in Go. I am not sure whether the Python SDK can do something similar.
Your help is highly appreciated!
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
# Module-level asyncio lock that GreetingWorkflow.run acquires around its
# activity calls.
# NOTE(review): an asyncio.Lock only coordinates coroutines that share this
# one in-memory object. It presumably cannot serialize separate workflow runs
# (the Temporal Python sandbox appears to re-import the module for each run,
# giving every run its own lock) -- confirm against the SDK documentation.
lock = asyncio.Lock()
@dataclass
class ComposeGreetingInput:
    """Argument bundle passed to the ``compose_greeting`` activity."""

    # Salutation placed at the start of the composed message.
    greeting: str
    # Name of the person being greeted.
    name: str
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    """Heartbeat five times, pausing 10 seconds after each, then greet.

    Each round prints the caller's name together with the current thread
    identifier, records an activity heartbeat, and sleeps for 10 seconds.

    Args:
        input: Greeting word and name to combine.

    Returns:
        The string ``"<greeting>, <name>!"``.
    """
    rounds_done = 0
    while rounds_done < 5:
        message = f"{input.name} Heartbeating activity on thread {threading.get_ident()}"
        print(message)
        activity.heartbeat()
        await asyncio.sleep(10)
        rounds_done += 1

    return f"{input.greeting}, {input.name}!"
@workflow.defn
class GreetingWorkflow:
    """Workflow that runs the ``compose_greeting`` activity twice in a row."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Execute ``compose_greeting`` twice while holding the module lock.

        Args:
            name: Name forwarded to the activity together with "Hello".

        Returns:
            The greeting string produced by the second activity execution.
        """
        greeting_input = ComposeGreetingInput("Hello", name)

        # Intended critical section: the two activity executions below are
        # meant to run back to back without another workflow interleaving.
        # NOTE(review): the author reports that the asyncio lock does not
        # achieve this. Presumably each workflow run sees its own copy of the
        # module-level lock, so cross-workflow exclusion would need a
        # Temporal-level mechanism (e.g. a dedicated mutex workflow driven by
        # signals) -- confirm against the SDK documentation.
        async with lock:
            # The first result is not needed, so it is not bound to a name.
            await workflow.execute_activity(
                compose_greeting,
                greeting_input,
                start_to_close_timeout=timedelta(seconds=100),
                heartbeat_timeout=timedelta(seconds=20),
            )
            return await workflow.execute_activity(
                compose_greeting,
                greeting_input,
                start_to_close_timeout=timedelta(seconds=100),
                heartbeat_timeout=timedelta(seconds=20),
            )
async def main():
    """Connect to the local Temporal server and run the greeting worker."""
    temporal_client = await Client.connect("localhost:7233")

    # Every concurrency setting is pinned to one slot and the workflow cache
    # is disabled (size 0).
    single_slot_limits = {
        "max_concurrent_workflow_tasks": 1,
        "max_cached_workflows": 0,
        "max_concurrent_activities": 1,
        "max_concurrent_workflow_task_polls": 1,
        "max_concurrent_activity_task_polls": 1,
    }

    greeting_worker = Worker(
        temporal_client,
        task_queue="hello-activity-task-queue",
        workflows=[GreetingWorkflow],
        activities=[compose_greeting],
        **single_slot_limits,
    )
    await greeting_worker.run()
# Script entry point: start the worker only when this file is run directly.
if __name__ == "__main__":
    asyncio.run(main())