I am testing a new worker in Python that will run in Kubernetes as a Job, but it seems that after the Temporal worker has completed, the pod stays alive.
I am trying to run this as an Argo Workflows workflow.
How should I handle this correctly?
The code I have is here:
try:
    worker = Worker(
        self.client,
        task_queue=self.task_queue_name,
        workflows=[example_workflow],
        activities=[activities.example_activity],
    )
    logging.info("started worker")
    await worker.run()
    logging.info("finished worker")
    sys.exit(0)
except Exception as e:
    logging.error(f"TemporalWorker >> Failed to start the worker: {e}")

if __name__ == "__main__":
    asyncio.run(main())
There's no single exact approach; this is the same as stopping any asyncio system. Some of the samples, like the worker in the encryption one, show a way to handle KeyboardInterrupt in Python by setting an interrupt event that shuts down the worker. Note, this will initiate worker shutdown, but worker shutdown will wait for all outstanding activities to handle the cancellations they'll be sent.
worker.run() runs forever until you call worker.shutdown() or cancel the asyncio task. You can choose to exit the process then. How that affects the pod is up to you.
It runs forever until you decide you want it to be done and tell it to shutdown (or it has a rare fatal error). When await worker.run() returns/throws, it has completed shutting down.
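To make that lifecycle concrete, here is a minimal asyncio sketch of the contract described above. FakeWorker is a stand-in I'm using for illustration, not the real temporalio.worker.Worker; only the control flow (run until shutdown is requested, then return) mirrors the real thing:

```python
import asyncio

class FakeWorker:
    """Stand-in for temporalio.worker.Worker, only to show the control flow."""

    def __init__(self) -> None:
        self._shutdown = asyncio.Event()

    async def run(self) -> None:
        # The real worker.run() polls for tasks until shutdown is requested;
        # here we just block until shutdown() is called.
        await self._shutdown.wait()

    async def shutdown(self) -> None:
        self._shutdown.set()

async def main() -> str:
    worker = FakeWorker()
    run_task = asyncio.create_task(worker.run())
    await asyncio.sleep(0.05)   # workflows/activities would execute here
    await worker.shutdown()     # tell the worker to stop
    await run_task              # run() only returns once shutdown completes
    return "run() returned"

result = asyncio.run(main())
print(result)
```

Once `await worker.run()` returns you can exit the process, which is what lets the pod terminate.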
Hi, our design is to start the pod when the workflow is running and turn the pod off when the workflow run is finished. Based on the documentation, await worker.run() is supposed to run the worker and execute the activities and workflow. Can you guide us on where we need to put await worker.shutdown() so it executes once the workflow is done running? Based on my understanding, this code just starts the workflow, and the actual execution happens in the worker:

handle = await self.temporal_client.start_workflow(
    Workflow.run,
    args=[…],
    id=temporal_workflow_id,
    task_queue=task_queue,
)
logging.info(f"Started workflow. Workflow ID: {handle.id}, RunID {handle.result_run_id}")
return handle.id, handle.result_run_id
You put it wherever you decide you want to shut down the worker; this is up to you. Maybe you want to shut it down on a signal, for example. What others do is wait on a signal inside the with block, like this example shows.
You don’t usually run the worker for the life of the workflow, you run it for a long time as a server/system that handles all workflows.
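That said, if the pod-per-workflow design is a hard requirement, the shape of the solution is: run worker.run() as a task, wait for the workflow result, then call worker.shutdown(). The sketch below shows only that ordering with plain asyncio stand-ins (worker_run and the sleep are placeholders, not real temporalio calls):

```python
import asyncio

async def worker_run(shutdown: asyncio.Event) -> None:
    # Stand-in for worker.run(): serve tasks until shutdown is requested.
    await shutdown.wait()

async def main() -> str:
    shutdown = asyncio.Event()
    run_task = asyncio.create_task(worker_run(shutdown))

    # Stand-in for: handle = await client.start_workflow(...)
    #               await handle.result()
    await asyncio.sleep(0.05)   # the workflow "executes" here

    shutdown.set()              # workflow done -> worker.shutdown()
    await run_task              # wait for run() to return before exiting
    return "worker exited after workflow completed"

print(asyncio.run(main()))
```

With the real SDK, awaiting handle.result() is what tells you the workflow run is finished; shutting the worker down after that lets the process, and therefore the pod, exit.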
The examples (I tried the encryption one) don't seem to work as expected.
I tried something like the below in my setup (k8s, workers running as a Deployment with autoscaling, sync activities with a custom ThreadPoolExecutor, etc.), and it seems to be working: I had a sync activity that was sleeping, and once I sent SIGTERM, the worker waited for it to finish before exiting.
import signal

async def handler():
    print("shutting down worker")
    await worker.shutdown()
    print("finished shutting down worker")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for signame in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(getattr(signal, signame),
                                lambda: asyncio.create_task(handler()))
    loop.run_until_complete(main())
@Chad_Retz I'm not sure about the interaction between the Temporal asyncio loop and what I have done above. Does the above look okay? Anything I should be aware of?
This seems ok (I can’t see main() but I assume it’s just awaiting a worker.run()). I can’t think of anything you need to be aware of so long as you’re properly awaiting the worker run.
Thanks, I have rolled this out in prod and tested it multiple times (by doing rolling restarts etc.); it seems to be working fine so far. I will keep an eye on it.
For reference, here is the full code, including main:
worker = None

async def handler():
    print("signal received, shutting down worker")
    await worker.shutdown()
    print("finished shutting down worker")

async def main():
    logging.basicConfig(level=logging.INFO)
    with open(settings.TEMPORAL_CLOUD_TLS_CERT, "rb") as f:
        client_cert = f.read()
    with open(settings.TEMPORAL_CLOUD_TLS_KEY, "rb") as f:
        client_private_key = f.read()
    client = await Client.connect(
        settings.TEMPORAL_CLOUD_HOST_PORT,
        namespace=settings.TEMPORAL_CLOUD_NAMESPACE,
        tls=TLSConfig(
            client_cert=client_cert,
            client_private_key=client_private_key,
        ),
        data_converter=pydantic_data_converter,
    )
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=settings.TEMPORAL_WORKER_MAX_THREADS
    ) as activity_executor:
        global worker
        worker = Worker(
            client,
            task_queue=settings.TEMPORAL_CLOUD_TASK_QUEUE,
            workflows=[MyWorkflow],
            activities=[
                my_activity,
            ],
            activity_executor=activity_executor,
            workflow_runner=new_sandbox_runner(),
            graceful_shutdown_timeout=timedelta(seconds=300),
        )
        await asyncio.gather(worker.run())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for signame in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(
            getattr(signal, signame), lambda: asyncio.create_task(handler())
        )
    loop.run_until_complete(main())
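One thing worth noting about the __main__ block above: calling asyncio.get_event_loop() outside a running loop is deprecated since Python 3.10. An equivalent pattern registers the handlers inside main() via asyncio.get_running_loop() and uses asyncio.run(). Here is a self-contained sketch of that variant; the worker is replaced by a plain event wait, the process sends itself SIGTERM just so the demo runs to completion, and add_signal_handler is Unix-only:

```python
import asyncio
import os
import signal

shutdown_complete = False

async def main() -> None:
    global shutdown_complete
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for signame in ("SIGINT", "SIGTERM"):
        # Handlers must be registered on the running loop (Unix only).
        loop.add_signal_handler(getattr(signal, signame), stop.set)
    # Stand-in for the worker: real code would run worker.run() in a task
    # here and call worker.shutdown() after stop.wait() returns.
    os.kill(os.getpid(), signal.SIGTERM)  # simulate Kubernetes sending SIGTERM
    await stop.wait()
    shutdown_complete = True
    print("worker shut down cleanly")

asyncio.run(main())
```

In Kubernetes, SIGTERM is exactly what the kubelet sends on pod termination, so this pattern plus graceful_shutdown_timeout gives activities a window to drain before the process exits.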