I’m trying to implement a batch operation pattern with a buffer.
The component initiating the work is a simple workflow that uses “signal with start”:
@workflow.defn(name="SingleUserWorkflow")
class SingleUserWorkflow(HBWorkflow):
async def send_to_buffer(self) -> List[PrioritizedActionItem]:
await workflow.execute_activity(
send_to_buffer_activity,
AddToBufferSignalParams(context_id=self.context_id, initiating_workflow_id=workflow.info().workflow_id),
schedule_to_close_timeout=timedelta(seconds=5),
)
@workflow.run
async def run(self, params: SingleUserWorkflowParams):
await super().run(params)
await self.send_to_buffer()
send_to_buffer_activity:
@activity.defn
async def send_to_buffer_activity(params: AddToBufferSignalParams) -> None:
from src.utils.temporal_client import get_temporal_client
async with get_temporal_client() as client:
task_queue = get_task_queue()
await client.start_workflow(
"BufferWorkflow",
BufferWorkflowParams(context_id="123"),
id="buffer-workflow",
task_queue=task_queue,
start_signal="add_to_buffer",
start_signal_args=[params],
)
The buffer workflow gets these signals and pushes data into an internal buffer until it’s full or until some time has passed, and then it should “flush” the buffer by offloading the work to a different workflow which performs the batch operation:
@workflow.defn(name="BufferWorkflow")
class BufferWorkflow:
def __init__(self):
self.buffer: List[AddToBufferSignalParams] = []
@workflow.run
async def run(self, params: BufferWorkflowParams) -> None:
print("BufferWorkflow running", workflow.info().run_id)
try:
await workflow.wait_condition(
lambda: len(self.buffer) == 3,
timeout=60,
)
print("BufferWorkflow buffer full", workflow.info().run_id)
except asyncio.TimeoutError:
print("BufferWorkflow buffer max time reached", workflow.info().run_id)
# create a ContextWorkflowPair list for BatchWorkflow
pairs = [
ContextWorkflowPair(
context_id=item.context_id,
initiating_workflow_id=item.initiating_workflow_id,
)
for item in self.buffer
]
print("BufferWorkflow flush", workflow.info().run_id)
await workflow.start_child_workflow(
"BatchWorkflow",
BatchWorkflowParams(context_workflow_pairs=pairs),
parent_close_policy=workflow.ParentClosePolicy.ABANDON,
)
print("BufferWorkflow done", workflow.info().run_id)
@workflow.signal(name="add_to_buffer")
async def add_to_buffer(self, params: AddToBufferSignalParams) -> None:
self.buffer.append(params)
what I’m afraid from is that when the buffer workflow enters the “flush” stage (after wait_codition
is done), it can still get signals. I want to make sure that once it starts flushing it can no longer receive signals. It must finish offloading the work, complete, and the next signal will start a fresh BufferWorkflow (because of “signal with start”).
Is my concern justified? Is there a way to achieve this guarantee?