Workflow Task Timed Out

Hey Community!

Temporal beginner here. I'm writing a schedule that runs a workflow which roughly does the following:

```python
import asyncio
from datetime import timedelta
from typing import Dict

from temporalio import workflow
from temporalio.common import RetryPolicy

# PageProcessingInput, Api, SyncActivity and TaskQueue come from the project's own modules (not shown)


@workflow.defn
class PageProcessingWorkflow:
    """Workflow to process all pages of inventory"""

    MAX_ITEMS_BEFORE_CONTINUE = 20000

    @workflow.run
    async def run(self, input: PageProcessingInput) -> Dict:
        """Main workflow execution logic"""
        total_processed = input.processed_count
        total_errors = 0
        next_cursor = input.next_cursor

        # Same retry policy is used for both activities
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=5),
            maximum_interval=timedelta(seconds=30),
            maximum_attempts=3,
        )

        while True:
            # Fetch next page (execute_activity starts the activity and awaits its result)
            page_result = await workflow.execute_activity(
                Api.fetch_inventory_page,
                args=[input.access_token, next_cursor],
                start_to_close_timeout=timedelta(minutes=30),
                retry_policy=retry_policy,
            )

            items = page_result.items
            next_cursor = page_result.next_cursor
            current_tasks = []

            # Split the page into two halves and process them in parallel
            mid_index = len(items) // 2
            for batch in (items[:mid_index], items[mid_index:]):
                if batch:
                    current_tasks.append(
                        workflow.start_activity(
                            SyncActivity.process_company_inventory,
                            args=[input.company_id, batch],
                            task_queue=TaskQueue.ProcessQueue,
                            start_to_close_timeout=timedelta(minutes=30),
                            retry_policy=retry_policy,
                        )
                    )

            # Process activity results; failed activities come back as exception objects
            if current_tasks:
                results = await asyncio.gather(*current_tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        total_errors += 1
                    else:
                        total_processed += result.get("processed", 0)
                        total_errors += result.get("errors", 0)

            # Last page reached (assumes the API returns an empty/None cursor at the end)
            if not next_cursor:
                return {"processed": total_processed, "errors": total_errors}

            # Continue-as-new if threshold reached; continue_as_new raises, so nothing runs after it
            if total_processed >= self.MAX_ITEMS_BEFORE_CONTINUE:
                workflow.continue_as_new(
                    PageProcessingInput(
                        company_id=input.company_id,
                        access_token=input.access_token,
                        next_cursor=next_cursor,
                        processed_count=0,
                    )
                )
```

Gist of the above code:

Activity 1 makes an API call to fetch items (300 per call).
Activity 2 is called twice, once for each half of the items (split at len(items) // 2), to process them.
The workflow waits for both processing activities to complete, then fetches the next page.
This repeats until about 20,000 items have been processed, after which the workflow uses continue-as-new and keeps going from the current cursor.
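A quick sketch of the batching and the per-run numbers described above. It assumes every page returns the full 300 items and every item is counted as processed, so the counts are estimates; split_in_half and the constant names are illustrative, not part of the original code.

```python
import math

PAGE_SIZE = 300  # items per fetch call, as stated in the post
MAX_ITEMS_BEFORE_CONTINUE = 20000  # same threshold as the workflow


def split_in_half(items: list) -> list:
    """Split one page into two batches like the workflow does, dropping empty halves."""
    mid_index = len(items) // 2
    return [batch for batch in (items[:mid_index], items[mid_index:]) if batch]


# Pages needed before continue-as-new triggers (assuming full pages, all items processed)
pages_per_run = math.ceil(MAX_ITEMS_BEFORE_CONTINUE / PAGE_SIZE)
# Each page schedules one fetch activity plus two processing activities
activities_per_run = pages_per_run * 3
```

Under those assumptions each run handles 67 pages and schedules about 200 activities before continuing as new.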

Problem I'm getting:

```
2025-03-06T21:14:12.896037Z  WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc", "server": "temporal", "date": "Thu, 06 Mar 2025 21:14:12 GMT"} } run_id="01956d36-6800-76ee-a626-8d3ef1585dc8"
```
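The details array in that log line is the raw protobuf encoding of the gRPC error status. A minimal decoder sketch, assuming the bytes are a google.rpc.Status message (field 1 code, field 2 message, field 3 a google.protobuf.Any detail) and handling only the two wire types that appear here; read_varint and parse_fields are hypothetical helpers, not part of any SDK:

```python
DETAILS = bytes([
    8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32,
    110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112,
    101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47,
    116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111,
    114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111,
    117, 110, 100, 70, 97, 105, 108, 117, 114, 101,
])


def read_varint(buf: bytes, pos: int) -> tuple:
    """Decode a base-128 varint starting at pos; return (value, next position)."""
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def parse_fields(buf: bytes) -> dict:
    """Parse top-level protobuf fields (varint and length-delimited wire types only)."""
    fields = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x7
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields[field_number] = value
    return fields


status = parse_fields(DETAILS)
code = status[1]  # gRPC status code
message = status[2].decode()
type_url = parse_fields(status[3])[1].decode()  # Any.type_url of the detail
```

It decodes to code 5 (gRPC NOT_FOUND), the message "Workflow task not found.", and a detail of type temporal.api.errordetails.v1.NotFoundFailure, so the byte array carries the same information as the readable part of the log line.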

In the UI, I see the following error:

The API-call activity takes around 12-20 s, but it is unreliable, so with retries it can take up to 1 minute. The processing activity takes around 45 s to 1 minute.
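To put these timings next to the retry settings in the workflow, here is a rough sketch of the worst-case wall-clock time per activity. It assumes Temporal's documented backoff rule, delay = min(initial_interval * backoff_coefficient ** (n - 1), maximum_interval), with the default backoff_coefficient of 2.0 (the code above does not set it), and it ignores queueing and scheduling latency. The helper names are hypothetical.

```python
from datetime import timedelta

# Retry settings copied from the workflow; BACKOFF_COEFFICIENT is assumed to be Temporal's default
INITIAL_INTERVAL = timedelta(seconds=5)
MAXIMUM_INTERVAL = timedelta(seconds=30)
MAXIMUM_ATTEMPTS = 3  # total attempts, including the first one
BACKOFF_COEFFICIENT = 2.0


def retry_delays() -> list:
    """Waits before attempts 2..MAXIMUM_ATTEMPTS, capped at MAXIMUM_INTERVAL."""
    return [
        min(INITIAL_INTERVAL * BACKOFF_COEFFICIENT ** (n - 1), MAXIMUM_INTERVAL)
        for n in range(1, MAXIMUM_ATTEMPTS)
    ]


def worst_case(per_attempt: timedelta) -> timedelta:
    """Upper bound if every attempt runs for per_attempt before failing or timing out."""
    return per_attempt * MAXIMUM_ATTEMPTS + sum(retry_delays(), timedelta())
```

With a 20 s fetch this gives 75 s at most, and with a 60 s processing run about 195 s; the configured 30-minute start_to_close_timeout alone would allow roughly 90 minutes per activity.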

Any help debugging or fixing this would be great :slight_smile:

I'm getting a very similar problem: one async activity finishes about 10 s before the other one, and then the asyncio.gather call ends in a workflow task timeout.
@maxim could you please help?
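For reference, a plain-asyncio sketch (no Temporal involved; fake_activity is a hypothetical stand-in) of how asyncio.gather(..., return_exceptions=True), as used in the workflow above, handles one task finishing earlier than another and one task raising. gather waits for all of them and returns results in argument order, with the exception object in place of a value.

```python
import asyncio


async def fake_activity(name: str, delay: float, fail: bool = False) -> str:
    """Stand-in for an activity: sleep, then return a value or raise."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"


async def main() -> list:
    # "fast" finishes well before "slow"; gather simply waits for the slowest one
    return await asyncio.gather(
        fake_activity("fast", 0.01),
        fake_activity("slow", 0.05),
        fake_activity("broken", 0.02, fail=True),
        return_exceptions=True,
    )


results = asyncio.run(main())
```

In plain asyncio, one awaitable completing earlier is not an error by itself; the early result is simply held until the rest finish.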