Python SDK verison-1.2.0: How to renew the token without stopping the worker

Below are the approaches which are tried as per your suggestion and It works.

1st way:

import asyncio
from typing import Callable

from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async def token_updater():
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

    # Update in background
    update_task = asyncio.create_task(token_updater())
    worker_task = asyncio.create_task(worker.run())
    try:
        asyncio.gather(update_task, worker_task)
    finally:
        update_task.cancel()
        await worker.shutdown()

async def main():
    token = get_token()
    client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=token)
    worker = Worker(client, task_queue='dummy', workflows=[Workflow], activities=[activity1, activity2])
    await run_worker_with_token_updater(client, worker, 10, get_token)


def get_token():
    token = "logic to get the token"
    return {"Authorization": f"Bearer {token}"}


if __name__ == '__main__':
    asyncio.run(main())

2nd way:

import asyncio
from typing import Callable

from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async with worker:
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

async def main():
    token = get_token()
    client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=token)
    worker = Worker(client, task_queue='dummy', workflows=[Workflow], activities=[activity1, activity2])
    await run_worker_with_token_updater(client, worker, 10, get_token)

def get_token():
    token = "logic to get the token"
    return {"Authorization": f"Bearer {token}"}


if __name__ == '__main__':
    asyncio.run(main())

3rd way: This is working but it’s not a suggested solution.

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async main():
    while True:
        try:
            client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
            worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
            await worker.run()
        except Exception as err:
            print(f"Exception {err}")
            continue

def get_token():
    token = "logic to get the token"
    return {"Authorization": f"Bearer {token}"}

if __name__ == '__main__':
    asyncio.run(main())

Thanks @Chad_Retz , Please add if anything missing.