Heartbeat is Never Being Sent

Hi there, my setup is a Go Temporal client that requests workflows to be executed by a Python Temporal worker. My Python activities are very long running, and my Go client would like to know how they are progressing. For now I just want to get a “percent complete” number from the Python side; one day I might want a bit more information. From what I saw in the Temporal documentation, the recommended way to do this is to call DescribeWorkflowExecution and check the heartbeat details. Let me know if there is a better and/or easier way of getting this kind of information.

I’m using the Go SDK to submit a workflow request to the Temporal server. On the receiving (worker) side I’m using the Python SDK to fulfill that work.

In the Go client I’m checking for progress as recommended in this post:

Specifically, I’m using the Client.DescribeWorkflowExecution(..) function, and the heartbeat details are never populated. Initially I wasn’t sure where the issue was, i.e. on the sending side (Python heartbeats) or the receiving/query side (Go). But once I checked the long-running activity in the Temporal web UI, it was also showing no heartbeat details for my activity.

I’ve read through the Temporal docs, and the only other thing that seems like it might prevent heartbeats from being sent is throttling (as determined by the heartbeat timeout). What seems more likely is that I’m failing to perform some kind of send (by calling something on the client or worker?).

As a test, I had my Python activity call a test function that just continuously posts a heartbeat, and then the calling function returns the result:

import time

from temporalio import activity


def test_heartbeat() -> None:
    # Heartbeats once per second from within the running activity
    num_heartbeats = 100
    for i in range(num_heartbeats):
        print(f"test_heartbeat: i: {i}")
        activity.heartbeat(str(i))
        time.sleep(1)

Thanks!

Did you set the heartbeat timeout for the activity? Heartbeats are not sent if it is not set.

Thanks for the quick response!

I believe I am - the python code looks like this:

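from datetime import timedelta

from temporalio import workflow


@workflow.defn
class FooWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # BarActivity is the activity function, defined elsewhere
        return await workflow.execute_activity(
            BarActivity,
            name,
            schedule_to_close_timeout=timedelta(seconds=500),
            heartbeat_timeout=timedelta(seconds=5),
        )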

Is there a way to check in the activity if the heartbeat_timeout is set and valid? Or possibly some other kind of way to debug this?

Thanks again!

Check ActivityTaskScheduledEvent in UI/CLI to see if the timeout is specified.

Check the pending activities in the UI, or use the tctl wf describe CLI command, to see whether the heartbeat is reported.

I can give that a try.

But in the meantime I ended up completely rewriting the Python code (I noticed the Python samples have async, threaded, and process examples, so I grabbed the async one). Specifically, instead of calling

activity.heartbeat(str(i))

now I’m using the following syntax:

        handle = self.client.get_async_activity_handle(task_token=task_token)
        for i in range(0, 20):
            await handle.heartbeat(str(i))

This appears to be working, or at least I’m seeing a non-nil payload on the Go side. I still need to validate the payload.

I’m just trying to get an end-to-end solution working as an evaluation of Temporal for managing long-running Python processes. Going back to one of my original questions: from a different post I see there are a few ways that clients can request progress on an activity (e.g., using a query). I think heartbeat is probably the “right solution” for small updates (like percent complete, so just a float or short string). But if I wanted to request a large payload in addition to this progress update (I suppose I’m limited to 4 MB because of gRPC), what would be the recommended approach? Is it best just to upload the file to some online bucket and send the client the URL? I’m thinking of payloads on the order of 20 to 200 MB.

Appreciate the quick responses!

Temporal is not designed to support large payloads. So using external storage and passing URLs through Temporal is the recommended way.
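A minimal sketch of that claim-check approach using only the standard library: the activity writes the large result somewhere and returns only a small ResultRef (URL, size, checksum), which fits easily in a Temporal payload. A temporary local directory and file:// URLs stand in for a real bucket; ResultRef, store_large_result, fetch_large_result and demo_roundtrip are hypothetical names, and in practice you would swap in your storage SDK’s upload and an HTTP or presigned URL.

```python
import hashlib
import tempfile
import urllib.request
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ResultRef:
    """Small pointer that travels through Temporal instead of the large data itself."""
    url: str
    size_bytes: int
    sha256: str


def store_large_result(data: bytes, name: str, bucket_dir: Path) -> ResultRef:
    """Upload step. A local directory stands in for a real object store."""
    path = bucket_dir / name
    path.write_bytes(data)
    return ResultRef(
        url=path.resolve().as_uri(),
        size_bytes=len(data),
        sha256=hashlib.sha256(data).hexdigest(),
    )


def fetch_large_result(ref: ResultRef) -> bytes:
    """Client side: download by URL and verify the checksum before using the data."""
    with urllib.request.urlopen(ref.url) as resp:
        data = resp.read()
    if hashlib.sha256(data).hexdigest() != ref.sha256:
        raise ValueError(f"checksum mismatch for {ref.url}")
    return data


def demo_roundtrip(size: int) -> bool:
    """Store and fetch `size` bytes through a temporary 'bucket'."""
    with tempfile.TemporaryDirectory() as d:
        payload = bytes(size)
        ref = store_large_result(payload, "result.bin", Path(d))
        return ref.size_bytes == size and fetch_large_result(ref) == payload
```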

Regular activity.heartbeat should work without issue. Can you confirm the sample does not heartbeat for you either? If you can provide a standalone replication script where activity heartbeat is not working, we can debug.

Hi there @Chad_Retz and @maxim, I can create a branch here:

Alternatively, I’m happy to do a screen share and walk you through it.

Thanks!

A simple single file replication may work best. Here’s an example of a workflow running against a localhost Temporal server that I have confirmed properly sends heartbeat details:

import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
def my_activity() -> str:
    # Just heartbeat every second
    while True:
        time.sleep(1)
        activity.heartbeat(f"Current time: {datetime.now()}")


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        await workflow.execute_activity(
            my_activity,
            schedule_to_close_timeout=timedelta(days=30),
            heartbeat_timeout=timedelta(seconds=5),
        )


async def main():
    logging.basicConfig(level=logging.INFO)

    client = await Client.connect("localhost:7233")

    logging.info("Starting worker")
    async with Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        activity_executor=ThreadPoolExecutor(),
    ):
        logging.info("Running workflow")
        await client.execute_workflow(
            MyWorkflow.run,
            id="my-workflow",
            task_queue="my-task-queue",
        )


if __name__ == "__main__":
    asyncio.run(main())

Note that heartbeat details are not sent every time you call heartbeat. They are sometimes batched, and we just make sure they are sent within the heartbeat timeout (what we call the throttle interval). This is based on an internal algorithm, but in the example above, which heartbeats every 1s, we only send to the server every 4s (since the timeout only requires a heartbeat within 5s).
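To make the batching concrete, here is a simplified model, not the SDK’s actual code. It assumes the throttle interval is 80% of the heartbeat timeout (which matches the 5s to 4s example) capped at a maximum, with a default used when no timeout is set; the 60s and 30s values mirror the Python Worker’s max_heartbeat_throttle_interval and default_heartbeat_throttle_interval defaults as I understand them. The simulation sends the first heartbeat immediately, holds back later details inside each window, and flushes only the most recent one when the window closes.

```python
from datetime import timedelta
from typing import List, Optional, Tuple


def throttle_interval(
    heartbeat_timeout: Optional[timedelta],
    max_interval: timedelta = timedelta(seconds=60),
    default_interval: timedelta = timedelta(seconds=30),
) -> timedelta:
    # Assumed rule: 80% of the heartbeat timeout, capped at max_interval.
    if heartbeat_timeout is None:
        return default_interval
    return min(heartbeat_timeout * 0.8, max_interval)


def simulate_flushes(call_times: List[float], interval: float) -> List[Tuple[float, float]]:
    """Return (flush_time, call_time) pairs: whose details reach the server, and when."""
    flushes: List[Tuple[float, float]] = []
    next_allowed = float("-inf")  # earliest time the next send may happen
    pending: Optional[float] = None  # call time of held-back details, if any
    for t in call_times:
        # A window closed before this call: flush the latest held-back details.
        if pending is not None and next_allowed <= t:
            flushes.append((next_allowed, pending))
            next_allowed += interval
            pending = None
        if t >= next_allowed:
            flushes.append((t, t))  # window open: send right away
            next_allowed = t + interval
        else:
            pending = t  # inside the window: overwrite older held-back details
    if pending is not None:
        flushes.append((next_allowed, pending))  # final flush when the window closes
    return flushes
```

For ten calls one second apart with a 5s timeout (4s interval), this model flushes at t=1, 5, 9 and 13 with the details from calls 1, 4, 8 and 10.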

I tried editing my previous reply - but the window of editing had timed out.

Thanks Chad - it looks like this might be “as designed”.

The issue appears to be a result of blocking the main event loop from within the context of an asynchronous activity function (or any function it calls).

So while the function I originally posted was not asynchronous, the calling code was. Once I used asyncio.sleep() instead of time.sleep() in my asynchronous activity function (and so stopped blocking the main event loop), it worked without issue:

    for i in range(10):
        activity.heartbeat(str(i))
        print(f'sent heartbeat: {i}')
        await asyncio.sleep(1)

You can repro this issue by creating an async activity function:

@activity.defn
async def ActivityFunc(name: str) -> str:
    ...  # activity body goes here

then add a sleep of more than one second (e.g., using time.sleep(…)). When the async activity returns, you’ll get:

2023-03-21T18:49:41.666551Z WARN temporal_sdk_core::worker::activities: Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway. task_token=TaskToken(<TOKEN_ID>) details=Status { code: NotFound, message: "invalid activityID or activity already timed out or invoking workflow is completed", details: b"\x08\x05\x12Rinvalid activityID or activity already timed out or invoking workflow is completed\x1aB\n@type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure", metadata: MetadataMap { headers: {"content-type": "application/grpc"} }, source: None }

If you update the example you provided to this:

@activity.defn
async def my_activity() -> str:
    # Just heartbeat every second
    while True:
        time.sleep(1)
        activity.heartbeat(f"Current time: {datetime.now()}")

I think you’ll get the same error. Locally I’m able to repro it using the following loop:

    for i in range(2):
        activity.heartbeat(str(i))
        print(f'sent heartbeat: {i}')
        time.sleep(1)

What’s also interesting about the above is that your heartbeat details are never sent. If I were guessing, I’d say it’s because you never give the “background async event handler” a chance to actually send the heartbeat.

Thanks for the prompt support!


Calling time.sleep in an async def is a bad thing and gums up the entire asyncio system. Asyncio runs on a single-threaded event loop, and if you block that event loop, you stall every async def call in the entire process. Heartbeats may not be sent because the blocked thread is the same one the activity worker’s asyncio loop runs on.

See “Developing with asyncio” in the Python 3.12.0 documentation.
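A toy model of why this happens (standard library only, not the SDK’s code): it assumes a heartbeat call just records the latest details, and a background task on the same event loop periodically “sends” them, which is a simplification of the batching described earlier in the thread. FakeHeartbeater and activity_body are hypothetical names. With time.sleep the sender task never gets to run, so nothing is sent; with await asyncio.sleep it runs between steps.

```python
import asyncio
import time
from typing import Any, List, Optional


class FakeHeartbeater:
    """heartbeat() only records details; sender() 'sends' the latest ones periodically."""

    def __init__(self, period: float) -> None:
        self.period = period
        self.latest: Optional[Any] = None
        self.sent: List[Any] = []

    def heartbeat(self, details: Any) -> None:
        self.latest = details  # synchronous and cheap, like activity.heartbeat

    async def sender(self) -> None:
        while True:
            await asyncio.sleep(self.period)
            if self.latest is not None:
                self.sent.append(self.latest)
                self.latest = None


async def activity_body(hb: FakeHeartbeater, blocking: bool) -> List[Any]:
    task = asyncio.create_task(hb.sender())
    for i in range(5):
        hb.heartbeat(i)
        if blocking:
            time.sleep(0.1)  # blocks the event loop: sender never runs
        else:
            await asyncio.sleep(0.1)  # yields to the loop: sender can run
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return hb.sent
```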

I actually forgot to ask above since it never ended up being my problem.

This is purely a suggestion/feedback, but in the case where someone attempts to heartbeat and no heartbeat timeout has been set, would Temporal consider throwing an error? From my understanding, none of the heartbeats the caller attempted to dispatch would ever be sent (if no heartbeat timeout is set).

Thanks!

There are a few reasons we don’t.

First, we don’t want an activity to start erroring based on a setting the caller chose way back in a workflow. Activities can be used from multiple workflows, some that may set a heartbeat timeout and some not. Workflow code changes that add a heartbeat timeout are also common.

Second, we don’t want (most of) our SDKs to throw on heartbeat. A heartbeat is meant to be accepted and asynchronously dispatched to the server on throttle/failure periods. The flow is simpler if a heartbeat is just a no-op for local activities and for activities without a heartbeat timeout. Also, heartbeats are often done asynchronously, and user code would become challenging if it had to handle or bubble up exceptions from heartbeat.

Third, many activities heartbeat as a courtesy but don’t have to. For example, a general purpose make-http-call activity that calls a URL based on its parameters may heartbeat while it waits. So the caller may know that the HTTP call they are asking to perform is near immediate and therefore heartbeat timeout is not needed. It’d be silly to error on heartbeat in this situation.

Finally, this is easy enough for a user to do if they must. You can easily read activity.info().heartbeat_timeout to see whether it’s set. If you want to go further, you could write an interceptor that intercepts all outbound activity heartbeat calls and throws when this timeout is not set.

Makes sense - thanks again!

Hi @Chad_Retz and @maxim,
Amazing work. I’m facing the same issue here. I have a long activity in Python, which I invoke through a workflow in Go, and that’s where I set the heartbeat timeout.

The code shared by @sourcetransformer reproduces my code to some extent: the heartbeat is never sent, so my workflow fails with a heartbeat timeout error.

Thank you

You cannot call time.sleep in an async def in Python; it can gum up the entire asyncio system. Do not block threads in async def functions.

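import asyncio
import logging
from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar, cast

from temporalio import activity

F = TypeVar("F", bound=Callable[..., Awaitable[Any]])


def auto_heartbeater(fn: F) -> F:
    @wraps(fn)
    async def wrapper(*args, **kwargs):
        heartbeat_timeout = activity.info().heartbeat_timeout
        heartbeat_task = None
        if heartbeat_timeout:
            # Heartbeat twice per heartbeat timeout
            heartbeat_task = asyncio.create_task(
                heartbeat_every(heartbeat_timeout.total_seconds() / 2)
            )
        try:
            return await fn(*args, **kwargs)
        finally:
            if heartbeat_task:
                heartbeat_task.cancel()
                # Wait for the heartbeat task to finish cancelling
                await asyncio.wait([heartbeat_task])

    return cast(F, wrapper)


async def heartbeat_every(delay: float) -> None:
    while True:
        try:
            await asyncio.sleep(delay)
            activity.heartbeat()
            logging.info("Heartbeat sent")
        except asyncio.CancelledError:
            logging.info("Heartbeat task cancelled")
            raise


class Heartbeat:
    # activity.defn outermost, so it registers the auto-heartbeating wrapper
    @activity.defn(name="LongActivity")
    @auto_heartbeater
    async def long(self) -> None:
        ...  # long activity (body omitted in the original post)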

The above is an example of my code. The activity is in Python, and I invoke it through Go, where I specify the heartbeat timeout. Heartbeats are sent by the wrapper’s background task every half heartbeat timeout.
My activity can take anywhere from a few seconds to 6 hours to finish. However, I still get a heartbeat timeout.

Any advice on how to tackle this?

Thank you for your support.

There is nothing obvious that I can see unless long blocks the thread. Can you confirm everything in the long activity is async-safe and does not block the thread? Can you confirm this sample works for you? Can you make a small, standalone replication, maybe by altering that sample or this sample?

I noticed later that one of the third-party library functions I’m using inside long was not async-safe. Heartbeat is working now.
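If a library like that has no async API, one option (a sketch, assuming Python 3.9+ for asyncio.to_thread) is to run the blocking call in a worker thread so the event loop, and with it heartbeat sending, keeps running. third_party_call is a hypothetical stand-in for the offending function, and the ticker task plays the role of the SDK’s heartbeat sender. Another option is the one in Chad’s sample earlier in the thread: make the activity a plain def and give the worker activity_executor=ThreadPoolExecutor().

```python
import asyncio
import time
from typing import List, Tuple


def third_party_call(seconds: float) -> str:
    """Hypothetical library function that blocks the calling thread."""
    time.sleep(seconds)
    return "result"


async def call_without_blocking_loop(seconds: float, period: float = 0.05) -> Tuple[str, List[int]]:
    """Run the blocking call in a worker thread while a loop task keeps ticking."""
    ticks: List[int] = []

    async def ticker() -> None:  # stands in for the SDK's background heartbeat sender
        while True:
            await asyncio.sleep(period)
            ticks.append(len(ticks))

    task = asyncio.create_task(ticker())
    # The blocking work runs off the event loop thread, so ticker keeps running.
    result = await asyncio.to_thread(third_party_call, seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return result, ticks
```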

Much thanks for all the support.
