I’m seeing unexpected behavior with the Temporal server and Python SDK worker. The server has multiple activities sitting in the ActivityTaskScheduled state, but they are never delivered to the Python worker, which hangs indefinitely on the poll_activity_task call.
I believe the blocking is isolated to the task polling itself, based on print statements I patched into the activity runner in site-packages/temporalio/worker/_activity.py. The runner output stops right after the poll task is created:

START ACTIVITY RUNNER
WILL POLL <Task pending name='Task-7' coro=<Worker.poll_activity_task() running at /root/.cache/pypoetry/virtualenvs/selectify-MATOk_fk-py3.11/lib/python3.11/site-packages/temporalio/bridge/worker.py:93>>
(indefinite hang)

The patched section of the runner's run() loop, with my prints added:
async def run(self) -> None:
    print("START ACTIVITY RUNNER")

    # Create a task that fails when we get a failure on the queue
    async def raise_from_queue() -> NoReturn:
        raise await self._fail_worker_exception_queue.get()

    exception_task = asyncio.create_task(raise_from_queue())

    # Continually poll for activity work
    while True:
        try:
            # Poll for a task
            poll_task = asyncio.create_task(
                self._bridge_worker().poll_activity_task()
            )
            print("WILL POLL", poll_task)
            await asyncio.wait([poll_task, exception_task], return_when=asyncio.FIRST_COMPLETED)  # type: ignore
            print("DID WAIT1")
            # If exception for failing the worker happened, raise it.
            # Otherwise, the poll succeeded.
            if exception_task.done():
                poll_task.cancel()
                await exception_task
            task = await poll_task
            print("DID WAIT2")
This persists across restarts of both the Temporal server and the client applications, so I’m fairly sure neither side is stuck in a corrupt in-memory state. Nothing looks amiss in the Temporal server logs either:
temporal | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"/_sys/document-renderer-queue/2","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"1171@59bf991b46c6-19a92c93d25d465ca347ff3269f30792","wf-task-queue-type":"Workflow","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"document-renderer-queue","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"/_sys/document-renderer-queue/1","wf-task-queue-type":"Activity","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
temporal | {"level":"info","ts":"2023-09-01T19:38:19.568Z","msg":"none","service":"matching","component":"matching-engine","wf-task-queue-name":"document-renderer-queue","wf-task-queue-type":"Workflow","wf-namespace":"default","lifecycle":"Started","logging-call-at":"taskQueueManager.go:320"}
This is the most recent event in the history of one such hanging workflow:
{
  "eventId": "11",
  "eventTime": "2023-09-01T05:11:26.209521757Z",
  "eventType": "ActivityTaskScheduled",
  "version": "0",
  "taskId": "66060594",
  "workerMayIgnore": false,
  "activityTaskScheduledEventAttributes": {
    "activityId": "2",
    "activityType": {
      "name": "log_metered_type"
    },
    "taskQueue": {
      "name": "document-renderer-queue",
      "kind": "Normal",
      "normalName": ""
    },
    "header": {
      "fields": {}
    },
    "input": {
      "payloads": [
        {
          "resource_type": "PAGE_RENDERS",
          "user_id": "b0daed0e-16d6-4e91-ad6b-ddfb8c5d81bf"
        }
      ]
    },
    "scheduleToCloseTimeout": "0s",
    "scheduleToStartTimeout": "0s",
    "startToCloseTimeout": "15s",
    "heartbeatTimeout": "0s",
    "workflowTaskCompletedEventId": "10",
    "retryPolicy": {
      "initialInterval": "1s",
      "backoffCoefficient": 2,
      "maximumInterval": "100s",
      "maximumAttempts": 0,
      "nonRetryableErrorTypes": []
    },
    "useCompatibleVersion": true
  },
  "name": "ActivityTaskScheduled",
  "id": "11",
  "timestamp": "2023-09-01 UTC 05:11:26.20",
  "classification": "Scheduled",
  "category": "activity",
  "attributes": {
    "type": "activityTaskScheduledEventAttributes",
    "activityId": "2",
    "activityType": "log_metered_type",
    "taskQueue": {
      "name": "document-renderer-queue",
      "kind": "Normal",
      "normalName": ""
    },
    "header": {
      "fields": {}
    },
    "input": {
      "payloads": [
        {
          "resource_type": "PAGE_RENDERS",
          "user_id": "b0daed0e-16d6-4e91-ad6b-ddfb8c5d81bf"
        }
      ]
    },
    "scheduleToCloseTimeout": "",
    "scheduleToStartTimeout": "",
    "startToCloseTimeout": "15 seconds",
    "heartbeatTimeout": "",
    "workflowTaskCompletedEventId": "10",
    "retryPolicy": {
      "initialInterval": "1s",
      "backoffCoefficient": 2,
      "maximumInterval": "100s",
      "maximumAttempts": 0,
      "nonRetryableErrorTypes": []
    },
    "useCompatibleVersion": true
  }
}
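For reference, the workflow schedules this activity in the ordinary way; the call looks roughly like this (paraphrasing our workflow code, so the class name and argument shape are placeholders, but the activity name, queue, and 15s start-to-close timeout match the event above):

from datetime import timedelta

from temporalio import workflow


@workflow.defn
class RenderDocumentWorkflow:
    @workflow.run
    async def run(self, params: dict) -> None:
        # Schedules log_metered_type on the same queue the worker polls;
        # the timeout matches the startToCloseTimeout in the event history.
        await workflow.execute_activity(
            "log_metered_type",
            {"resource_type": "PAGE_RENDERS", "user_id": params["user_id"]},
            task_queue="document-renderer-queue",
            start_to_close_timeout=timedelta(seconds=15),
        )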
My current Python worker spawn script looks like this. I first saw the hanging behavior when I had a worker assigned to each separate queue, but to isolate the issue I’ve pared it down to just the task queue this activity belongs to.
async def simple_run_worker():
    settings = get_settings()
    client = await backoff_connect_temporal(settings.TEMPORAL_HOST)

    worker_definitions = [definition for definition in daemons.DEFINITIONS]
    definition = [
        definition for definition in worker_definitions if definition.queue_name == "document-renderer-queue"
    ][0]

    logging.info(f"Connected to temporal: {definition.queue_name}")
    print("Definition:")
    print(definition.activities)
    print(definition.workflows)

    worker = Worker(
        client,
        task_queue=definition.queue_name,
        workflows=definition.workflows,
        activities=definition.activities,
    )
    await worker.run()
I’ve also confirmed in the console that the worker is connected and apparently polling the queue.
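To double-check poller registration outside the UI, I’d also like to ask the server directly which pollers it sees on the activity side of the queue. My understanding of the raw call from the Python client is roughly this (field and property names paraphrased from the temporalio.api protos, so treat them as an assumption on my part):

from temporalio.api.enums.v1 import TaskQueueType
from temporalio.api.taskqueue.v1 import TaskQueue
from temporalio.api.workflowservice.v1 import DescribeTaskQueueRequest
from temporalio.client import Client


async def list_activity_pollers() -> None:
    client = await Client.connect("localhost:7233")
    # Ask matching which pollers it currently sees on the activity side
    # of document-renderer-queue.
    resp = await client.workflow_service.describe_task_queue(
        DescribeTaskQueueRequest(
            namespace="default",
            task_queue=TaskQueue(name="document-renderer-queue"),
            task_queue_type=TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY,
        )
    )
    for poller in resp.pollers:
        print(poller.identity, poller.last_access_time)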
These logs come from an M1 arm64 machine, but I’ve seen sporadic instances of this happening on our Linux amd64 production instance as well. Is there additional, more verbose debug logging available? Any suspected root causes?
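In the meantime, I plan to re-run with more verbose SDK logging turned on. My understanding from reading the temporalio.runtime module (so treat the exact option names as my assumption) is that Rust Core debug logs can be enabled roughly like this:

import logging

from temporalio.client import Client
from temporalio.runtime import LoggingConfig, Runtime, TelemetryConfig, TelemetryFilter

# Surface the Python-side SDK loggers.
logging.basicConfig(level=logging.DEBUG)

# Assumption: this filter config makes the Rust Core emit DEBUG-level logs
# while keeping other crates at WARN.
runtime = Runtime(
    telemetry=TelemetryConfig(
        logging=LoggingConfig(
            filter=TelemetryFilter(core_level="DEBUG", other_level="WARN")
        )
    )
)


async def connect_with_debug_logging(target: str) -> Client:
    # Workers created from this client share the debug-enabled runtime.
    return await Client.connect(target, runtime=runtime)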