Hi,
I’m trying to translate the Sliding Window Batching example to python.
I’m encountering an issue where the parent workflow SlidingWindowBatchWorkflow
misses a completion signal and then continues as new without the update, causing it to hang forever waiting for the signal.
This is the code:
Main Workflow:
@workflow.defn
class OfflineBatchWorkflow:
    """Fan-out batch workflow: cuts the input files into partitions and runs
    one sliding-window child workflow per partition."""

    @workflow.run
    async def process_batch(self, request: OfflineEnrichmentWIPRequest) -> int:
        """Partition the batch, run the children, and sum their results."""
        # Discover the input files, then their metadata, then the partitions.
        batch_files = await workflow.execute_activity(
            list_batch_files_activity,
            request.batches_path,
            start_to_close_timeout=TIMEOUT,
        )
        files_metadata: List[FileInfo] = await workflow.execute_activity(
            fetch_files_metadata_activity,
            args=[request.batches_path, batch_files],
            start_to_close_timeout=TIMEOUT,
        )
        partitions: List[FilePartitionInfo] = await workflow.execute_activity(
            create_partitions_activity,
            args=[files_metadata, request.partition_size],
            start_to_close_timeout=TIMEOUT,
        )

        # Start one child per partition; child ids are derived from this
        # workflow's id so they stay stable and unique per partition index.
        handles = []
        for index, part in enumerate(partitions):
            child_input = ProcessBatchInput(
                file_info=part.file_info,
                request=request,
                offset=part.start_offset,
                total_count=part.end_offset,
                page_size=request.page_size,
                sliding_window_size=request.sliding_window_size,
            )
            handles.append(
                await workflow.start_child_workflow(
                    workflow=SlidingWindowBatchWorkflow.process_batch,
                    args=[child_input],
                    id=f"{workflow.info().workflow_id}/{index}",
                )
            )

        # Each child returns a count; the batch result is their total.
        return sum(await asyncio.gather(*handles))
Child Workflows:
@workflow.defn
class SlidingWindowBatchWorkflow:
    """Implements batch processing using a sliding window of parallel child workflows.

    One page of records is fetched per run; a child workflow is started per
    record while keeping at most ``sliding_window_size`` children in flight.
    After ``page_size`` children have been started the workflow continues-as-new,
    carrying the ids of still-running records forward in ``current_records``.
    Children report completion back via the ``report_completion`` signal.
    """

    @workflow.init
    def __init__(self, input: ProcessBatchInput) -> None:
        # Ids of records whose child workflows are still running; carried
        # across continue-as-new runs inside ProcessBatchInput.
        self._current_records = input.current_records.copy()
        # Buffer for completion signals that arrive before the record set is
        # initialized.  NOTE(review): with @workflow.init this __init__ runs
        # before any signal handler, so the buffer is always empty here and
        # the subtraction below is a no-op; it is kept only to preserve the
        # original behavior and the get_progress query shape.  Removals
        # buffered in a previous run are also NOT passed through
        # continue_as_new (see process_batch) — confirm whether
        # ProcessBatchInput should carry them.
        self._records_to_remove = set()
        self._progress = input.progress
        count_before = len(self._current_records)
        self._current_records -= self._records_to_remove
        self._progress += count_before - len(self._current_records)

    @workflow.run
    async def process_batch(self, input: ProcessBatchInput) -> int:
        """Process one page of records, then continue-as-new for the next page."""
        page_size = input.page_size
        offset = input.offset
        sliding_window_size = input.sliding_window_size
        request = input.request
        children_started = []
        # Fetch the page of records this run is responsible for.
        records = await workflow.execute_activity(
            get_records_activity,
            args=[input],
            start_to_close_timeout=TIMEOUT,
        )
        current_record_idx = 0
        while True:
            # Block until the window has capacity for another child.
            await workflow.wait_condition(
                lambda: len(self._current_records) < sliding_window_size
            )
            if not records or current_record_idx == len(records):
                # Last page: drain the window before returning.
                await workflow.wait_condition(lambda: len(self._current_records) == 0)
                workflow.logger.info(
                    "Workflow %s completed", workflow.info().workflow_id
                )
                return offset + len(children_started)
            record = records[current_record_idx]
            # ABANDON lets the child keep running across our continue-as-new.
            child = await workflow.start_child_workflow(
                SingleRecordProcessorWorkflow.process_record,
                args=[request, record],
                id=f"{workflow.info().workflow_id}/{record.id}",
                parent_close_policy=workflow.ParentClosePolicy.ABANDON,
            )
            children_started.append(child)
            self._current_records.add(record.id)
            current_record_idx += 1
            if len(children_started) == page_size:
                # Let in-flight signal handlers finish so their removals are
                # reflected in the state captured by continue_as_new.
                await workflow.wait_condition(workflow.all_handlers_finished)
                next_batch = ProcessBatchInput(
                    page_size=page_size,
                    sliding_window_size=sliding_window_size,
                    offset=offset + len(children_started),
                    progress=self._progress,
                    current_records=self._current_records,
                    request=request,
                    file_info=input.file_info,
                    total_count=input.total_count,
                )
                workflow.logger.info(
                    "%s: continuing as new with: %s",
                    workflow.info().workflow_id,
                    next_batch.current_records,
                )
                await workflow.continue_as_new(next_batch)

    @workflow.signal
    def report_completion(self, record_id: int) -> None:
        """Mark ``record_id`` as finished and advance progress."""
        workflow.logger.info(
            "%s: received signal for %s", workflow.info().workflow_id, record_id
        )
        if self._current_records is None:
            # State not initialized yet: remember the removal for later.
            self._records_to_remove.add(record_id)
        elif record_id in self._current_records:
            self._current_records.remove(record_id)
            self._progress += 1
        else:
            # Previously this case was dropped silently.  NOTE(review): if the
            # signaled record_id's type differs from the ids stored in
            # _current_records (e.g. str "92" vs int 92), the membership test
            # above fails and the record is never removed — a plausible cause
            # of the window hanging on apparently-completed records; confirm
            # the types match end to end.
            workflow.logger.warning(
                "%s: completion signal for unknown record %s ignored",
                workflow.info().workflow_id,
                record_id,
            )

    @workflow.query
    def get_progress(self) -> BatchProgress:
        """Return a snapshot of progress for external observers."""
        return BatchProgress(
            progress=self._progress,
            current_records=self._current_records,
            records_to_remove=self._records_to_remove,
        )
Event history for the spoken issue is attached:
- You can see that the last event (eventId 63) is the continue-as-new, with `current_records` equal to [92, 93, 94].
- You can see that an earlier event (eventId 43) is a received completion signal for record 92.
I expected this signal to be handled before the continue-as-new, so the desired state would be `current_records` equal to [93, 94].
Would love to hear what I’m missing here:)
Thanks!