Mock for Temporal activity

Hello, we have a workflow that behaves like the snippet below. The activity and its worker run on a different k8s cluster than the orchestrator/workflow, but both share the same Temporal server, namespace, and task queue. We therefore pass activities=[] to the workflow worker, since the activity is executed in the other cluster. That said, I was wondering how I could write a test with a mocked activity for a use case like this.

import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Pass the argument to the logger instead of pre-formatting with %
        workflow.logger.info("Running workflow with parameter %s", name)
        return await workflow.execute_activity(
            "compose_greeting",
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

async def main():
    client = await Client.connect("localhost:7233")
    async with Worker(
        client,
        task_queue="hello-activity-task-queue",
        workflows=[GreetingWorkflow],
        activities=[],  # the activity is served by a worker in another cluster
    ):

        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id="hello-activity-workflow-id",
            task_queue="hello-activity-task-queue",
        )
        print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

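That code creates a worker when you run the program. Your test creates its own worker, so you can register whatever activities you want on that test worker, independently of this main worker. Because the workflow calls the activity by its string name, a mock registered under the name compose_greeting will be picked up. See the sample test.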

thank you!