Unresolved Worker.poll_activity_task despite scheduled activities

I’m seeing unexpected behavior with the Temporal server / Python client. There are multiple activities on the server sitting in the ActivityTaskScheduled state, but they are never pulled down by the Python worker. It just hangs indefinitely on the poll_activity_task task:

I believe the hang is isolated to the task polling, based on logs I patched into the site-packages/temporalio/worker/_activity.py runner:

START ACTIVITY RUNNER
WILL POLL <Task pending name='Task-7' coro=<Worker.poll_activity_task() running at /root/.cache/pypoetry/virtualenvs/selectify-MATOk_fk-py3.11/lib/python3.11/site-packages/temporalio/bridge/worker.py:93>>
(indefinite hang)
async def run(self) -> None:
    print("START ACTIVITY RUNNER")
    # Create a task that fails when we get a failure on the queue
    async def raise_from_queue() -> NoReturn:
        raise await self._fail_worker_exception_queue.get()

    exception_task = asyncio.create_task(raise_from_queue())

    # Continually poll for activity work
    while True:
        try:
            # Poll for a task
            poll_task = asyncio.create_task(
                self._bridge_worker().poll_activity_task()
            )
            print("WILL POLL", poll_task)
            await asyncio.wait(
                [poll_task, exception_task], return_when=asyncio.FIRST_COMPLETED
            )  # type: ignore
            print("DID WAIT1")
            # If exception for failing the worker happened, raise it.
            # Otherwise, the poll succeeded.
            if exception_task.done():
                poll_task.cancel()
                await exception_task
            task = await poll_task
            print("DID WAIT2")

This persists across restarts of both the Temporal server and the client applications, so I’m relatively sure it’s not corrupt in-memory state that either process has gotten itself into. Nothing seems amiss in the Temporal server logs either:

temporal               | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"/_sys/document-renderer-queue/2","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal               | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"1171@59bf991b46c6-19a92c93d25d465ca347ff3269f30792","wf-task-queue-type":"Workflow","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal               | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"document-renderer-queue","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal               | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"/_sys/document-renderer-queue/1","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal               | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"document-renderer-queue","wf-task-queue-type":"Workflow","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}

This is the most recent task log in one such hanging workflow.

{
  "eventId": "11",
  "eventTime": "2023-09-01T05:11:26.209521757Z",
  "eventType": "ActivityTaskScheduled",
  "version": "0",
  "taskId": "66060594",
  "workerMayIgnore": false,
  "activityTaskScheduledEventAttributes": {
    "activityId": "2",
    "activityType": {
      "name": "log_metered_type"
    },
    "taskQueue": {
      "name": "document-renderer-queue",
      "kind": "Normal",
      "normalName": ""
    },
    "header": {
      "fields": {}
    },
    "input": {
      "payloads": [
        {
          "resource_type": "PAGE_RENDERS",
          "user_id": "b0daed0e-16d6-4e91-ad6b-ddfb8c5d81bf"
        }
      ]
    },
    "scheduleToCloseTimeout": "0s",
    "scheduleToStartTimeout": "0s",
    "startToCloseTimeout": "15s",
    "heartbeatTimeout": "0s",
    "workflowTaskCompletedEventId": "10",
    "retryPolicy": {
      "initialInterval": "1s",
      "backoffCoefficient": 2,
      "maximumInterval": "100s",
      "maximumAttempts": 0,
      "nonRetryableErrorTypes": []
    },
    "useCompatibleVersion": true
  },
  "name": "ActivityTaskScheduled",
  "id": "11",
  "timestamp": "2023-09-01 UTC 05:11:26.20",
  "classification": "Scheduled",
  "category": "activity",
  "attributes": {
    "type": "activityTaskScheduledEventAttributes",
    "activityId": "2",
    "activityType": "log_metered_type",
    "taskQueue": {
      "name": "document-renderer-queue",
      "kind": "Normal",
      "normalName": ""
    },
    "header": {
      "fields": {}
    },
    "input": {
      "payloads": [
        {
          "resource_type": "PAGE_RENDERS",
          "user_id": "b0daed0e-16d6-4e91-ad6b-ddfb8c5d81bf"
        }
      ]
    },
    "scheduleToCloseTimeout": "",
    "scheduleToStartTimeout": "",
    "startToCloseTimeout": "15 seconds",
    "heartbeatTimeout": "",
    "workflowTaskCompletedEventId": "10",
    "retryPolicy": {
      "initialInterval": "1s",
      "backoffCoefficient": 2,
      "maximumInterval": "100s",
      "maximumAttempts": 0,
      "nonRetryableErrorTypes": []
    },
    "useCompatibleVersion": true
  }
}

My current Python worker spawn script looks like this. I started seeing the hanging behavior when I had workers assigned to each separate queue, but in an attempt to isolate the issue I’ve pared it down to just the task queue that this activity belongs to.

async def simple_run_worker():
    settings = get_settings()
    client = await backoff_connect_temporal(settings.TEMPORAL_HOST)

    worker_definitions = [definition for definition in daemons.DEFINITIONS]
    definition = [
        definition for definition in worker_definitions if definition.queue_name == "document-renderer-queue"
    ][0]
    logging.info(f"Connected to temporal: {definition.queue_name}")

    print("Definition:")
    print(definition.activities)
    print(definition.workflows)

    worker = Worker(
        client,
        task_queue=definition.queue_name,
        workflows=definition.workflows,
        activities=definition.activities,
    )

    await worker.run()

I’ve also confirmed in the console that the worker is connected and apparently polling the queue.

These logs come from an M1 arm64 machine, but I’ve seen sporadic instances of this happening on our Linux amd64 production instance as well. Are there additional, more verbose debug logs available? Any suspected root causes?

Hrmm, the only two things I can think of are that you are either doing something blocking in an async def call that is preventing the asyncio event loop from continuing, or the task queue on the worker doesn’t match the one the activity was scheduled on.
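
For example (purely hypothetical names, just to illustrate the first failure mode), an activity like this will freeze every coroutine on the shared loop, pollers included, while the non-blocking version will not:

import asyncio
import time

from temporalio import activity

@activity.defn
async def log_metered_type_blocking(payload: dict) -> None:
    # Synchronous sleep inside an async def: the entire event loop
    # (including poll_activity_task) stalls until this returns.
    time.sleep(30)

@activity.defn
async def log_metered_type_ok(payload: dict) -> None:
    # Awaiting yields control back to the loop, so polling continues.
    await asyncio.sleep(30)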

If possible, can you make a standalone replication?

The only thing running in that async loop really should be worker.run(). I’m calling asyncio.run(simple_run_worker()) right after program startup.

The worker task queue also seems correct; here’s a log of the worker config coming from the worker bridge:

Bridge worker config WorkerConfig(namespace='default', task_queue='document-renderer-queue', build_id='151aed6089eb26211954c4bc6194c679', identity_override=None, max_cached_workflows=1000, max_outstanding_workflow_tasks=100, max_outstanding_activities=100, max_outstanding_local_activities=100, max_concurrent_workflow_task_polls=5, nonsticky_to_sticky_poll_ratio=0.2, max_concurrent_activity_task_polls=5, no_remote_activities=False, sticky_queue_schedule_to_start_timeout_millis=10000, max_heartbeat_throttle_interval_millis=60000, default_heartbeat_throttle_interval_millis=30000, max_activities_per_second=None, max_task_queue_activities_per_second=None, graceful_shutdown_period_millis=0, use_worker_versioning=False)

Is it possible that the relationship between this dependent activity and the parent workflow is causing the poll to fail, because there isn’t a worker managing the other task queue? If this case is supposed to work as-is, I can try to put together a minimal reproduction.

Are your activities async def or just def? If the former, they are also running on the same loop everyone shares, so make sure they have no blocking code (or switch to def and provide a thread pool executor or similar).
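
If you go the def route, a minimal sketch looks like this (names, host, and pool size are placeholders, not your actual code):

import concurrent.futures

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
def log_metered_type(payload: dict) -> None:
    # Plain def: blocking work is fine here because it runs on the
    # executor's threads, not the shared asyncio event loop.
    ...

async def run_worker() -> None:
    client = await Client.connect("localhost:7233")  # substitute your TEMPORAL_HOST
    worker = Worker(
        client,
        task_queue="document-renderer-queue",
        workflows=[],  # your workflow classes here
        activities=[log_metered_type],
        # Required whenever any registered activity is a plain def
        activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()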

:+1: Definitely supposed to work as is. Many users run hundreds of activities and workflows simultaneously.
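
If you do go back to one worker per queue, running them concurrently in a single process is just a matter of gathering their run() calls. Rough sketch against your definition objects (untested, field names assumed from your snippet):

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def run_all_workers(definitions) -> None:
    client = await Client.connect("localhost:7233")
    workers = [
        Worker(
            client,
            task_queue=d.queue_name,
            workflows=d.workflows,
            activities=d.activities,
        )
        for d in definitions
    ]
    # One worker per task queue, all sharing one client and one event loop
    await asyncio.gather(*(w.run() for w in workers))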