How to implement liveness and readiness in a Python Temporal application

I have a Python worker_client.py that creates a Temporal client using await Client.connect() and starts the worker using await worker.run().
I start the worker using the command python worker_client.py. This application is deployed in a Kubernetes environment. I want to implement liveness and readiness probes for this application, similar to the Actuator we use in Java applications.
How can I implement this functionality?

It depends on what the definition of liveness is, but there is nothing special about Temporal here vs any other Python application. So you should be able to use the same approach you would for any other Python environment. If the worker has not raised an error out of run, it is “alive” (but it does internally retry on failure, and only raises exceptions in certain situations after a while).

Yes, that is the problem. The application still runs even when the worker cannot connect to Temporal. We don’t have any API available inside our application other than the worker client. It would be great if you could provide an example or reference showing how to implement readiness and liveness for this worker client.

Do you think we can use the following Kubernetes configuration?

readinessProbe:
  tcpSocket:
    port: 8080

You’ll want to make an endpoint available; then, when implementing it, you can attempt a client call in your health check via await my_client.service_client.check_health(). Temporal does not do anything Kubernetes-specific, so implement a health check the same way as you do for any other Python process (and just make the client call inside the health check).

Only if you plan on opening a TCP socket when starting your process and closing it when unhealthy. There is no Temporal-specific Kubernetes health check; you may have to make your own endpoint.
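If it helps, here is a minimal sketch of such an endpoint. It assumes aiohttp as the HTTP framework and port 8080, neither of which is Temporal-specific, and a placeholder server address; the probe handler just forwards to check_health():

import asyncio

from aiohttp import web
from temporalio.client import Client


async def main() -> None:
    # Address and namespace are placeholders for your own configuration
    client = await Client.connect("localhost:7233")

    async def healthz(request: web.Request) -> web.Response:
        try:
            healthy = await client.service_client.check_health()
        except Exception as err:
            return web.Response(status=503, text=f"unhealthy: {err}")
        if not healthy:
            return web.Response(status=503, text="temporal not serving")
        return web.Response(text="ok")

    app = web.Application()
    app.add_routes([web.get("/healthz", healthz)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", 8080)
    await site.start()
    await asyncio.Event().wait()  # keep serving until the process is stopped


if __name__ == "__main__":
    asyncio.run(main())

You would then point an httpGet readiness probe at /healthz instead of the tcpSocket probe above.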

Thanks @Chad_Retz.
With this check_health() call, I can check whether the worker is ready to serve the request.
If the WorkflowService is up, does this mean that the worker is registered and ready to serve?

The problem that I am facing is that my microservice shows that the service is up even when the worker is not running/registered with Temporal.

I want to fail my microservice when the worker is not connected to Temporal after x retries.

Usually yes, if the worker is also running (i.e. it hasn’t returned from the run call or exited the async with). There may be cases of a worker failing but a client succeeding, but this is very rare and logs/metrics would show why. The worker is built to recover.

Is your worker running and the client can connect but the worker is not processing work? This would be rare and worth investigating deeply.

Are there any logs? Is the worker.run asyncio call running?

Whether a worker fails after a certain amount of time or number of retries is an internal behavior that you cannot configure. But we do offer metrics (see the samples for how to configure them, and the Temporal SDK metrics reference in the Temporal documentation for the request failure metrics offered; you’d likely want temporal_long_request_failure).
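As a rough sketch of the metrics setup (the bind address and port are arbitrary examples, not defaults), you create a telemetry-configured Runtime and pass it to Client.connect; workers built from that client then emit the SDK metrics:

from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig


async def connect_with_metrics() -> Client:
    # Expose Prometheus metrics, including temporal_long_request_failure,
    # at http://0.0.0.0:9464/metrics (address/port chosen for illustration)
    runtime = Runtime(
        telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9464"))
    )
    return await Client.connect("localhost:7233", runtime=runtime)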

The await Client.connect() was successful but the await worker.run() failed with the following exception:

{"time":"2024-03-14 02:29:10 PM GMT", "stack_trace":"None", "log_level":"INFO", "logger_name":"temporalio.worker._worker", "message":"Beginning worker shutdown, will wait 0:00:00 before cancelling activities", "thread_id":"139692215798720", "thread_priority":"NA"}

Traceback (most recent call last):
  File "/usr/lib64/python3.11/site-packages/temporalio/worker/_activity.py", line 180, in run
    raise RuntimeError("Activity worker failed") from err
RuntimeError: Activity worker failed

The service was running after this, but the worker was no longer polling. I want to implement a health check for this scenario.

  1. I want the service to self-heal. It should keep on trying to register the worker.
  2. In the worst case, I want to mark my service as unable to serve (i.e. it is down).

If run fails, this is fatal; you should not consider the worker healthy and should not continue running the service.

You can try to completely recreate/restart the worker after it fails if you want, but a failure coming from run is fatal for the existing worker instance. Workers are only good for one run call.
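If you do want to attempt that, a rough sketch (the retry count and backoff are arbitrary choices, and the task queue, workflows, and activities are whatever your application registers):

import asyncio
import logging
from typing import Sequence

from temporalio.client import Client
from temporalio.worker import Worker


async def run_worker_with_retries(
    client: Client,
    task_queue: str,
    workflows: Sequence[type],
    activities: Sequence,
    max_attempts: int = 5,
) -> None:
    for attempt in range(1, max_attempts + 1):
        # A worker instance is only good for one run() call, so build a new one
        worker = Worker(
            client, task_queue=task_queue, workflows=workflows, activities=activities
        )
        try:
            await worker.run()
            return  # run() returned normally, i.e. shutdown was requested
        except Exception:
            logging.exception("worker run failed, attempt %d/%d", attempt, max_attempts)
            await asyncio.sleep(min(2 ** attempt, 30))
    # Give up so the process exits and Kubernetes can restart the pod
    raise SystemExit("worker failed after retries; marking service as down")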

If worker.run is not running (i.e. it has failed), you should consider the service as down.

Agreed, but it didn’t fail my service. It should have failed it. I am running only a worker inside it.
Can I check this situation with the call await my_client.service_client.check_health() you mentioned earlier?

Did worker.run return/raise? Or did it hang trying to shutdown activities?

That would depend on why the worker is shutting down. If it’s shutting down due to server access issues, then the client calls should also fail during service access issues. Are there logs before “Beginning worker shutdown”?

Note, when this message occurs, it’s either because you have stopped the worker (e.g. cancelled the asyncio call) or because of a worker failure which should be logged above it. But we do not complete the run call until all activities have been sent cancels and then completed. If you have an activity that does not complete when cancelled (e.g. doesn’t listen to heartbeat), the worker may never return from this call.
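For illustration, an activity shaped roughly like this (the name and the fake unit of work are made up) heartbeats so cancellation can be delivered, and exits promptly when cancelled, which lets the run call complete shutdown:

import asyncio

from temporalio import activity


@activity.defn
async def long_running_activity(iterations: int) -> int:
    done = 0
    for _ in range(iterations):
        # Heartbeating is what allows cancellation to be delivered to the activity
        activity.heartbeat(done)
        try:
            await asyncio.sleep(1)  # stand-in for one unit of real work
        except asyncio.CancelledError:
            # Async activities receive cancellation as asyncio cancellation;
            # clean up quickly here so worker shutdown is not blocked
            raise
        done += 1
    return done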

I tried the following example. It always returns True even when the worker is not running. I want it to fail when the worker is not running.

client = await Client.connect(
    TEMPORAL_CONSTANTS.HOST + ":" + TEMPORAL_CONSTANTS.PORT,
    namespace=TEMPORAL_CONSTANTS.NAMESPACE,
    tls=False,
)
output = await client.service_client.check_health()
print("RESPONSE : ", output)


Yes, the worker didn’t return from the run call in our case. Hence the microservice didn’t shut down.

How are you determining whether the worker is “running”? If the worker run call has not returned, it is “running”. If it is failing and retrying it logs as such. If it is at its maximum concurrent limit of work, it will not poll (metrics can show available slots, and if they are 0 it does not poll). If easy enough, can you provide a replication of the situation where the worker seems to not be “running”?

What I meant is, I didn’t start the worker. The Temporal Server was up and running. I just created the sample code I shared above to check the health. It returned true even when the worker was not running.

You need to combine a client check with a check that worker.run is still running, to confirm both that the server is reachable and that the worker is running.

It would be great if you could provide some snippets. I think it is just returning True. It is not checking whether the worker is available to pick up the request.

@Chad_Retz I think I found an issue. The worker is not properly shutting down if there is an error while polling the activity task.

It gets hung in finalize_shutdown.

This is what happened in our case. The microservice was up but the worker got hung.

It looks like an issue with Temporal. I found these references:

from temporalio.client import Client
from temporalio.worker import Worker

worker_running = False


async def run_worker(worker: Worker) -> None:
    global worker_running
    worker_running = True
    try:
        await worker.run()
    finally:
        # Reset the flag whether run() returned or raised
        worker_running = False


async def check_health(client: Client) -> None:
    # Fail readiness if the worker task has stopped for any reason
    if not worker_running:
        raise RuntimeError("worker not running")
    # Also verify the Temporal frontend is reachable
    await client.service_client.check_health()

Something like that.
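The important detail in that sketch is that the flag is reset in the finally block, so whether run() raises or returns, the next readiness probe fails; the check_health() call then separately confirms the server is reachable. You would typically start run_worker with asyncio.create_task alongside whatever HTTP server backs the probe endpoint.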

What was the polling error? Is that error not also occurring from a client health check?

We are getting the same error mentioned in this issue:

It shows a PermissionDenied error while polling. The activity worker fails, but the worker gets hung.
temporal_sdk_core::worker::activities::activity_task_poller_stream: Error while polling for activity tasks error=Status { code: PermissionDenied, message: "Request unauthorized." }

Do you also get this error while checking health using the same client you started the worker with? If so, that should properly fail your health check, correct? I will see about prioritizing fixing the worker shutdown issue.

Do you have any timelines for fixing this worker shutdown issue?