import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy
class ClientManager:
_instance = None
def __init__(self):
if ClientManager._instance is not None:
raise Exception("This class is a singleton!")
ClientManager._instance = self
@staticmethod
async def get_client():
if ClientManager._instance is None:
ClientManager._instance = ClientManager()
ClientManager._instance.client = await Client.connect("localhost:7233")
return ClientManager._instance.client
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
@dataclass
class SignalWFInput:
wf_id: str
run_id: str
ns: str
timeout: int
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
# We'll wait for 3 seconds, heartbeating in between (like all long-running
# activities should do), then return the greeting
for _ in range(0, 2):
print(f"{input.name} Heartbeating activity on thread {threading.get_ident()}")
activity.heartbeat()
await asyncio.sleep(3)
return f"{input.greeting}, {input.name}!"
class Mutex:
def __init__(self, workflow_id: str, run_id: str, lock_namespace: str):
self.workflow_id = workflow_id
self.run_id = run_id
self.lock_namespace = lock_namespace
self._lock_ready = False
self._exit = False
self._mutex_wf_id = ""
async def lock(self, timeout: int = 0) -> None:
self._lock_ready = False
# start a new workflow to do the locking
self._mutex_wf_id = await workflow.execute_local_activity(
request_lock_activity,
args = [self.lock_namespace, self.workflow_id, self.run_id, timeout],
start_to_close_timeout=timedelta(seconds=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(seconds=60),
maximum_attempts=5
),
)
# wait the lock_ready_signal
await workflow.wait_condition(
lambda: self._lock_ready or self._exit
)
if self._exit:
workflow.logger.info(f"Mutex lock receive exit signal")
return
async def unlock(self) -> None:
handle = workflow.get_external_workflow_handle(workflow_id = self._mutex_wf_id)
# send the unlock signal
await handle.signal("unlock_signal", SignalWFInput(self.workflow_id, self.run_id, self.lock_namespace))
def set_lock_ready(self):
if not self._lock_ready:
self._lock_ready = True
def set_exit(self):
if not self._exit:
self._exit = True
@activity.defn
async def request_lock_activity(ns: str, cid: str, crid: str, timeout: int) -> str:
# Start client
client = await ClientManager.get_client()
mutex_workflow_id = f"mutex:{ns}"
mutex_wf_handle = await client.start_workflow(
MutexWorkflow.run,
id = mutex_workflow_id,
task_queue="hello-activity-task-queue",
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(seconds=60),
maximum_attempts=5
),
start_signal="request_lock_signal",
start_signal_args=[cid, crid, ns, timeout],
)
return mutex_wf_handle.id
def new_mutex(workflow_id: str, run_id: str, lock_namespace: str) -> Mutex:
return Mutex(workflow_id, run_id, lock_namespace)
@workflow.defn
class MutexWorkflow:
TIMEOUT_ERROR = 1
def __init__(self):
self._lock_queue: asyncio.Queue[SignalWFInput] = asyncio.Queue()
self._unlock_queue: asyncio.Queue[SignalWFInput] = asyncio.Queue()
self._exit = False
async def wait_signal(self, timeout: int = 0) -> int:
if timeout != 0:
try:
await workflow.wait_condition(
lambda: not self._unlock_queue.empty() or self._exit,
timeout = timedelta(seconds=timeout),
)
except asyncio.TimeoutError:
workflow.logger.info(f"unlockTimeout {timeout} exceeded")
return self.__class__.TIMEOUT_ERROR
else:
await workflow.wait_condition(
lambda: not self._unlock_queue.empty() or self._exit,
)
return 0
@workflow.run
async def run(self) -> None:
cwf = workflow.info()
mutex_workflow_id = cwf.workflow_id
mutex_run_id = cwf.run_id
while True:
if self._exit:
workflow.logger.info(f"MutexWorkflow exit")
break
# wait the sender workflow signal
workflow.logger.info(f"MutexWorkflow wait signal")
try:
await workflow.wait_condition(
lambda: not self._lock_queue.empty() or self._exit,
timeout = timedelta(seconds=10),
)
except asyncio.TimeoutError:
workflow.logger.info(f"MutexWorkflow idle more than 10s, exit")
self._exit = True
if self._exit:
workflow.logger.info(f"MutexWorkflow exit")
break
wf_input = self._lock_queue.get_nowait()
# get the sender workflow handle
handle = workflow.get_external_workflow_handle(workflow_id = wf_input.wf_id, run_id = wf_input.run_id)
# send the lock ready signal
await handle.signal("enter_lock_signal", mutex_workflow_id)
# wait the sender workflow send unlock signal
ret = await self.wait_signal(wf_input.timeout)
if ret == self.__class__.TIMEOUT_ERROR:
workflow.logger.info(f"{wf_input.wf_id}, {wf_input.run_id} timeout exceeded , ignore it")
continue
workflow.logger.info(f"MutexWorkflow got unlock signal from {unlock_wf_input.wf_id}, {unlock_wf_input.run_id}")
@workflow.signal
async def request_lock_signal(self, sender_workflow_id: str, sender_run_id: str, ns: str, timeout) -> None:
workflow.logger.info(f"MutexWorkflow receives request_lock_signal from {sender_workflow_id}, {sender_run_id} {ns}")
await self._lock_queue.put(SignalWFInput(sender_workflow_id, sender_run_id, ns, timeout))
@workflow.signal
async def unlock_signal(self, input: SignalWFInput) -> None: # sender_workflow_id: str, sender_run_id: str, lock_namespace: str) -> None:
workflow.logger.info(f"MutexWorkflow receives unlock_signal from {input.wf_id}, {input.run_id}, {input.ns}")
await self._unlock_queue.put(input)
@workflow.signal
def exit(self):
self._exit = True
@workflow.defn
class GreetingWorkflow:
def __init__(self):
self.mutex = None
@workflow.run
async def run(self, name: str) -> str:
cwf = workflow.info()
current_workflow_id = cwf.workflow_id
current_run_id = cwf.run_id
self.mutex = new_mutex(current_workflow_id, current_run_id, "default")
await self.mutex.lock()
workflow.logger.info(f"{current_workflow_id} {current_run_id} resource locked")
# Lock the critical section
ret = await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=100),
heartbeat_timeout=timedelta(seconds=20),
)
ret = await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", wf_input.name),
start_to_close_timeout=timedelta(seconds=100),
heartbeat_timeout=timedelta(seconds=20),
)
await self.mutex.unlock()
workflow.logger.info(f"{current_workflow_id} {current_run_id} finished")
return ret
@workflow.signal
async def enter_lock_signal(self, mutex_wf_id: str) -> None:
if self.mutex is not None:
self.mutex.set_lock_ready()
else:
workflow.logger.info("self.mutex is None, can not set lock ready")
async def main():
# Start client
client = await ClientManager.get_client()
# Run a worker for the workflow
worker = Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow, MutexWorkflow],
activities=[compose_greeting, request_lock_activity],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())