Hello, I was wondering if workflows could be called remotely by another workflow where that workflow lives in a different k8s cluster? We tried defining a child workflow and having the parent workflow call that child workflow but we ran into the following error
py3.11/lib/python3.11/site-packages/temporalio/worker/_workflow.py", line 309, in _create_workflow_instance
raise temporalio.exceptions.ApplicationError(
temporalio.exceptions.ApplicationError: NotFoundError: Workflow class GreetingSubWorkflow is not registered on this worker, available workflows: GreetingWorkflow
we thought a child workflow could be hosted by a separate set of workers that don’t contain the parent workflow code. Is there a working example that we could reference for this? We’re currently able to call activities/workers remotely by ensuring that both the remote worker and workflow calling it have the same server, temporal ns, task queue string and the same activity name. We were wondering if we could do the same for calling remote workflows where activity A in workflow A could call a remote workflow B where workflow A and B live in different k8s clusters but reference the same temporal server and namespace
You would need to have your parent and child workflow on different task queues (can specify task queue name in child workflow options, if not specified it would use the same one as the parent).
Then set up different worker fleets for the task queues, one having the parent workflow impl registered and the workers polling on the child workflow task queue only need to child workflow impl registered.
So, workers that poll on same task queue need to have all workflow types registered that are being started on this task queue.
Thank you for the reply. Just to clarify, would doing this ^ allow us to have workflow A call workflow B where both workflow A and B live on completely different k8s clusters and codebases?
Right now we’re considering the following subworkflow and workflow below (where the subworkflow and workflow would be deployed in 2 different clusters). Is the main change that we would need to make is making sure the subworkflow and the workflow have different task queue strings?
activity and subworkflow
import asyncio
import logging
import string
import random
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker
interrupt_event = asyncio.Event()
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
print(f"processing {input}")
activity.logger.info("Running activity with parameter %s" % input)
return f"Greetings!!!! {input.greeting}, {input.name}!"
@workflow.defn(name="GreetingSubWorkflow")
class SubWorkflow:
@workflow.run
async def run(self, input: ComposeGreetingInput) -> str:
workflow.logger.info("Running subworkflow with parameter %s" % input.name)
return await workflow.execute_activity(
"compose_greeting",
ComposeGreetingInput("Hello", input.name),
start_to_close_timeout=timedelta(seconds=100),
)
task_queue = "hello-activity-task-queue"
# workflow_name = "GreetingWorkflow"
# activity_name = "say-hello-activity"
async def main():
# Create client to localhost on default namespace
client = await Client.connect("localhost:7233")
# Run activity worker
async with Worker(client, task_queue=task_queue, activities=[compose_greeting], workflows=[SubWorkflow]):
await interrupt_event.wait()
if __name__ == "__main__":
asyncio.run(main())
and the following workflow
import asyncio
import logging
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_child_workflow(
"GreetingSubWorkflow",
ComposeGreetingInput("Hello", name),
)
# return results
async def main():
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[],
):
import random
import string
jobs = []
for i in range(100):
workflow_id = "".join(
random.choices(string.ascii_uppercase + string.digits, k=30)
)
jobs.append(
client.execute_workflow(
GreetingWorkflow.run,
f"World {i}",
id=workflow_id,
task_queue="hello-activity-task-queue",
)
)
for result in asyncio.as_completed(jobs):
print(f"Result: {await result}")
if __name__ == "__main__":
asyncio.run(main())
Can define its workflow type as string here (since you would not have the child defn available if these are two separate services).
Then would need to have workers polling on that “child-wf-task-queue-name” task queue which have the child workflow registered.
If you run both parent and child on same task queue then all workers polling on this task queue need to have both the parent and child workflow defn registered.