Redefine Max Retries for Activity

I have a workflow that takes a list of entities to process and calls an activity for each individual entity (in batches of at most 5 at a time). The activity has a maximum of 5 retries; if it still fails, I record the failed entity so that it does not block the rest of the processing. At the end, however, I want the failed cases to be retried forever. Is it possible to redefine the max retries when calling an activity, or can that only be done at activity instantiation time? Or is there a simpler way to handle this scenario?

Right now we are processing up to 400 entities, but in the future we may want to support processing a few thousand.

I would do something like this:

For each batch, start a child workflow (asynchronously) that executes the activities in parallel. In the child, hold on to the activity inputs and wait for all futures to complete (an activity that fails after exhausting its retries also completes its future, with an error). Iterate through the futures to see which completed and which failed, report the reached milestone back to the parent workflow via a signal, and then wait for a signal from the parent before continuing the child execution.
In the parent, once all batches have reported, signal the child workflows to continue. Each child then re-executes its failed activities with the original input it stored and a different retry policy (I believe this requires creating a new proxyActivities instance).
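The "wait for all futures, then check which ones failed" step in the child can be done with plain promise combinators. Below is a minimal sketch; `collectFailures` is a hypothetical helper, not part of the Temporal SDK, and `run` stands in for an activity function returned by `proxyActivities`.

```typescript
// Hypothetical helper (not from the Temporal SDK): run `run` for every input
// in parallel and return the inputs whose promise rejected, e.g. because the
// activity's retry policy was exhausted.
async function collectFailures<T>(
    inputs: T[],
    run: (input: T) => Promise<unknown>,
): Promise<T[]> {
    // Map each outcome to a boolean so that Promise.all never rejects and
    // every rejection is handled as soon as it happens.
    const succeeded = await Promise.all(
        inputs.map((input) => run(input).then(() => true, () => false)),
    )
    // Promise.all keeps input order, so the index identifies the failed input.
    return inputs.filter((_input, i) => !succeeded[i])
}
```

The returned inputs are exactly what the child needs to keep for the second pass. Where the ES2020 library is available, `Promise.allSettled` is an equivalent alternative.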

Hope this helps.

Thanks for the suggestions. Here is a simple example I got working with this method, using a dummy activity and workflow. (For this basic example everything could be done in one workflow without signals, but it demonstrates the logic for more complicated cases where child workflows are needed.)

Workflows:

```typescript
import * as activities from './activities'
import {
    condition,
    defineSignal,
    executeChild,
    getExternalWorkflowHandle,
    proxyActivities,
    setHandler,
    workflowInfo,
} from '@temporalio/workflow'
import _ from 'lodash'

interface BatchResult {
    failed: number
    workflowId: string
}

const firstAttemptSignal = defineSignal<[BatchResult]>('firstAttempt')
const retryFailedSignal = defineSignal('retryFailedEntities')

// First pass: at most 5 attempts per entity, so a persistently failing
// entity does not hold up the rest of the batch.
const { testActivity: testActivityLimited } = proxyActivities<typeof activities>({
    scheduleToCloseTimeout: '5m',
    retry: {
        initialInterval: '1s',
        backoffCoefficient: 2,
        maximumAttempts: 5,
        maximumInterval: '5s',
        nonRetryableErrorTypes: [],
    },
})

// Second pass: no maximumAttempts and no scheduleToCloseTimeout. A
// scheduleToCloseTimeout caps the total time across all retries, so it
// would stop "retry forever"; startToCloseTimeout only limits each attempt.
const { testActivity: testActivityForever } = proxyActivities<typeof activities>({
    startToCloseTimeout: '5m',
})

export async function processWorkflow(entities: number[]): Promise<string> {
    const batches = _.chunk(entities, 5)
    const failedBatches: string[] = []
    let reported = 0

    // Each child reports exactly once, after its first pass.
    setHandler(firstAttemptSignal, ({ failed, workflowId }: BatchResult) => {
        reported += 1
        if (failed > 0) {
            failedBatches.push(workflowId)
        }
    })

    // Start every child; each promise resolves when that child completes.
    // Prefixing with the parent's ID avoids collisions between parent runs.
    const pendingJobs = batches.map((batch, i) => {
        const childId = `${workflowInfo().workflowId}-batch-${i}`
        return executeChild(handleBatch, { args: [batch, childId], workflowId: childId })
    })

    // Everything has been tried once: tell the children with failures to retry.
    await condition(() => reported === batches.length)
    await Promise.all(failedBatches.map((id) => getExternalWorkflowHandle(id).signal(retryFailedSignal)))

    await Promise.all(pendingJobs)
    return `Success!`
}

export async function handleBatch(entities: number[], workflowId: string): Promise<string> {
    let retryFailedJobs = false
    setHandler(retryFailedSignal, () => {
        retryFailedJobs = true
    })

    // Run all activities in parallel. Turning each rejection (retries
    // exhausted) into `false` means no rejected promise is left unobserved.
    const succeeded = await Promise.all(
        entities.map((entity) => testActivityLimited(entity).then(() => true, () => false)),
    )
    const failedEntities = entities.filter((_entity, i) => !succeeded[i])

    // Report to the parent that started this child instead of a hardcoded ID.
    const parentId = workflowInfo().parent?.workflowId
    if (parentId === undefined) {
        throw new Error('handleBatch must be started as a child workflow')
    }
    await getExternalWorkflowHandle(parentId).signal(firstAttemptSignal, { failed: failedEntities.length, workflowId })

    // The parent only signals batches with failures, so a fully successful
    // batch must finish here instead of waiting forever.
    if (failedEntities.length === 0) {
        return `Success!`
    }

    await condition(() => retryFailedJobs)
    await Promise.all(failedEntities.map((entity) => testActivityForever(entity)))

    return `Success!`
}
```
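To see what the first-pass retry policy actually does, the waits between attempts can be worked out by hand. The sketch below assumes Temporal's documented rule that the wait before retry k is `initialInterval * backoffCoefficient^(k - 1)`, capped at `maximumInterval`, with no jitter; `RetrySketch` and `retryDelays` are hypothetical names used only here.

```typescript
// Hypothetical model of an activity retry policy, with intervals in seconds.
interface RetrySketch {
    initialIntervalSec: number
    backoffCoefficient: number
    maximumIntervalSec: number
    // Counts the first attempt and must be >= 1 here. In Temporal, 0 or
    // unset means unlimited attempts, which this sketch does not model.
    maximumAttempts: number
}

// Waits between consecutive attempts (maximumAttempts - 1 of them).
function retryDelays(policy: RetrySketch): number[] {
    const delays: number[] = []
    for (let k = 1; k < policy.maximumAttempts; k += 1) {
        const raw = policy.initialIntervalSec * Math.pow(policy.backoffCoefficient, k - 1)
        delays.push(Math.min(raw, policy.maximumIntervalSec))
    }
    return delays
}

// The first-pass policy above: initialInterval 1s, coefficient 2, max 5s, 5 attempts.
const firstPassDelays = retryDelays({
    initialIntervalSec: 1,
    backoffCoefficient: 2,
    maximumIntervalSec: 5,
    maximumAttempts: 5,
})
console.log(firstPassDelays) // → [ 1, 2, 4, 5 ]
```

So an entity that always fails spends about 12 seconds waiting between its 5 attempts before the first pass gives up on it. Without a `maximumAttempts` limit the same policy would keep retrying every 5 seconds, which is why a "retry forever" pass should not also set a short `scheduleToCloseTimeout`: that timeout bounds the total time across all attempts.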

Activity:

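```typescript
// Dummy activity: fails for every even input so the retry paths are exercised.
export async function testActivity(input: number): Promise<string> {
    if (input % 2 === 0) {
        throw new Error(`Test error: ${input}`)
    }

    return `Success!`
}
```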
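As noted above, this basic case also fits in a single workflow without signals. Below is a minimal sketch under that assumption: `processInTwoPasses` is a hypothetical helper that runs one batch at a time (so at most `batchSize` first-pass calls are in flight, matching "batches of at most 5 at a time", whereas the example above starts all child workflows at once) and then retries only the failures. `firstPass` and `retryPass` are plain functions here so the logic can be exercised outside Temporal.

```typescript
// Hypothetical helper: first pass in sequential batches, then retry failures.
//
// In a Temporal workflow, firstPass and retryPass would be the same activity
// taken from two proxyActivities calls with different options, for example
// (not executed here):
//   const limited = proxyActivities<typeof activities>({
//       startToCloseTimeout: '1m', retry: { maximumAttempts: 5 } })
//   const unlimited = proxyActivities<typeof activities>({ startToCloseTimeout: '1m' })
//   await processInTwoPasses(entities, 5, limited.testActivity, unlimited.testActivity)
async function processInTwoPasses<T>(
    entities: T[],
    batchSize: number,
    firstPass: (entity: T) => Promise<unknown>,
    retryPass: (entity: T) => Promise<unknown>,
): Promise<{ retried: T[] }> {
    const failed: T[] = []

    // One batch at a time: the next batch starts only after this one settles.
    for (let start = 0; start < entities.length; start += batchSize) {
        const batch = entities.slice(start, start + batchSize)
        // Convert each rejection to `false` so one failure does not hide the others.
        const ok = await Promise.all(batch.map((e) => firstPass(e).then(() => true, () => false)))
        batch.forEach((e, i) => {
            if (!ok[i]) failed.push(e)
        })
    }

    // Second pass: only the first-pass failures, with the permissive runner.
    await Promise.all(failed.map((e) => retryPass(e)))
    return { retried: failed }
}
```

When no retry policy is given, Temporal applies its default activity retry policy, which has no attempt limit, so the `unlimited` proxy in the comment retries until the activity succeeds. For a few thousand entities, the second pass could be batched the same way as the first.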