Buffer workflow pattern

I’m trying to implement a batch operation pattern with a buffer.

The component initiating the work is a simple workflow that uses “signal with start”:

@workflow.defn(name="SingleUserWorkflow")
class SingleUserWorkflow(HBWorkflow):

    async def send_to_buffer(self) -> List[PrioritizedActionItem]:
        await workflow.execute_activity(
            send_to_buffer_activity,
            AddToBufferSignalParams(context_id=self.context_id, initiating_workflow_id=workflow.info().workflow_id),
            schedule_to_close_timeout=timedelta(seconds=5),
        )


    @workflow.run
    async def run(self, params: SingleUserWorkflowParams):
        await super().run(params)
        await self.send_to_buffer()

send_to_buffer_activity:

@activity.defn
async def send_to_buffer_activity(params: AddToBufferSignalParams) -> None:
    from src.utils.temporal_client import get_temporal_client
    async with get_temporal_client() as client:
        task_queue = get_task_queue()
        await client.start_workflow(
            "BufferWorkflow",
            BufferWorkflowParams(context_id="123"),
            id="buffer-workflow",
            task_queue=task_queue,
            start_signal="add_to_buffer",
            start_signal_args=[params],
        )

The buffer workflow receives these signals and appends the data to an internal buffer until the buffer is full or a maximum time has passed; it then “flushes” the buffer by offloading the work to a separate workflow that performs the batch operation:

@workflow.defn(name="BufferWorkflow")
class BufferWorkflow:

    def __init__(self):
        self.buffer: List[AddToBufferSignalParams] = []

    @workflow.run
    async def run(self, params: BufferWorkflowParams) -> None:
        print("BufferWorkflow running", workflow.info().run_id)
        try:
            await workflow.wait_condition(
                lambda: len(self.buffer) >= 3,
                timeout=60,
            )
            print("BufferWorkflow buffer full", workflow.info().run_id)
        except asyncio.TimeoutError:
            print("BufferWorkflow buffer max time reached", workflow.info().run_id)

        # create a ContextWorkflowPair list for BatchWorkflow
        pairs = [
            ContextWorkflowPair(
                context_id=item.context_id,
                initiating_workflow_id=item.initiating_workflow_id,
            )
            for item in self.buffer
        ]

        print("BufferWorkflow flush", workflow.info().run_id)
        await workflow.start_child_workflow(
            "BatchWorkflow",
            BatchWorkflowParams(context_workflow_pairs=pairs),
            parent_close_policy=workflow.ParentClosePolicy.ABANDON,
        )
        print("BufferWorkflow done", workflow.info().run_id)

    @workflow.signal(name="add_to_buffer")
    async def add_to_buffer(self, params: AddToBufferSignalParams) -> None:
        self.buffer.append(params)

What I’m afraid of is that when the buffer workflow enters the “flush” stage (after wait_condition returns), it can still receive signals. I want to make sure that once it starts flushing it can no longer receive signals: it must finish offloading the work and complete, and the next signal will then start a fresh BufferWorkflow (because of “signal with start”).

Is my concern justified? Is there a way to achieve this guarantee?

You need to make sure to drain the buffer before completing the workflow. Every time the workflow blocks, the buffer can grow. In your case the workflow awaits start_child_workflow after checking the buffer size, so losing signals is possible.
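
A minimal sketch of that drain step, reusing the names from the question (self.buffer, ContextWorkflowPair, BatchWorkflowParams) and assuming the rest of BufferWorkflow is unchanged. Any signals appended while start_child_workflow is awaited are picked up by the next loop iteration instead of being dropped:

@workflow.run
async def run(self, params: BufferWorkflowParams) -> None:
    try:
        await workflow.wait_condition(
            lambda: len(self.buffer) >= 3,
            timeout=60,
        )
    except asyncio.TimeoutError:
        pass  # max wait reached; flush whatever has accumulated

    # Keep flushing until no new signals arrived while we were blocked.
    while self.buffer:
        batch, self.buffer = self.buffer, []
        pairs = [
            ContextWorkflowPair(
                context_id=item.context_id,
                initiating_workflow_id=item.initiating_workflow_id,
            )
            for item in batch
        ]
        # start_child_workflow is an await point, so more signals may be
        # appended to self.buffer here; the while loop re-checks afterwards.
        await workflow.start_child_workflow(
            "BatchWorkflow",
            BatchWorkflowParams(context_workflow_pairs=pairs),
            parent_close_policy=workflow.ParentClosePolicy.ABANDON,
        )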

Is there a way to achieve this guarantee of not receiving any more signals after a certain point?

Another option I am considering is calling “continue as new” after starting the child workflow and passing the current buffer to the new run. That way, if more signals arrive during the drain stage, they will not be lost but will be passed on to the next workflow.

There is currently no way to block signals other than closing the workflow.

Passing the buffer is a common pattern as well.
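
A minimal sketch of that hand-off, showing only the run method; __init__ and the add_to_buffer signal handler stay as in the question. It assumes a hypothetical optional initial_buffer field on BufferWorkflowParams (not in the original code) to carry the leftover items into the next run:

@workflow.run
async def run(self, params: BufferWorkflowParams) -> None:
    # Seed the buffer with items handed over by the previous run
    # (initial_buffer is a hypothetical, optional field on the params).
    self.buffer.extend(params.initial_buffer or [])

    try:
        await workflow.wait_condition(
            lambda: len(self.buffer) >= 3,
            timeout=60,
        )
    except asyncio.TimeoutError:
        pass  # max wait reached; flush whatever has accumulated

    batch, self.buffer = self.buffer, []
    if batch:
        pairs = [
            ContextWorkflowPair(
                context_id=item.context_id,
                initiating_workflow_id=item.initiating_workflow_id,
            )
            for item in batch
        ]
        await workflow.start_child_workflow(
            "BatchWorkflow",
            BatchWorkflowParams(context_workflow_pairs=pairs),
            parent_close_policy=workflow.ParentClosePolicy.ABANDON,
        )

    # Signals appended while start_child_workflow was awaited are not lost:
    # they become the starting buffer of the next run.
    workflow.continue_as_new(
        BufferWorkflowParams(
            context_id=params.context_id,
            initial_buffer=self.buffer,
        )
    )

Note that this keeps the BufferWorkflow alive as a long-running loop; whether a run should ever complete (for example when it ends with an empty buffer) is a separate design choice, subject to the same drain caveat described above.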
