Python SDK version 1.2.0: How to renew the token without stopping the worker

Hello Team,

I have a question about how to automatically renew the token before it expires so that the worker will not stop.

When I used the code snippet below, it threw an error.

Can you please suggest the correct solution?

[Referenced Link] temporalio.client.Client

I have tried the following approaches, but none of them worked.
1st way:

    async def main():
        client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
        worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
        await worker.run()


    def get_token():
        token = "logic to get the token"
        return token

2nd way:

    async def main():
        try:
            client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
            worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
            await worker.run()
        except Exception as err:
            client.rpc_metadata=get_token()
            worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
            await worker.run()


    def get_token():
        token = "logic to get the token"
        return token

3rd way: I think this will work, but I am not sure: does it create a new worker every time?

    async def main():
        while True:
            try:
                client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
                worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
                await worker.run()
            except Exception as err:
                print(f"Exception {err}")
                continue

    def get_token():
        token = "logic to get the token"
        return token

Why wait for an error before updating the token in the second way? You should be able to set client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"} at any point within the refresh period. That is the purpose of the setter: it lets you update those headers while the client is in use.
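
To make the shape of that value concrete, here is a minimal sketch. It assumes rpc_metadata takes a mapping of header names to string values rather than a bare token string. bearer_metadata and StandInClient are hypothetical names used only for illustration; StandInClient stands in for temporalio.client.Client so the snippet runs without a server.

```python
from typing import Dict


def bearer_metadata(token: str) -> Dict[str, str]:
    """Hypothetical helper: wrap a bare token in an Authorization header mapping."""
    return {"Authorization": f"Bearer {token}"}


class StandInClient:
    """Stand-in for temporalio.client.Client; only rpc_metadata is modeled."""

    def __init__(self, rpc_metadata: Dict[str, str]) -> None:
        self.rpc_metadata = rpc_metadata


# Connect with the initial headers (a real client would use Client.connect).
client = StandInClient(rpc_metadata=bearer_metadata("initial-token"))

# Later, before the old token expires, replace the whole mapping.
client.rpc_metadata = bearer_metadata("refreshed-token")
```

With the real client, the same assignment would go through the rpc_metadata setter described above.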

Do you mean before await worker.run()?
Do I need to set client.rpc_metadata = {...} again?

Is this updated code correct?

async def main():
    try:
        client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
        worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
        client.rpc_metadata = get_token()
        await worker.run()
    except Exception as err:
        client.rpc_metadata = get_token()
        worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
        await worker.run()

def get_token():
    token = "logic to get the token"
    return token

And what is your suggestion on the 3rd way?

I mean during. You can run the worker in the background (e.g. asyncio.create_task(worker.run())) and update the credentials in a loop (e.g. asyncio.sleep, then update, repeatedly).

You want to set this value any time you want to change the headers. Even while it is in use by the worker.

No, these are concurrent operations. worker.run() is an async call that can be an asyncio task, and you can do other asyncio things while that task runs, such as update credentials every so often.
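
The concurrency described here can be seen without a Temporal server. The sketch below is only an illustration: StandInClient, stand_in_worker_run, and refresh_loop are hypothetical stand-ins for the real client, worker.run(), and a token refresher, and the short intervals are chosen so the demo finishes quickly.

```python
import asyncio
from typing import Dict, List


class StandInClient:
    """Stand-in for temporalio.client.Client; only rpc_metadata is modeled."""

    def __init__(self) -> None:
        self.rpc_metadata: Dict[str, str] = {"Authorization": "Bearer token-0"}


async def stand_in_worker_run(client: StandInClient, seen: List[str], polls: int) -> None:
    """Pretend to poll: record the header in effect at each poll."""
    for _ in range(polls):
        seen.append(client.rpc_metadata["Authorization"])
        await asyncio.sleep(0.05)


async def refresh_loop(client: StandInClient, interval: float) -> None:
    """Replace the header mapping every `interval` seconds until cancelled."""
    count = 0
    while True:
        await asyncio.sleep(interval)
        count += 1
        client.rpc_metadata = {"Authorization": f"Bearer token-{count}"}


async def demo() -> List[str]:
    client = StandInClient()
    seen: List[str] = []
    # Both coroutines become tasks, so they take turns on the same event loop.
    worker_task = asyncio.create_task(stand_in_worker_run(client, seen, polls=6))
    refresh_task = asyncio.create_task(refresh_loop(client, interval=0.12))
    try:
        await worker_task
    finally:
        # Stop the refresher once the stand-in worker is done.
        refresh_task.cancel()
        await asyncio.gather(refresh_task, return_exceptions=True)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The stand-in worker never restarts, yet later polls observe a newer header, because the refresher replaces the mapping while the worker task is suspended in its own sleep.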

@Chad_Retz #support I updated the code as per your suggestion. Is this correct?

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def update_credentials(client):
    while True:
        # Wait for some time before updating again
        await asyncio.sleep(60)
        # Fetch new credentials from some source (e.g. a file, database, API)
        new_credentials = get_new_credentials()
        # Update the client's RPC metadata; the worker uses this same client
        client.rpc_metadata = new_credentials

async def main():
    # Create the client and worker with the initial credentials
    client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_new_credentials())
    worker = Worker(client, task_queue='dummy', workflows=[Workflow], activities=[activity1, activity2])

    # Start the worker and update_credentials tasks
    worker_task = asyncio.create_task(worker.run())
    update_credentials_task = asyncio.create_task(update_credentials(client))

    # Run both tasks concurrently using asyncio.gather()
    try:
        await asyncio.gather(worker_task, update_credentials_task)
    finally:
        update_credentials_task.cancel()
        await worker.shutdown()

def get_new_credentials():
    token = "logic to get the token"
    return {"Authorization": f"Bearer {token}"}

if __name__ == '__main__':
    asyncio.run(main())

This is more Python async than Temporal, but here’s a snippet I just typed here in chat (so it may not run and may have bugs; it is just meant to give an idea):

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async def token_updater():
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

    # Update in background
    update_task = asyncio.create_task(token_updater())
    worker_task = asyncio.create_task(worker.run())
    try:
        await asyncio.gather(update_task, worker_task)
    finally:
        update_task.cancel()
        await worker.shutdown()

But you can just use the async with approach of the worker if you don’t need to manually call run and do other things, e.g.

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async with worker:
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

Below are the approaches I tried as per your suggestion, and they work.

1st way:

import asyncio
from typing import Callable

from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async def token_updater():
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

    # Update in background
    update_task = asyncio.create_task(token_updater())
    worker_task = asyncio.create_task(worker.run())
    try:
        await asyncio.gather(update_task, worker_task)
    finally:
        update_task.cancel()
        await worker.shutdown()

async def main():
    # rpc_metadata expects a mapping of header names to values
    client = await Client.connect(
        target_host=target_host,
        tls=True,
        rpc_metadata={"Authorization": f"Bearer {get_token()}"},
    )
    worker = Worker(client, task_queue='dummy', workflows=[Workflow], activities=[activity1, activity2])
    await run_worker_with_token_updater(client, worker, 10, get_token)


def get_token() -> str:
    # Return the bare token; the "Bearer " prefix is added where the header is built
    token = "logic to get the token"
    return token


if __name__ == '__main__':
    asyncio.run(main())

2nd way:

import asyncio
from typing import Callable

from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker_with_token_updater(
    client: Client,
    worker: Worker,
    interval_seconds: int,
    get_token: Callable[[], str],
) -> None:
    async with worker:
        while True:
            await asyncio.sleep(interval_seconds)
            client.rpc_metadata = {"Authorization": f"Bearer {get_token()}"}

async def main():
    # rpc_metadata expects a mapping of header names to values
    client = await Client.connect(
        target_host=target_host,
        tls=True,
        rpc_metadata={"Authorization": f"Bearer {get_token()}"},
    )
    worker = Worker(client, task_queue='dummy', workflows=[Workflow], activities=[activity1, activity2])
    await run_worker_with_token_updater(client, worker, 10, get_token)

def get_token() -> str:
    # Return the bare token; the "Bearer " prefix is added where the header is built
    token = "logic to get the token"
    return token


if __name__ == '__main__':
    asyncio.run(main())

3rd way: This is working but it’s not a suggested solution.

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    while True:
        try:
            client = await Client.connect(target_host=target_host, tls=True, rpc_metadata=get_token())
            worker = Worker(client, task_queue="dummy", workflows=[Workflow], activities=[activity1, activity2])
            await worker.run()
        except Exception as err:
            print(f"Exception {err}")
            continue

def get_token():
    token = "logic to get the token"
    return {"Authorization": f"Bearer {token}"}

if __name__ == '__main__':
    asyncio.run(main())

Thanks @Chad_Retz, please add anything that is missing.

The first and second ways make sense, but not the third. That except clause is not reached unless the worker errors. If you need to update the token regularly, you should not wait for the worker to fail. Not only do we retry for a minute before failing, but it is also good practice to prevent worker failure (a worker failure sends a cancel to all running activities).
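
One way to act on that advice is to schedule each refresh from the token's lifetime instead of a fixed interval, and to keep the refresh loop alive if a single fetch fails. The sketch below is only an illustration under assumptions: keep_token_fresh is a hypothetical helper, fetch_token is assumed to return a (token, lifetime_seconds) pair, and the sleep parameter exists only so the loop can be exercised without real waiting.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Tuple

logger = logging.getLogger(__name__)


async def keep_token_fresh(
    client,
    fetch_token: Callable[[], Tuple[str, float]],
    safety_fraction: float = 0.8,
    retry_seconds: float = 5.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> None:
    """Refresh client.rpc_metadata ahead of expiry until the task is cancelled.

    fetch_token is a hypothetical callable returning (token, lifetime_seconds).
    """
    while True:
        try:
            token, lifetime_seconds = fetch_token()
        except Exception:
            # A failed fetch should not end the loop; the current token may
            # still be valid, so log the failure and retry shortly.
            logger.exception("token fetch failed; retrying in %s s", retry_seconds)
            await sleep(retry_seconds)
            continue
        client.rpc_metadata = {"Authorization": f"Bearer {token}"}
        # Sleep for only part of the lifetime so the next refresh lands before expiry.
        await sleep(lifetime_seconds * safety_fraction)
```

This could replace the inner loop of either working approach, for example as the body of the async with worker block. Note that fetch_token is called synchronously here, so a slow token service would block the event loop; moving the call to a thread is left out to keep the sketch short.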