Eviction Issue in workflow

Failed running eviction job for run ID 019dd4a5-3221-7f36-a26f-170ead6cc7fe, continually retrying eviction. Since eviction could not be processed, this worker may not complete and the slot may remain forever used unless it eventually completes.
Traceback (most recent call last):
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow.py", line 581, in _handle_cache_eviction
await asyncio.wait_for(
handle_eviction_task, self._deadlock_timeout_seconds
)
File "/usr/local/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
return await fut
^^^^^^^^^
File "/usr/local/lib/python3.13/concurrent/futures/thread.py", line 59, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow.py", line 841, in activate
return self.instance.activate(act)
~~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_runner.py", line 166, in activate
self._run_code(
~~~~~~~~~~~~~~^
"with __temporal_importer.applied():\n"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<2 lines>...
__temporal_activation=act,
^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_runner.py", line 183, in _run_code
exec(code, self.globals_and_locals, self.globals_and_locals)
~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 2, in <module>
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/workflow_sandbox/_in_sandbox.py", line 84, in activate
return self.instance.activate(act)
~~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/local/lib/python3.13/site-packages/temporalio/worker/_workflow_instance.py", line 500, in activate
raise RuntimeError(
...<2 lines>...
)
RuntimeError: Eviction processed, but 324 tasks remain. Stack traces below:

I am getting these errors, after which the activities start getting cancelled one by one and the workflow also stops polling for tasks.

This happens when I run approximately 250 tasks in parallel (although there can be more than 500 tasks), each of which calls an activity that calls several APIs. I am assuming that, since all of them are network-bound rather than CPU-bound, this much parallelism should not be a problem.

I am running the tasks in batches with the snippet below:

import asyncio
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal, cast, overload

from temporalio import workflow
from temporalio.exceptions import is_cancelled_exception

if TYPE_CHECKING:
    from intent_engine.temporal.utils.types import TaskCoroutine

DEFAULT_BATCH_WORKER_YIELD_EVERY = 5


def _get_cancellation_error(
    exception_group: BaseExceptionGroup,
) -> BaseException | None:
    for exception in exception_group.exceptions:
        if isinstance(exception, BaseExceptionGroup):
            cancellation_error = _get_cancellation_error(cast("BaseExceptionGroup", exception))
            if cancellation_error is not None:
                return cancellation_error
            continue
        if is_cancelled_exception(exception):
            return exception
    return None


@overload
async def safe_gather[T](
    *coros_or_futures: "TaskCoroutine[T]",
    return_exceptions: Literal[False] = False,
) -> list[T]: ...


@overload
async def safe_gather[T](
    *coros_or_futures: "TaskCoroutine[T]",
    return_exceptions: bool,
) -> list[T | Exception]: ...


async def _gather_tasks_results[T](
    *coros_or_futures: "TaskCoroutine[T]",
) -> list[T]:
    tasks: list[asyncio.Task[T]] = []
    try:
        async with asyncio.TaskGroup() as task_group:
            for coro_or_future in coros_or_futures:
                tasks.append(task_group.create_task(coro_or_future))
    except BaseExceptionGroup as exc_group:
        cancellation_error = _get_cancellation_error(exc_group)
        if cancellation_error is not None:
            raise cancellation_error from None
        raise

    return [task.result() for task in tasks]


async def safe_gather[T](
    *coros_or_futures: "TaskCoroutine[T]",
    return_exceptions: bool = False,
) -> list[T] | list[T | Exception] | tuple[object, ...]:
    """
    A small wrapper around asyncio.gather/TaskGroup that preserves
    cancellation behavior when return_exceptions=True.
    """
    if return_exceptions:

        async def capture_exception(awaitable: "TaskCoroutine[T]") -> T | Exception:
            try:
                return await awaitable
            except Exception as exc:
                if is_cancelled_exception(exc):
                    raise
                return exc

        return await _gather_tasks_results(
            *[capture_exception(coro_or_future) for coro_or_future in coros_or_futures]
        )
    else:
        return await _gather_tasks_results(*coros_or_futures)


@overload
async def safe_gather_in_batches[T](
    coros_or_futures: Iterable["TaskCoroutine[T]"],
    *,
    batch_size: int,
    return_exceptions: Literal[False] = False,
) -> list[T]: ...


@overload
async def safe_gather_in_batches[T](
    coros_or_futures: Iterable["TaskCoroutine[T]"],
    *,
    batch_size: int,
    return_exceptions: bool,
) -> list[T | Exception]: ...


async def safe_gather_in_batches[T](
    coros_or_futures: Iterable["TaskCoroutine[T] | TaskCoroutine[object]"],
    *,
    batch_size: int,
    return_exceptions: bool = False,
) -> list[T] | list[T | Exception]:
    if batch_size <= 0:
        raise ValueError("'batch_size' must be greater than 0")
    indexed_awaitables = iter(enumerate(coros_or_futures))
    next_item_lock = asyncio.Lock()
    total_count = 0
    results: dict[int, T | Exception] = {}

    async def next_item() -> tuple[int, "TaskCoroutine[T]"] | None:
        nonlocal total_count
        async with next_item_lock:
            try:
                idx, awaitable = next(indexed_awaitables)
            except StopIteration:
                return None
            total_count += 1
            return idx, cast("TaskCoroutine[T]", awaitable)

    async def worker() -> None:
        processed_since_yield = 0
        while True:
            item = await next_item()
            if item is None:
                return

            idx, awaitable = item
            item_result = await safe_gather(awaitable, return_exceptions=return_exceptions)
            results[idx] = item_result[0]
            processed_since_yield += 1
            if processed_since_yield >= DEFAULT_BATCH_WORKER_YIELD_EVERY:
                # Cooperative yield to keep workflow activations responsive under high fanout.
                await workflow.sleep(0)
                processed_since_yield = 0

    _ = await safe_gather(
        *(worker() for _ in range(batch_size)),
        return_exceptions=False,
    )
    ordered_results = [results[idx] for idx in range(total_count)]
    if return_exceptions:
        return ordered_results
    return cast("list[T]", ordered_results)

Hi,

I think this is related to the deadlock detector: something is blocking the asyncio event loop.

Could you share the workflow code? If not, can you check whether there are any blocking calls such as time.sleep() or Event.wait()?
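To illustrate what "blocking the event loop" means here, this is a minimal sketch in plain asyncio, run outside any Temporal worker or sandbox. The helper names (count_heartbeats_during, blocking_work, cooperative_work) are made up for this example. It counts how often a background "heartbeat" task gets to run while some work is in progress: a blocking call starves it completely, while an awaited sleep does not.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def blocking_work() -> None:
    # Blocks the whole thread, so no other task on this loop can run.
    time.sleep(0.05)


async def cooperative_work() -> None:
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(0.05)


async def count_heartbeats_during(work: Callable[[], Awaitable[None]]) -> int:
    """Return how many times a background task ran while `work` executed."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.001)
            ticks += 1

    heartbeat_task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat task start
    before = ticks
    await work()
    observed = ticks - before

    # Stop the background task cleanly before returning.
    heartbeat_task.cancel()
    try:
        await heartbeat_task
    except asyncio.CancelledError:
        pass
    return observed


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_heartbeats_during(blocking_work)))
    print("cooperative:", asyncio.run(count_heartbeats_during(cooperative_work)))
```

Any long stretch of synchronous code has the same effect as the time.sleep() call in this sketch, so it is worth looking for anything in the workflow that runs for a long time without awaiting.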

Could you share the SDK version? See this PR: https://github.com/temporalio/sdk-python/pull/806

Antonio

We are on temporalio==1.27.0

  • We are still repeatedly seeing:
    • TMPRL1104 Workflow task exceeded 5 seconds
    • then Failed running eviction job ... slot may remain forever used.
  • We don’t use time.sleep() / threading.Event.wait() in workflow code. We use workflow.sleep(0) for cooperative yields in batch loops.
  • After the stall, metrics look like this:
    • workflow pollers for the queue drop to 0,
    • workflow slots become used=5, available=0,
    • activity pollers remain healthy.

I am not able to understand what could prevent eviction when the batching is done as in the code shared above. I don’t use any try/except around it, so the exception shouldn’t be swallowed anywhere else. Could you please explain how eviction works in Temporal so that I can go through my code to find the issue?
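For anyone trying to reproduce the "Eviction processed, but N tasks remain" symptom in isolation, here is a minimal sketch in plain asyncio. It assumes, based only on the wording of the error message, that eviction cancels the outstanding tasks and then expects all of them to have finished. It is not the SDK's actual implementation, and it does not claim that the batching code above behaves this way. The function names are hypothetical.

```python
import asyncio


async def well_behaved() -> None:
    # Lets CancelledError propagate, so the task ends when cancelled.
    await asyncio.sleep(3600)


async def swallows_first_cancellation() -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # The bug being illustrated: cancellation is swallowed here.
        pass
    # The task keeps waiting even though it was asked to stop.
    await asyncio.sleep(3600)


async def count_remaining_after_cancel() -> int:
    """Cancel every task once and report how many are still not done."""
    tasks = [
        asyncio.create_task(well_behaved()),
        asyncio.create_task(swallows_first_cancellation()),
    ]
    await asyncio.sleep(0)  # let both tasks reach their first await

    for task in tasks:
        task.cancel()
    # Give the cancelled tasks a few loop iterations to unwind.
    for _ in range(5):
        await asyncio.sleep(0)

    remaining = [task for task in tasks if not task.done()]
    count = len(remaining)

    # Clean up: a second cancel is honoured by the stubborn task.
    for task in remaining:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return count


if __name__ == "__main__":
    print("tasks remaining after cancel:", asyncio.run(count_remaining_after_cancel()))
```

Under that assumption, the places to check would be any except block that can catch a cancellation without re-raising it, and any task that awaits something new after it has been cancelled.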