I’m trying to test a fastapi method that calls temporal_client.list_workflows() to return list of ids of all workflows but I keep getting an error like below:
self = <temporalio.service._BridgeServiceClient object at 0x7fb13894bcb0>, rpc = 'list_workflow_executions'
req = namespace: "default"
page_size: 1000
resp_type = <class 'temporal.api.workflowservice.v1.request_response_pb2.ListWorkflowExecutionsResponse'>
async def _rpc_call(
self,
rpc: str,
req: google.protobuf.message.Message,
resp_type: Type[ServiceResponse],
*,
service: str,
retry: bool,
metadata: Mapping[str, Union[str, bytes]],
timeout: Optional[timedelta],
) -> ServiceResponse:
global LOG_PROTOS
if LOG_PROTOS:
logger.debug("Service %s request to %s: %s", service, rpc, req)
try:
client = await self._connected_client()
resp = await client.call(
service=service,
rpc=rpc,
req=req,
resp_type=resp_type,
retry=retry,
metadata=metadata,
timeout=timeout,
)
if LOG_PROTOS:
logger.debug("Service %s response from %s: %s", service, rpc, resp)
return resp
except temporalio.bridge.client.RPCError as err:
# Intentionally swallowing the cause instead of using "from"
status, message, details = err.args
> raise RPCError(message, RPCStatusCode(status), details)
E temporalio.service.RPCError: Method temporal.api.workflowservice.v1.WorkflowService/ListWorkflowExecutions is unimplemented
.venv/lib/python3.13/site-packages/temporalio/service.py:1366: RPCError
The implementation of test is as follows:
async def test_list_workflows():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="python-fastapi-example-queue",
workflows=[PaymentWorkflow],
activities=[notify],
):
# Override the temporal client dependency with the test client
app.dependency_overrides[get_temporal_client_dependency] = (
lambda: env.client
)
with TestClient(app) as test_client:
response = test_client.get("/workflows")
assert response.status_code == 200
assert response.json()["workflows"] == []
And the fastapi code:
async def get_temporal_client_dependency() -> Client:
try:
return await get_temporal_client()
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to initialize Temporal client: {str(e)}"
)
app = FastAPI()
@app.get("/workflows")
async def list_workflows(
temporal_client: Client = Depends(get_temporal_client_dependency),
) -> dict[str, list[str]]:
workflows = temporal_client.list_workflows()
ids = []
async for w in workflows:
ids.append(w.workflow_id)
return {"workflows": ids}
Is this simply impossible to use in tests or am I doing something wrong?