Actually. Its running on the specific worker. My bad, there was no log statements to confirm but I checked UI and it is running on specific queues.
Please have a look at following codes :
Lets suppose this is the workflow code…
@workflow.defn
class CreateFoldersWorkflow:
@workflow.run
async def run(self, folder_name: str) -> dict:
# Specify the sites and corresponding activity functions
print("FOLDER CREATION WORKFLOW")
sites = ['lon', 'mtl', 'chn', 'mum']
activity_functions = [create_folder_lon, create_folder_mtl, create_folder_chn, create_folder_mum]
task_queues = ['lon-task-queue','mtl-task-queue','chn-task-queue','mum-task-queue']
# Create a dictionary to store the results for each site
results = {}
# Asynchronously execute activities for each site
for site, activity_func, task_queue in zip(sites, activity_functions,task_queues):
# maybe specify task-queue names?
result = await workflow.execute_activity(activity_func, folder_name, start_to_close_timeout=timedelta(seconds=30), retry_policy=retry_policy, task_queue=task_queue)
results[site] = result
# Return the results
return results
And activities look like this:
# Example activity functions for each site (replace with actual logic)
@activity.defn
async def create_folder_lon(folder_name: str) -> str:
# Actual logic to create a folder in London (by remoting using Plumbum)
print("LON ACTIVITY")
with SshMachine("lonpipebatch1") as conn:
p = conn.path()
(p / folder_name).mkdir()
#mk = conn['mkdir']
#mk(folder_name)
await asyncio.sleep(2)
return f"Folder '{folder_name}' created in London"
Individual worker code host specific to task queue: (There are 4 of these for each activity)
async def main():
client = await Client.connect("localhost:7233", namespace="default")
worker = Worker(client, task_queue="lon-task-queue", activities=[create_folder_lon])
await worker.run()
And main worker code: (There is one)
async def main():
client = await Client.connect("localhost:7233", namespace="default")
worker = Worker(client, task_queue="folder-creation-task-queue", workflows=[CreateFoldersWorkflow])
await worker.run()
I am running the workflow this way:
async def start_folder_workflow():
client = await Client.connect("localhost:7233", namespace="default")
folder_name = "TemporalwithPlumbum"
workflow_id = "create_folders_workflow"
result = await client.execute_workflow(CreateFoldersWorkflow.run, folder_name, id=workflow_id, task_queue="folder-creation-task-queue")
print(result)
Right now all these workers on a single machine (local host).
Here is the sequence of things happening:
-
When I run the workflow, workflow tasks gets added to folder-creation task queue and my main worker picks this workflow tasks.
-
My folder-creation-workflow runs after getting picked up by main worker, this workflow add activity tasks to each of those 4 task queues (chn-task-queue, lon-task-queues…etc)
-
The specific worker dedicated for each of these task-queue then picks up the curated task and runs the activity code
And VOILA I get the output.
Am I right? Is this what’s happening?
The UI and terminal shows specific activities running on correct queues.