Hi Team - I am also observing a pattern where I am using the auto_heartbeater decorator to send activity heartbeats and my activities are failing with heartbeat timeouts.I have close to 10 activities running concurrently with heartbeat timeout set for each to 10 seconds. These activities are compute and i/o heavy tasks and I have confirmed that the auto_heartbeater is using a separate thread to send heartbeats. Can anyone help me with what could be potential issue here?
sharing the snippet which i was able to reproduce this locally
event history and the code snippet can be found here - Temporal workflow with activities simulating a i/o heavy task · GitHub
import asyncio
import logging
import threading
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)
from experiments.utils import auto_heartbeater
class Activities:
@staticmethod
@activity.defn
@auto_heartbeater
async def first_activity():
"""Makes multiple synchronous API calls"""
import requests
num_calls = 5
results = []
print(f"Making {num_calls} sequential API calls...")
for i in range(num_calls):
print(f"API call {i+1}/{num_calls}")
try:
# This public API has some latency which helps with our blocking example
response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=5)
results.append(response.json())
except Exception as e:
print(f"Error in API call: {e}")
results.append(None)
print(f"Completed {len(results)} API calls")
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
# Create multiple concurrent activity executions
retry_policy = RetryPolicy(
maximum_attempts=4,
backoff_coefficient=2,
)
activities = [
workflow.execute_activity_method(
Activities.first_activity,
schedule_to_close_timeout=timedelta(days=1),
heartbeat_timeout=timedelta(seconds=10),
retry_policy=retry_policy,
)
for _ in range(10)
]
# Wait for all activities to complete
await asyncio.gather(*activities)
async def create_worker(temporal_client: Client):
worker = Worker(
temporal_client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[Activities.first_activity],
workflow_runner=SandboxedWorkflowRunner(
restrictions=SandboxRestrictions.default.with_passthrough_modules(
"application_sdk", "asyncio"
)
),
)
await worker.run()
async def main():
logging.basicConfig(level=logging.INFO)
client = await Client.connect("localhost:7233")
logging.info("Starting worker")
worker_thread = threading.Thread(
target=lambda: asyncio.run(create_worker(client)), daemon=True
)
worker_thread.start()
logging.info("Running workflow")
await client.execute_workflow(
MyWorkflow.run,
id="my-workflow",
task_queue="my-task-queue",
)
print("Workflow completed")
if __name__ == "__main__":
asyncio.run(main())