Hey Community!
Temporal beginner here. I'm writing a schedule whose workflow roughly does the following:
import asyncio
from datetime import timedelta
from typing import Dict

from temporalio import workflow
from temporalio.common import RetryPolicy

# Api, SyncActivity, TaskQueue and PageProcessingInput come from the project's own modules (imports not shown)


@workflow.defn
class PageProcessingWorkflow:
    """Workflow to process all pages of inventory"""

    MAX_ITEMS_BEFORE_CONTINUE = 20000

    @workflow.run
    async def run(self, input: PageProcessingInput) -> Dict:
        """Main workflow execution logic"""
        total_processed = input.processed_count
        total_errors = 0
        next_cursor = input.next_cursor

        while True:
            # Activity 1: fetch the next page of items
            page_result = await workflow.start_activity(
                Api.fetch_inventory_page,
                args=[input.access_token, next_cursor],
                start_to_close_timeout=timedelta(minutes=30),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=5),
                    maximum_interval=timedelta(seconds=30),
                    maximum_attempts=3,
                ),
            )
            items = page_result.items
            next_cursor = page_result.next_cursor

            # Activity 2: process the page as two halves, in parallel
            current_tasks = []
            if items:
                mid_index = len(items) // 2
                for batch in [items[:mid_index], items[mid_index:]]:
                    if batch:
                        current_tasks.append(
                            workflow.start_activity(
                                SyncActivity.process_company_inventory,
                                args=[input.company_id, batch],
                                task_queue=TaskQueue.ProcessQueue,
                                start_to_close_timeout=timedelta(minutes=30),
                                retry_policy=RetryPolicy(
                                    initial_interval=timedelta(seconds=5),
                                    maximum_interval=timedelta(seconds=30),
                                    maximum_attempts=3,
                                ),
                            )
                        )

            # Wait for both halves and tally the activity results
            if current_tasks:
                results = await asyncio.gather(*current_tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        total_errors += 1
                    else:
                        total_processed += result.get("processed", 0)
                        total_errors += result.get("errors", 0)

            # Stop after the last page (assumes the API returns no cursor there);
            # otherwise the next fetch with an empty cursor would start over from page 1
            if not next_cursor:
                break

            # Continue-as-new once the threshold is reached.
            # workflow.continue_as_new raises, so nothing after this call runs.
            if total_processed >= self.MAX_ITEMS_BEFORE_CONTINUE:
                workflow.continue_as_new(
                    PageProcessingInput(
                        company_id=input.company_id,
                        access_token=input.access_token,
                        next_cursor=next_cursor,
                        processed_count=0,
                    )
                )

        # Summary for this run (the exact shape of the returned dict is illustrative)
        return {"processed": total_processed, "errors": total_errors}
Gist of the above code:
1. Activity 1 makes an API call to fetch items (300 per call).
2. Activity 2 is called twice, once for each half of the page (split at len(items) // 2), to process the items.
3. The workflow waits for both processing activities to complete, then fetches the next page.
4. This repeats until around 20,000 items have been processed, at which point the workflow calls continue-as-new and carries on from the current cursor.
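The loop described above can be simulated in plain Python, without Temporal, to sanity-check the paging, the split into halves and the continue-as-new threshold. This is a minimal sketch: `fake_fetch_page`, `split_in_half` and `simulate` are hypothetical names, the cursor is assumed to be a simple offset, every item is assumed to succeed, and the loop is assumed to stop when the API returns no next cursor.

```python
from typing import List, Optional, Tuple

PAGE_SIZE = 300  # items per API call, as described in the post


def fake_fetch_page(all_items: List[int], cursor: Optional[int]) -> Tuple[List[int], Optional[int]]:
    """Hypothetical stand-in for Api.fetch_inventory_page; the cursor is just an offset."""
    start = cursor or 0
    end = start + PAGE_SIZE
    next_cursor = end if end < len(all_items) else None  # None means "no more pages"
    return all_items[start:end], next_cursor


def split_in_half(items: List[int]) -> List[List[int]]:
    """Split a page at len(items) // 2, dropping an empty half (as the workflow does)."""
    mid = len(items) // 2
    return [batch for batch in (items[:mid], items[mid:]) if batch]


def simulate(all_items: List[int], threshold: int) -> List[int]:
    """Return the number of items handled by each run (one run per continue-as-new)."""
    runs: List[int] = []
    cursor: Optional[int] = None
    processed = 0
    while True:
        items, cursor = fake_fetch_page(all_items, cursor)
        for batch in split_in_half(items):
            processed += len(batch)  # assume every item in a batch succeeds
        if cursor is None:  # last page: the workflow would complete here
            runs.append(processed)
            return runs
        if processed >= threshold:  # continue-as-new starts a fresh run
            runs.append(processed)
            processed = 0


print(simulate(list(range(45_000)), threshold=20_000))
# [20100, 20100, 4800]
```

With 300 items per page, the threshold is crossed after 67 pages (20,100 items), so each full run overshoots 20,000 slightly before continuing as new.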
Problem I’m getting:
2025-03-06T21:14:12.896037Z  WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc", "server": "temporal", "date": "Thu, 06 Mar 2025 21:14:12 GMT"} } run_id="01956d36-6800-76ee-a626-8d3ef1585dc8"
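The `details` list in that warning is raw bytes. Assuming it is a serialized `google.rpc.Status` message (the standard gRPC error-details payload), a small hand-written protobuf decoder shows what it contains. This is a sketch that only handles the two protobuf wire types that appear in this payload; `read_varint`, `parse_fields` and `decode_status` are hypothetical helper names.

```python
from typing import List, Optional, Tuple

# Copied from the `details: [...]` field of the WARN log line
DETAILS = bytes([
    8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111,
    116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111,
    103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108,
    46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49,
    46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101,
])


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a protobuf base-128 varint starting at pos; return (value, new_pos)."""
    value, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def parse_fields(buf: bytes) -> List[Tuple[int, object]]:
    """Split a protobuf message into (field_number, value) pairs (wire types 0 and 2 only)."""
    fields: List[Tuple[int, object]] = []
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x7
        if wire_type == 0:  # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited: strings, bytes, sub-messages
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields.append((field_number, value))
    return fields


def decode_status(buf: bytes) -> Tuple[Optional[int], Optional[str], List[str]]:
    """Read google.rpc.Status: 1 = code, 2 = message, 3 = repeated Any (whose field 1 = type_url)."""
    code, message, type_urls = None, None, []
    for field_number, value in parse_fields(buf):
        if field_number == 1:
            code = value
        elif field_number == 2:
            message = value.decode("utf-8")
        elif field_number == 3:
            for sub_number, sub_value in parse_fields(value):
                if sub_number == 1:
                    type_urls.append(sub_value.decode("utf-8"))
    return code, message, type_urls


print(decode_status(DETAILS))
# (5, 'Workflow task not found.', ['type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure'])
```

So the byte list carries nothing beyond the text of the log line: gRPC code 5 (`NOT_FOUND`), the message "Workflow task not found." and a `NotFoundFailure` detail with no extra fields.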
On the UI, I see the following error:
The API-call activity takes around 12-20 s but is unreliable, so with retries it can take up to 1 minute. The processing activity takes around 45 s to 1 minute.
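Using those timings, a back-of-the-envelope estimate of one run (the span between two continue-as-new calls) looks like this. It is a rough sketch, assuming every item on a page is reported as processed and that the two processing activities overlap fully, so each page costs one fetch plus one processing activity; `estimate_run` is a hypothetical helper and activity retries are not counted in `activities`.

```python
import math
from typing import Dict, Tuple


def estimate_run(
    page_size: int,
    threshold: int,
    fetch_s: Tuple[int, int],
    process_s: Tuple[int, int],
) -> Dict[str, int]:
    """Rough size and duration of one run between continue-as-new calls."""
    pages = math.ceil(threshold / page_size)  # pages needed to reach the threshold
    return {
        "pages": pages,
        "activities": pages * 3,  # 1 fetch + 2 processing activities per page
        "min_seconds": pages * (fetch_s[0] + process_s[0]),
        "max_seconds": pages * (fetch_s[1] + process_s[1]),
    }


estimate = estimate_run(page_size=300, threshold=20000, fetch_s=(12, 60), process_s=(45, 60))
print(estimate)
# {'pages': 67, 'activities': 201, 'min_seconds': 3819, 'max_seconds': 8040}
```

Under these assumptions, each run lasts roughly 1 to 2.2 hours and schedules about 200 activities before continuing as new.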
Any help debugging or fixing this would be great!