Implement Sliding Window Batching in Python

Hi,
I’m trying to translate the Sliding Window Batching example to Python.

I’m encountering an issue where the parent workflow SlidingWindowBatchWorkflow misses a completion signal and then continues as new without that update, causing the new run to hang forever waiting for a signal that will never arrive.

This is the code:
Main Workflow:

import asyncio
from typing import List

from temporalio import workflow

# Activity functions, the request/partition dataclasses, and TIMEOUT are
# defined elsewhere in the project.

@workflow.defn
class OfflineBatchWorkflow:
    """Main batch workflow that partitions work across multiple sliding windows."""

    @workflow.run
    async def process_batch(self, request: OfflineEnrichmentWIPRequest) -> int:
        """Process batch using multiple parallel sliding window workflows."""
        page_size = request.page_size
        sliding_window_size = request.sliding_window_size

        batch_files = await workflow.execute_activity(
            list_batch_files_activity,
            request.batches_path,
            start_to_close_timeout=TIMEOUT)

        files_metadata: List[FileInfo] = await workflow.execute_activity(
            fetch_files_metadata_activity,
            args=[request.batches_path, batch_files],
            start_to_close_timeout=TIMEOUT)

        partitions: List[FilePartitionInfo] = await workflow.execute_activity(
            create_partitions_activity,
            args=[files_metadata, request.partition_size],
            start_to_close_timeout=TIMEOUT)

        partition_handles = []
        for i, partition in enumerate(partitions):
            child_id = f"{workflow.info().workflow_id}/{i}"
            batch_input = ProcessBatchInput(
                file_info=partition.file_info,
                request=request,
                offset=partition.start_offset,
                total_count=partition.end_offset,
                page_size=page_size,
                sliding_window_size=sliding_window_size
            )

            handle = await workflow.start_child_workflow(
                SlidingWindowBatchWorkflow.process_batch,
                args=[batch_input],
                id=child_id,
            )
            partition_handles.append(handle)

        results = await asyncio.gather(*partition_handles)
        return sum(results)

Child Workflows:

@workflow.defn
class SlidingWindowBatchWorkflow:
    """Implements batch processing using sliding window of parallel workflows."""

    @workflow.init
    def __init__(self, input: ProcessBatchInput) -> None:
        # State carried over from the previous run on continue-as-new.
        self._current_records = input.current_records.copy()
        self._records_to_remove = set()
        self._progress = input.progress
        # Apply any completions buffered before the state existed (mirrors the
        # Java sample; with @workflow.init this set is still empty here).
        count_before = len(self._current_records)
        self._current_records -= self._records_to_remove
        self._progress += count_before - len(self._current_records)

    @workflow.run
    async def process_batch(self, input: ProcessBatchInput) -> int:
        """Process batch of records using sliding window."""
        page_size = input.page_size
        offset = input.offset
        sliding_window_size = input.sliding_window_size
        request = input.request
        children_started = []

        # Get next batch of records
        records = await workflow.execute_activity(
            get_records_activity,
            args=[input],
            start_to_close_timeout=TIMEOUT)
        current_record_idx = 0

        while True:
            # Wait until window has space
            await workflow.wait_condition(
                lambda: len(self._current_records) < sliding_window_size
            )

            if not records or current_record_idx == len(records):
                # Wait for all children to complete
                await workflow.wait_condition(lambda: len(self._current_records) == 0)
                workflow.logger.info(f'Workflow {workflow.info().workflow_id} completed')
                return offset + len(children_started)

            record = records[current_record_idx]

            child = await workflow.start_child_workflow(
                SingleRecordProcessorWorkflow.process_record,
                args=[request, record],
                id=f"{workflow.info().workflow_id}/{record.id}",
                parent_close_policy=workflow.ParentClosePolicy.ABANDON,
            )

            children_started.append(child)
            self._current_records.add(record.id)
            current_record_idx += 1

            if len(children_started) == page_size:
                await workflow.wait_condition(workflow.all_handlers_finished)
                next_batch = ProcessBatchInput(
                    page_size=page_size,
                    sliding_window_size=sliding_window_size,
                    offset=offset + len(children_started),
                    progress=self._progress,
                    current_records=self._current_records,
                    request=request,
                    file_info=input.file_info,
                    total_count=input.total_count)
                workflow.logger.info(
                    f'{workflow.info().workflow_id}: continuing as new with: {next_batch.current_records}')
                # continue_as_new raises and never returns, so it is not awaited
                workflow.continue_as_new(next_batch)

    @workflow.signal
    def report_completion(self, record_id: int) -> None:
        """Signal that record has been processed."""
        workflow.logger.info(f'{workflow.info().workflow_id}: received signal for {record_id}')
        # Buffer completions that arrive before the state exists (kept for
        # parity with the Java sample; with @workflow.init the state is always
        # initialized before any handler runs).
        if self._current_records is None:
            self._records_to_remove.add(record_id)
        elif record_id in self._current_records:
            self._current_records.remove(record_id)
            self._progress += 1

    @workflow.query
    def get_progress(self) -> BatchProgress:
        """Query current progress."""
        return BatchProgress(
            progress=self._progress,
            current_records=self._current_records,
            records_to_remove=self._records_to_remove,
        )

The event history for the issue described above is attached:

  • The last event (eventId 63) is the continue-as-new, with current_records equal to [92, 93, 94].
  • An earlier event (eventId 43) is the received signal for record 92.

I expected the signal to be handled before the continue-as-new, leaving current_records as [93, 94].

Would love to hear what I’m missing here:)

Thanks!

At least one problem is that you are not waiting for the children to start before calling continue-as-new. The Java sample has a dedicated line that waits for all the child starts before continuing as new.
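
In Python terms, the analogous step would be something along these lines (pending_child_starts and next_batch are illustrative names, not from your code):

# Rough sketch: make sure every child this run tried to start has actually
# started before the run is replaced by continue-as-new.
await asyncio.gather(*pending_child_starts)
workflow.continue_as_new(next_batch)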

Hi, thanks for replying!

I thought that await workflow.start_child_workflow only returns once the child has actually started, which is why I wrote it like that.

Anyhow, from debugging it I think I found the issue in these two lines:

child = await workflow.start_child_workflow(
    SingleRecordProcessorWorkflow.process_record,
    args=[request, record],
    id=f"{workflow.info().workflow_id}/{record.id}",
    parent_close_policy=workflow.ParentClosePolicy.ABANDON,
)
self._current_records.add(record.id)

It may be the case that the child starts and even finishes before the second line, self._current_records.add(record.id), executes. The parent then does receive the completion signal, but ignores it since the record id is not yet in self._current_records.

Swapping the order of these statements solved the issue for me (or, as you said, at least this problem). Roughly, the fixed section now looks like the snippet below.
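
# Register the record in the window first, so a completion signal that
# arrives immediately after the child starts is not ignored.
self._current_records.add(record.id)

child = await workflow.start_child_workflow(
    SingleRecordProcessorWorkflow.process_record,
    args=[request, record],
    id=f"{workflow.info().workflow_id}/{record.id}",
    parent_close_policy=workflow.ParentClosePolicy.ABANDON,
)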

Good catch!

You are correct that awaiting start_child_workflow is enough to ensure a child has started.
I would still advise against awaiting at that line, as it forces a workflow task to complete, and thus a full roundtrip to the service with a new workflow task, on each child’s start. Note that the Java sample waits for all the child starts together, which allows starting the children as a batch instead of one by one. You can validate this by comparing your workflow event history with the Java sample’s. A sketch of what that batching could look like in Python follows.
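
# Illustrative sketch only: start all children in the current window as a
# batch instead of awaiting each start individually. records_in_window and
# next_batch are placeholder names, not from the code above.
pending_starts = []
for record in records_in_window:
    # Register before the child has any chance to signal back.
    self._current_records.add(record.id)
    pending_starts.append(
        workflow.start_child_workflow(
            SingleRecordProcessorWorkflow.process_record,
            args=[request, record],
            id=f"{workflow.info().workflow_id}/{record.id}",
            parent_close_policy=workflow.ParentClosePolicy.ABANDON,
        )
    )

# The start commands are issued together in one workflow task, and the gather
# resolves only once every child has actually started, so it is safe to
# continue as new afterwards.
children_started = await asyncio.gather(*pending_starts)
workflow.continue_as_new(next_batch)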

Thanks!

Thanks also for the clarification about awaiting start_child_workflow, I wasn’t aware of that consideration.