Questions on temporal workers

Workers

  • Can we have workers running across different sites? As in, on different machines that can be in different places in the world?

  • Can these workers talk to the same workflow? How? (By communicating with the same client, probably?)

  • Can activities be part of separate workers in one workflow?

Can we have workers running across different sites? As in, on different machines that can be in different places in the world?

Yes, absolutely.

Can these workers talk to the same workflow? How? (By communicating with the same client, probably?)

They should talk to the same service endpoint and namespace. Then any workflow can invoke activities and child workflows that these workers host.

Can activities be part of separate workers in one workflow?

Yes, absolutely. Make sure that each worker uses a different task queue name, and that when an activity is invoked, the correct task queue name is specified in its options.

How can I host a different worker in a different machine?

Suppose I have a Temporal client/cluster running on my local machine, and a worker that connects to this cluster via task queues.

Now I want to host another worker on a different machine. How do we set these up and make sure it connects to my local cluster?

PS: I am assuming we can host multiple workflows across workers. We can, right?

In production you don’t run the Temporal service on your local machine. You either use Temporal Cloud or run the self-hosted version behind a load balancer.

Then each worker uses the service host:port to connect.
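As a rough sketch (the address and the `TEMPORAL_ADDRESS` environment variable are assumptions, not values from this thread), a worker on another machine simply points its client at the cluster's reachable host:port instead of localhost:

```python
import os

# Hypothetical: the Temporal frontend is reachable at some host:port,
# e.g. a load balancer or Temporal Cloud endpoint. Both the variable name
# TEMPORAL_ADDRESS and the default below are made-up examples.
address = os.environ.get("TEMPORAL_ADDRESS", "10.0.0.5:7233")

# With the Python SDK the remote worker would then connect with:
# client = await Client.connect(address, namespace="default")
print(address)
```

The only thing that changes per machine is the address string; the task queue names and worker code stay the same.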

Yeah. That is right.

We can host multiple workflows across workers, right?

Yes, the placement of workflows/activities to workers is your choice. The rule is that each worker pool has to use its own task queue name.

How does that work? I see no options for task queues in activities; the options for task queues are in the workflow.

Right now my code looks like this (1 worker, 1 workflow, 4 activities):

Activities

→ Simple logic to create folders

Workflows

→ Runs each activity function using this:

result = await workflow.execute_activity(activity_func, folder_name, start_to_close_timeout=timedelta(seconds=30), retry_policy=retry_policy)

→ Runs the workflow using this (after connecting to the client):

result = await client.execute_workflow(CreateFoldersWorkflow.run, folder_name, id=workflow_id, task_queue="folder-creation-task-queue")

Here we provide the task-queue.

Worker

→ Worker looks like this:

    worker = Worker(client, task_queue="folder-creation-task-queue", workflows=[CreateFoldersWorkflow], activities=[create_folder_lon, create_folder_mtl, create_folder_chn, create_folder_mum])
    await worker.run()

Now suppose we want to run 4 workers. Where do we pass in the task queues?

Multiple Workers?

In the worker script I can create 4 workers like this, where the task queues are folder-creation-task-queue-1 … 2 … 3 … 4:

    workerA = Worker(client, task_queue="folder-creation-task-queue-1", workflows=[CreateFoldersWorkflow], activities=[create_folder_lon])

    workerB = Worker(client, task_queue="folder-creation-task-queue-2", workflows=[CreateFoldersWorkflow], activities=[create_folder_mtl])

How do we design the updated workflow?
I am not sure what changes are required to make this work.

result = await client.execute_workflow(CreateFoldersWorkflow.run, folder_name, id=workflow_id, task_queue="folder-creation-task-queue-1")

How can we provide all the task queue names over here? Or do we have to do this differently? Where do we specify all the new task queues?

If we are able to add the correct tasks to the correct task queues, then this is solved. I'm not sure how to do that. If I am able to pass task queue names into activities, then it can be done, I guess.

You create a worker per task queue, so you'll have two workers in each process. One worker will contain the workflow code and the activities that don't need to be linked to a specific host. The other worker will listen on a host-specific task queue and contain the activities that are host specific.
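Sketching that layout: with the real Python SDK the shape is `await asyncio.gather(shared_worker.run(), host_worker.run())`. The stub class below stands in for `temporalio.worker.Worker` only so this runs without a Temporal server:

```python
import asyncio

# Two workers in one process: one polls the shared workflow task queue,
# the other polls a host-specific activity task queue. StubWorker mimics
# only the run() shape of temporalio.worker.Worker.
class StubWorker:
    def __init__(self, task_queue: str):
        self.task_queue = task_queue

    async def run(self) -> str:
        await asyncio.sleep(0)  # a real Worker.run() polls until shutdown
        return f"polling {self.task_queue}"

async def main() -> list:
    shared = StubWorker("folder-creation-task-queue")
    host_specific = StubWorker("lon-task-queue")
    # Run both pollers concurrently in the same process.
    return await asyncio.gather(shared.run(), host_specific.run())

print(asyncio.run(main()))
```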

see no options for task queues in activity.

You absolutely can specify the task queue in the activity options. See the fileprocessing sample.

Yeah, we can create workers for activities.
I created 4 workers for my 4 activities, but I am stuck on how to distribute tasks to those workers.

Each of the 4 workers uses its own task queue and is linked to a specific activity.

How do I go about executing my workflow that runs all these 4 activities?

Workflow Definition

@workflow.defn
class CreateFoldersWorkflow:
    @workflow.run
    async def run(self, folder_name: str) -> dict:
        # Specify the sites and corresponding activity functions
        sites = ['lon', 'mtl', 'chn', 'mum']
        activity_functions = [create_folder_lon, create_folder_mtl, create_folder_chn, create_folder_mum]
        task_queues = ['lon-task-queue','mtl-task-queue','chn-task-queue','mum-task-queue']

        # Create a dictionary to store the results for each site
        results = {}

        # Execute the activity for each site, one at a time,
        # routing each call to its own task queue
        for site, activity_func, task_queue in zip(sites, activity_functions, task_queues):
            result = await workflow.execute_activity(activity_func, folder_name, start_to_close_timeout=timedelta(seconds=30), retry_policy=retry_policy, task_queue=task_queue)
            results[site] = result

        # Return the results
        return results
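One side note on the loop above: each `execute_activity` is awaited before the next one starts, so the four sites actually run sequentially. Temporal's Python workflows can fan activities out in parallel with `asyncio.gather`. This standalone sketch uses a stub coroutine in place of `workflow.execute_activity` so it runs without a worker:

```python
import asyncio

# Stand-in for workflow.execute_activity(...) so the sketch is runnable.
async def fake_execute_activity(site: str, folder_name: str) -> str:
    await asyncio.sleep(0)  # represents the activity round-trip
    return f"Folder '{folder_name}' created in {site}"

async def run_all(folder_name: str) -> dict:
    sites = ["lon", "mtl", "chn", "mum"]
    # Start all four activities, then wait for all results together.
    results = await asyncio.gather(
        *(fake_execute_activity(site, folder_name) for site in sites)
    )
    return dict(zip(sites, results))

print(asyncio.run(run_all("TemporalwithPlumbum")))
```

Inside the real workflow, the same `asyncio.gather(...)` call works over the coroutines returned by `workflow.execute_activity`, since Temporal drives the workflow with a deterministic asyncio event loop.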

And this workflow is called from another script that has a separate worker for workflow.

    client = await Client.connect("localhost:7233", namespace="default")
    # Run the worker
    worker = Worker(client, task_queue="folder-creation-task-queue", workflows=[CreateFoldersWorkflow])
    await worker.run()

async def start_folder_workflow():
    client = await Client.connect("localhost:7233", namespace="default")
    folder_name = "TemporalwithPlumbum"
    workflow_id = "create_folders_workflow"
    result = await client.execute_workflow(CreateFoldersWorkflow.run, folder_name, id=workflow_id, task_queue="folder-creation-task-queue")
    print(result)

This doesn't run the activities on the 4 workers that I created; it runs everything on the workflow task queue. I can't figure out why. Am I making some silly mistake here?

Why two workers? Can't I fit all the workflow logic into one worker (the worker with the workflow task queue), which in turn adds tasks to the 4 workers I have created (the activity task queues)?

I saw this question also asked on our Slack channel…

Nothing obvious, but the full code isn't present (I cannot see the code running the separate task queue workers). If you can provide a standalone replication showing activities not being distributed to the proper worker, we can help debug. You could do something like altering the simple activity sample.

Because you may have general shared work that doesn't care which host it runs on, and you may have host-specific work. Sure, you can segment it into even more task queues if you'd like. For the worker-specific-task-queue pattern, you need one worker on the shared task queue ready to field requests asking for the worker-specific task queue name, and another to actually serve that worker-specific task queue.
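A rough sketch of that hand-off (the hostname-based queue naming is an assumption; in the real pattern `get_worker_task_queue` would be an activity registered on the shared queue, and each host also runs a second Worker polling the name it reports):

```python
import socket

def get_worker_task_queue() -> str:
    # Each host advertises a task queue name unique to itself.
    return f"{socket.gethostname()}-task-queue"

queue = get_worker_task_queue()
# The workflow would then pin follow-up, host-specific activities to it:
# await workflow.execute_activity(host_specific_activity, arg,
#                                 task_queue=queue, ...)
print(queue)
```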

Actually, it's running on the specific workers. My bad, there were no log statements to confirm, but I checked the UI and it is running on the specific queues.

Please have a look at the following code:

Let's suppose this is the workflow code…

@workflow.defn
class CreateFoldersWorkflow:
    @workflow.run
    async def run(self, folder_name: str) -> dict:
        # Specify the sites and corresponding activity functions
        print("FOLDER CREATION WORKFLOW")
        sites = ['lon', 'mtl', 'chn', 'mum']
        activity_functions = [create_folder_lon, create_folder_mtl, create_folder_chn, create_folder_mum]
        task_queues = ['lon-task-queue','mtl-task-queue','chn-task-queue','mum-task-queue']

        # Create a dictionary to store the results for each site
        results = {}

        # Execute the activity for each site, one at a time,
        # routing each call to its own task queue
        for site, activity_func, task_queue in zip(sites, activity_functions, task_queues):
            result = await workflow.execute_activity(activity_func, folder_name, start_to_close_timeout=timedelta(seconds=30), retry_policy=retry_policy, task_queue=task_queue)
            results[site] = result

        # Return the results
        return results

And the activities look like this:

# Example activity functions for each site (replace with actual logic)
@activity.defn
async def create_folder_lon(folder_name: str) -> str:
    # Actual logic to create a folder in London (by remoting using Plumbum)

    print("LON ACTIVITY")
    with SshMachine("lonpipebatch1") as conn:
        p = conn.path()
        (p / folder_name).mkdir()
        await asyncio.sleep(2)
    return f"Folder '{folder_name}' created in London"
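One caveat with the snippet above: Plumbum's SSH calls are blocking, so inside an async activity they stall the worker's event loop while they run. A common workaround is to push the blocking part onto a thread with `asyncio.to_thread`. The stand-in function below replaces the real `SshMachine` logic so the sketch runs anywhere:

```python
import asyncio

def make_folder_blocking(folder_name: str) -> str:
    # Placeholder for the blocking Plumbum block, i.e.:
    # with SshMachine("lonpipebatch1") as conn:
    #     (conn.path() / folder_name).mkdir()
    return f"Folder '{folder_name}' created in London"

async def create_folder_lon(folder_name: str) -> str:
    # Run the blocking SSH work in a thread so the event loop stays free.
    return await asyncio.to_thread(make_folder_blocking, folder_name)

print(asyncio.run(create_folder_lon("demo")))
```

Alternatively, the activity can be written as a plain synchronous function and registered with a thread pool executor on the worker.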

Individual worker code, host-specific to a task queue (there are 4 of these, one for each activity):

async def main():
    client = await Client.connect("localhost:7233", namespace="default")
    worker = Worker(client, task_queue="lon-task-queue", activities=[create_folder_lon])
    await worker.run()

And the main worker code (there is one):

async def main():
    client = await Client.connect("localhost:7233", namespace="default")
    worker = Worker(client, task_queue="folder-creation-task-queue", workflows=[CreateFoldersWorkflow])
    await worker.run()

I am running the workflow this way:

async def start_folder_workflow():
    client = await Client.connect("localhost:7233", namespace="default")
    folder_name = "TemporalwithPlumbum"
    workflow_id = "create_folders_workflow"
    result = await client.execute_workflow(CreateFoldersWorkflow.run, folder_name, id=workflow_id, task_queue="folder-creation-task-queue")
    print(result)

Right now, all these workers are on a single machine (localhost).

Here is the sequence of things happening:

  1. When I run the workflow, a workflow task gets added to the folder-creation task queue, and my main worker picks up this workflow task.

  2. My folder-creation workflow runs after getting picked up by the main worker; the workflow then adds activity tasks to each of those 4 task queues (chn-task-queue, lon-task-queue, etc.).

  3. The worker dedicated to each of these task queues then picks up the queued task and runs the activity code.

And VOILA I get the output.
Am I right? Is this what’s happening?

The UI and the terminal show the specific activities running on the correct queues.

Yeah, I am using one worker for those tasks (the shared workflow execution tasks) and 4 workers for the host-specific tasks, one each. In the workflow execution, I have more or less hardcoded the host-specific routing of activities, so each activity is served by its particular queue.