Observing issues with heartbeat in case of processing tasks

Hi Team - I am also observing a pattern where I am using the auto_heartbeater decorator to send activity heartbeats and my activities are failing with heartbeat timeouts.I have close to 10 activities running concurrently with heartbeat timeout set for each to 10 seconds. These activities are compute and i/o heavy tasks and I have confirmed that the auto_heartbeater is using a separate thread to send heartbeats. Can anyone help me with what could be potential issue here?

sharing the snippet which i was able to reproduce this locally

event history and the code snippet can be found here - Temporal workflow with activities simulating a i/o heavy task · GitHub

import asyncio
import logging
import threading
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)
from experiments.utils import auto_heartbeater

class Activities:
    @staticmethod
    @activity.defn
    @auto_heartbeater
    async def first_activity():
        """Makes multiple synchronous API calls"""
        import requests
        num_calls = 5
        results = []
        print(f"Making {num_calls} sequential API calls...")
        for i in range(num_calls):
            print(f"API call {i+1}/{num_calls}")
            try:
                # This public API has some latency which helps with our blocking example
                response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=5)
                results.append(response.json())
            except Exception as e:
                print(f"Error in API call: {e}")
                results.append(None)
        
        print(f"Completed {len(results)} API calls")



@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        # Create multiple concurrent activity executions
        retry_policy = RetryPolicy(
            maximum_attempts=4,
            backoff_coefficient=2,
        )

        activities = [
            workflow.execute_activity_method(
                Activities.first_activity,
                schedule_to_close_timeout=timedelta(days=1),
                heartbeat_timeout=timedelta(seconds=10),
                retry_policy=retry_policy,
            )
            for _ in range(10)
        ]
        
        # Wait for all activities to complete
        await asyncio.gather(*activities)

async def create_worker(temporal_client: Client):
    worker = Worker(
        temporal_client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[Activities.first_activity],
        workflow_runner=SandboxedWorkflowRunner(
            restrictions=SandboxRestrictions.default.with_passthrough_modules(
                "application_sdk", "asyncio"
            )
        ),
    )
    await worker.run()


async def main():
    logging.basicConfig(level=logging.INFO)

    client = await Client.connect("localhost:7233")

    logging.info("Starting worker")

    worker_thread = threading.Thread(
        target=lambda: asyncio.run(create_worker(client)), daemon=True
    )
    worker_thread.start()

    logging.info("Running workflow")
    await client.execute_workflow(
        MyWorkflow.run,
        id="my-workflow",
        task_queue="my-task-queue",
    )

    print("Workflow completed")


if __name__ == "__main__":
    asyncio.run(main())

I’m not python expert. But you are not supposed to call synchronous APIs from async activities. My guess is that the event loop gets blocked by this.