Unspecified WorkflowTaskFailed [Errno 32] Broken pipe

Getting a nifty error message that asks me to contact support, but also I’d love to learn how to troubleshoot this one myself. The issue started when I implemented a signal into the application to let the user ‘replay’ the game.

Error
Unspecified
The Workflow Task failed for an unknown reason. Please contact support.

Failure
{
“message”: “[Errno 32] Broken pipe”,
“source”: “”,
“stackTrace”: " File "/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py", line 289, in activate\n self._run_once(check_conditions=index == 1 or index == 2)\n\n File "/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py", line 1187, in _run_once\n raise self._current_activation_error\n\n File "/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py", line 1205, in _run_top_level_workflow_function\n await coro\n\n File "/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py", line 621, in run_workflow\n result = await self._inbound.execute_workflow(input)\n\n File "/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py", line 1453, in execute_workflow\n return await input.run_fn(*args)\n\n File "/Users/rain/learn-temporal-python-SDK-v2/poker_workflow.py", line 41, in run\n await play_round(seed)\n\n File "/Users/rain/learn-temporal-python-SDK-v2/poker_workflow.py", line 15, in play_round\n print(f"Player {i + 1}‘s hand: {’, '.join(str(card) for card in player_hand)}")\n",
“encodedAttributes”: null,
“cause”: null,
“applicationFailureInfo”: {
“type”: “BrokenPipeError”,
“nonRetryable”: false,
“details”: null
}
}

Stack Trace
File “/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py”, line 289, in activate
self._run_once(check_conditions=index == 1 or index == 2)

File “/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py”, line 1187, in _run_once
raise self._current_activation_error

File “/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py”, line 1205, in _run_top_level_workflow_function
await coro

File “/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py”, line 621, in run_workflow
result = await self._inbound.execute_workflow(input)

File “/Users/rain/learn-temporal-python-SDK-v2/venv/lib/python3.9/site-packages/temporalio/worker/_workflow_instance.py”, line 1453, in execute_workflow
return await input.run_fn(*args)

File “/Users/rain/learn-temporal-python-SDK-v2/poker_workflow.py”, line 41, in run
await play_round(seed)

File “/Users/rain/learn-temporal-python-SDK-v2/poker_workflow.py”, line 15, in play_round
print(f"Player {i + 1}‘s hand: {’, '.join(str(card) for card in player_hand)}")

poker_workflow.py

from temporalio import workflow
from game_state import GameState
from deck_utils import deal_cards, create_deck, shuffle_deck
from hand_ranking_workflow import HandRankingWorkflow, HandRankingInput


async def play_round(seed: int) -> None:
    deck = await shuffle_deck(create_deck(), seed)
    game_state = GameState(deck=deck, players=[[], [], [], []])

    for i in range(4):
        game_state.players[i] = await deal_cards(game_state, 5)

    for i, player_hand in enumerate(game_state.players):
        print(f"Player {i + 1}'s hand: {', '.join(str(card) for card in player_hand)}")

    hand_ranks = [
        await workflow.execute_child_workflow(
            HandRankingWorkflow.run,
            HandRankingInput([str(card) for card in hand]),
        )
        for hand in game_state.players
    ]

    max_rank = max([rank[0] for rank in hand_ranks])
    winning_hands = [(i, rank) for i, rank in enumerate(hand_ranks) if rank[0] == max_rank]
    winner_idx, winner_rank = max(winning_hands, key=lambda x: (x[1][0], x[1][1]))

    print(
        f"Player {winner_idx + 1} wins with a {', '.join(str(card) for card in game_state.players[winner_idx])}!"
    )


@workflow.defn
class PokerWorkflow:
    def __init__(self):
        self.exit = None

    @workflow.run
    async def run(self, seed: int) -> None:
        await play_round(seed)

    @workflow.signal
    async def play_again(self, seed: int) -> None:
        await play_round(seed)

    @workflow.signal
    def exit(self) -> None:
        self.exit = True

main.py

import asyncio
import time
from temporalio.client import Client
from temporalio.worker import Worker
from poker_workflow import PokerWorkflow
from hand_ranking_workflow import HandRankingWorkflow


async def main():
    client = await Client.connect("localhost:7233")

    async with Worker(
        client,
        task_queue="poker-hand-ranking-task-queue",
        workflows=[PokerWorkflow, HandRankingWorkflow],
    ):
        seed = int(time.time())
        workflow_execution = await client.start_workflow(
            PokerWorkflow.run,
            seed,
            id="poker-hand-ranking-workflow-id",
            task_queue="poker-hand-ranking-task-queue",
        )

        play_again = True
        while play_again:
            user_input = input("Play again? Type 'yes', 'no', or 'exit': ").lower()
            play_again = user_input == "yes"
            if play_again:
                seed = int(time.time())
                await workflow_execution.signal(PokerWorkflow.play_again, seed)
            else:
                await workflow_execution.signal(PokerWorkflow.exit)

if __name__ == "__main__":
    asyncio.run(main())

At first glance, this appears to be an error with writing to stdout. A BrokenPipeError can happen in Python print when, say, the process is being killed or somehow the stdout stream is otherwise closed. (there are many articles visible via Google about this)

Workflows should not write to stdout as external IO is non-deterministic. But print should still technically work if I remember correctly. Does that happen every run immediately when starting?

It doesn’t happen every time. Isn’t that odd?

As for the Broken Pipe error, yeah, this was working / fine before I tried adding a signal to the mix.

Yes, but I do not believe it is Temporal specific. This seems to be an issue with process pipe going away while you are trying to print. Regardless, you should not print (or use any external IO) from a workflow. Use workflow.logger.

1 Like

The “Unspecified WorkflowTaskFailed [Errno 32] Broken pipe” error typically indicates a communication breakdown in a software system or network. To address it, first ensure your game network connection is stable, as this error often occurs in networked environments. Keeping your software or application updated is crucial, as updates may contain fixes for such issues. A simple restart of the application or your entire system can sometimes clear up transient errors. Additionally, verify the availability of any external resources or services your application relies on. Examining log files or detailed error messages can provide more specific insights into the cause of the problem. Also, consider the possibility of conflicts with other software running on your system, which might interfere with the normal operation of your application. If these steps don’t resolve the issue, you may need to consult technical support or a professional, especially if the problem persists or affects critical operations.