Failed running eviction job for run ID 019dd4a5-3221-7f36-a26f-170ead6cc7fe, continually retrying eviction. Since eviction could not be processed, this worker may not complete and the slot may remain forever used unless it eventually completes.
Traceback (most recent call last):
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow.py", line 581, in _handle_cache_eviction
await asyncio.wait_for(
handle_eviction_task, self._deadlock_timeout_seconds
)
File "/usr/local/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
return await fut
^^^^^^^^^
File "/usr/local/lib/python3.13/concurrent/futures/thread.py", line 59, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow.py", line 841, in activate
return self.instance.activate(act)
~~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_runner.py", line 166, in activate
self._run_code(
~~~~~~~~~~~~~~^
"with __temporal_importer.applied():\n"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<2 lines>...
__temporal_activation=act,
^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_runner.py", line 183, in _run_code
exec(code, self.globals_and_locals, self.globals_and_locals)
~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 2, in <module>
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_in_sandbox.py", line 84, in activate
return self.instance.activate(act)
~~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 500, in activate
raise RuntimeError(
...<2 lines>...
)
RuntimeError: Eviction processed, but 324 tasks remain. Stack traces below:
I am getting these errors, and then the activities start getting cancelled one by one,
and the workflow worker also stops polling for tasks.
This happens when I run approximately 250 tasks in parallel (although there can be more than 500), each of which calls an activity that in turn calls several APIs. Since all of them are network-bound rather than CPU-bound, I assumed this much parallelism would not be a problem.
I am running the tasks in batches with the snippet below:
import asyncio
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal, cast, overload
from temporalio import workflow
from temporalio.exceptions import is_cancelled_exception
if TYPE_CHECKING:
from intent_engine.temporal.utils.types import TaskCoroutine
# Number of items each `safe_gather_in_batches` worker finishes before it
# explicitly yields control back to the event loop.
DEFAULT_BATCH_WORKER_YIELD_EVERY: int = 5
def _get_cancellation_error(
    exception_group: BaseExceptionGroup,
) -> BaseException | None:
    """Find the first cancellation-type leaf inside ``exception_group``.

    Leaves are visited depth-first in member order: all members of a nested
    group are visited before that group's next sibling. Only leaves are passed
    to ``is_cancelled_exception``; group objects themselves are never tested.

    Returns:
        The first leaf for which ``is_cancelled_exception`` is true, or ``None``
        when no leaf qualifies.
    """
    # Explicit stack instead of recursion. Members are pushed in reverse so
    # they are popped (and therefore checked) in their original order.
    to_visit: list[BaseException] = list(reversed(exception_group.exceptions))
    while to_visit:
        current = to_visit.pop()
        if isinstance(current, BaseExceptionGroup):
            to_visit.extend(reversed(current.exceptions))
        elif is_cancelled_exception(current):
            return current
    return None
@overload
async def safe_gather[T](
*coros_or_futures: "TaskCoroutine[T]",
return_exceptions: Literal[False] = False,
) -> list[T]: ...
@overload
async def safe_gather[T](
*coros_or_futures: "TaskCoroutine[T]",
return_exceptions: bool,
) -> list[T | Exception]: ...
async def _gather_tasks_results[T](
*coros_or_futures: "TaskCoroutine[T]",
) -> list[T]:
tasks: list[asyncio.Task[T]] = []
try:
async with asyncio.TaskGroup() as task_group:
for coro_or_future in coros_or_futures:
tasks.append(task_group.create_task(coro_or_future))
except BaseExceptionGroup as exc_group:
cancellation_error = _get_cancellation_error(exc_group)
if cancellation_error is not None:
raise cancellation_error from None
raise
return [task.result() for task in tasks]
async def safe_gather[T](
*coros_or_futures: "TaskCoroutine[T]",
return_exceptions: bool = False,
) -> list[T] | list[T | Exception] | tuple[object, ...]:
"""
A small wrapper around asyncio.gather/TaskGroup that preserves
cancellation behavior when return_exceptions=True.
"""
if return_exceptions:
async def capture_exception(awaitable: "TaskCoroutine[T]") -> T | Exception:
try:
return await awaitable
except Exception as exc:
if is_cancelled_exception(exc):
raise
return exc
return await _gather_tasks_results(
*[capture_exception(coro_or_future) for coro_or_future in coros_or_futures]
)
else:
return await _gather_tasks_results(*coros_or_futures)
@overload
async def safe_gather_in_batches[T](
coros_or_futures: Iterable["TaskCoroutine[T]"],
*,
batch_size: int,
return_exceptions: Literal[False] = False,
) -> list[T]: ...
@overload
async def safe_gather_in_batches[T](
coros_or_futures: Iterable["TaskCoroutine[T]"],
*,
batch_size: int,
return_exceptions: bool,
) -> list[T | Exception]: ...
async def safe_gather_in_batches[T](
coros_or_futures: Iterable["TaskCoroutine[T] | TaskCoroutine[object]"],
*,
batch_size: int,
return_exceptions: bool = False,
) -> list[T] | list[T | Exception]:
if batch_size <= 0:
raise ValueError("'batch_size' must be greater than 0")
indexed_awaitables = iter(enumerate(coros_or_futures))
next_item_lock = asyncio.Lock()
total_count = 0
results: dict[int, T | Exception] = {}
async def next_item() -> tuple[int, "TaskCoroutine[T]"] | None:
nonlocal total_count
async with next_item_lock:
try:
idx, awaitable = next(indexed_awaitables)
except StopIteration:
return None
total_count += 1
return idx, cast("TaskCoroutine[T]", awaitable)
async def worker() -> None:
processed_since_yield = 0
while True:
item = await next_item()
if item is None:
return
idx, awaitable = item
item_result = await safe_gather(awaitable, return_exceptions=return_exceptions)
results[idx] = item_result[0]
processed_since_yield += 1
if processed_since_yield >= DEFAULT_BATCH_WORKER_YIELD_EVERY:
# Cooperative yield to keep workflow activations responsive under high fanout.
await workflow.sleep(0)
processed_since_yield = 0
_ = await safe_gather(
*(worker() for _ in range(batch_size)),
return_exceptions=False,
)
ordered_results = [results[idx] for idx in range(total_count)]
if return_exceptions:
return ordered_results
return cast("list[T]", ordered_results)