Hello, we have a workflow that behaves like the snippet below. The activity and its worker run on a different k8s cluster than the orchestrator/workflow, but both share the same Temporal server, namespace, and task queue. We therefore pass activities=[] to the workflow worker,
since the activity is executed in the other cluster. Given that, how can I write a mock test for a use case like this?
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
    """Argument object passed to the ``compose_greeting`` activity."""

    # Salutation part of the message; the workflow in this file passes "Hello".
    greeting: str
    # Name to greet; the workflow forwards its own ``name`` parameter here.
    name: str
@workflow.defn
class GreetingWorkflow:
    """Workflow that delegates greeting composition to an activity.

    The activity is referenced by its string name (``"compose_greeting"``)
    rather than by a function object, so no activity implementation has to
    be importable from this module.
    """

    @workflow.run
    async def run(self, name: str) -> str:
        """Execute the ``compose_greeting`` activity for ``name``.

        Args:
            name: Value placed in the ``name`` field of the activity input.

        Returns:
            Whatever the ``compose_greeting`` activity returns (presumably a
            string, per this method's annotation -- confirm against the
            activity implementation).
        """
        # Pass the value as a logger argument instead of pre-formatting with
        # ``%``: formatting is deferred until the record is actually emitted,
        # and a tuple-valued argument can no longer break the format call.
        workflow.logger.info("Running workflow with parameter %s", name)
        return await workflow.execute_activity(
            "compose_greeting",
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
async def main() -> None:
    """Run a workflow-only worker and execute ``GreetingWorkflow`` once.

    Connects to the Temporal server at ``localhost:7233``, starts a worker
    that registers ``GreetingWorkflow`` with an empty activity list, runs
    the workflow with the argument ``"World"`` and prints its result.
    """
    # The worker and the workflow start must name the same task queue, so
    # the literal is defined once instead of being repeated.
    task_queue = "hello-activity-task-queue"

    client = await Client.connect("localhost:7233")
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[GreetingWorkflow],
        # No activities are registered on this worker.
        activities=[],
    ):
        # The workflow is executed while the worker context is active.
        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id="hello-activity-workflow-id",
            task_queue=task_queue,
        )
        print(f"Result: {result}")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())