(Cross-posted to Slack.)
Hey all, I’m trying to build a file processing pipeline based on the worker-specific-task-queues sample from temporalio/samples-typescript on GitHub. The pipeline is CPU-intensive, so my goal is that once a particular node starts processing a file, it shouldn’t take on any other files until it’s done with the current ones.

With the sample as-is, I’m seeing a practically unlimited number of file processing workflows in flight on a single node, and the individual activities seem to sit in the “Scheduled” state for a long time before they run.

In other words, maxConcurrentActivityTaskExecutions caps the number of concurrently running activities per host, not the number of file processing pipelines per host. I’m fairly sure I’m missing something, but I don’t see how the sample addresses this.
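To make that distinction concrete, here is a toy model in plain TypeScript. It uses no Temporal APIs; the `Semaphore` class, the `simulate` function, and the step counts are all hypothetical. It compares taking a permit around each individual step (loosely analogous to an activity-slot limit) with holding one permit for an entire pipeline, and reports the largest number of pipelines that had started on the host at the same moment.

```typescript
// Toy model, not Temporal code: a "pipeline" is a sequence of steps, and a
// host has `limit` permits. We compare taking a permit per step with holding
// one permit for a whole pipeline.

class Semaphore {
  private permits: number
  private readonly waiters: Array<() => void> = []

  constructor(permits: number) {
    this.permits = permits
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--
      return
    }
    // No permit free: wait until release() hands one over
    await new Promise<void>((resolve) => {
      this.waiters.push(resolve)
    })
  }

  release(): void {
    const next = this.waiters.shift()
    if (next) {
      next() // hand the permit straight to the oldest waiter (FIFO)
    } else {
      this.permits++
    }
  }
}

// Stand-in for a CPU-heavy activity
const tick = (): Promise<void> => new Promise<void>((resolve) => setTimeout(() => resolve(), 1))

// Returns the largest number of pipelines that had started (run their first
// step) but not yet finished, at any one moment.
async function simulate(
  mode: 'per-step' | 'per-pipeline',
  pipelines: number,
  stepsPerPipeline: number,
  limit: number,
): Promise<number> {
  const sem = new Semaphore(limit)
  let inFlight = 0
  let maxInFlight = 0

  async function runPipeline(): Promise<void> {
    if (mode === 'per-pipeline') await sem.acquire()
    for (let step = 0; step < stepsPerPipeline; step++) {
      if (mode === 'per-step') await sem.acquire()
      if (step === 0) {
        // the pipeline has now "landed" on this host (e.g. made its work dir)
        inFlight++
        maxInFlight = Math.max(maxInFlight, inFlight)
      }
      await tick()
      if (mode === 'per-step') sem.release()
    }
    inFlight--
    if (mode === 'per-pipeline') sem.release()
  }

  await Promise.all(Array.from({ length: pipelines }, () => runPipeline()))
  return maxInFlight
}
```

With five pipelines of three steps and two permits, `simulate('per-step', 5, 3, 2)` resolves to 5, because every pipeline's first step is granted before any pipeline's second step, while `simulate('per-pipeline', 5, 3, 2)` resolves to 2. The strict FIFO hand-off in `release` is a simplification of how a real task queue dispatches work.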
I’ve included some of my configuration below in case it’s useful.
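Worker configuration:

```typescript
import * as os from 'node:os'
import * as path from 'node:path'
// createWorker, ctx, connection, and activities are defined elsewhere (not shown)

// One worker on the shared 'default' queue runs workflows and general activities;
// a second worker polls a queue named after this host and runs only the
// host-specific (CPU-heavy) activities.
workers = await Promise.all([
  createWorker(ctx, {
    connection,
    taskQueue: 'default',
    workflowsPath: path.join(import.meta.dirname, '../workflows/index.ts'),
    activities: activities.defaultActivities,
    maxConcurrentWorkflowTaskExecutions: 40,
    maxConcurrentActivityTaskExecutions: 100,
  }),
  createWorker(ctx, {
    connection,
    taskQueue: os.hostname(),
    activities: activities.hostSpecificActivities,
    // caps concurrent activity executions on this host, not concurrent pipelines
    maxConcurrentActivityTaskExecutions: 8,
  }),
])
```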
Workflow:

```typescript
import { log, proxyActivities } from '@temporalio/workflow'
// assumed path; point this at wherever the activity objects are exported
import type { defaultActivities, hostSpecificActivities } from '../activities'

const { getHostname } = proxyActivities<typeof defaultActivities>({
  startToCloseTimeout: '1 minute',
  taskQueue: 'default',
  retry: { maximumAttempts: 3 },
})

const maxAttempts = 5

export async function analyze(workflow: {}): Promise<{}> {
  // perform manual retries so that we have a chance to find a new hostname on each failure
  for (let attempt = 1; ; ++attempt) {
    const hostname = await getHostname()
    const activities = proxyActivities<typeof hostSpecificActivities>({
      taskQueue: hostname,
      // use scheduleToCloseTimeout because this task queue is tied to this host;
      // if it goes away, there won't be a way for these activities to progress.
      scheduleToCloseTimeout: '5 minutes',
      retry: { maximumAttempts: 3 },
    })
    // assumes hostMakeWorkDir resolves to a string path
    let workDir: string | undefined
    try {
      // inside the try, so a failure here also triggers a retry on a (possibly) new host
      workDir = await activities.hostMakeWorkDir()
      // actual pipeline here (downloads asset, analyzes asset)
      return {}
    } catch (e) {
      if (attempt >= maxAttempts) {
        throw e
      }
      log.error(`Attempt ${attempt} failed with error, retrying...`)
    } finally {
      // only clean up if the work dir was actually created
      if (workDir !== undefined) {
        await activities.hostRemoveWorkDir(workDir)
      }
    }
  }
}
```
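The comment about preferring `scheduleToCloseTimeout` depends on which timers are running while an activity task waits in a queue that no worker polls any more. The sketch below is a simplified model of that rule, assuming a single attempt with no retries or heartbeats; `ActivityTimeouts` and `expiredTimeout` are hypothetical names, not part of the Temporal SDK.

```typescript
// Toy model of when Temporal's activity timeouts fire (not SDK code).
// Times are in milliseconds. `startedAt` stays undefined while the task
// waits in its task queue for a worker to pick it up.
interface ActivityTimeouts {
  scheduleToCloseMs?: number // scheduled -> completed, including time spent queued
  scheduleToStartMs?: number // scheduled -> picked up by a worker
  startToCloseMs?: number // picked up -> completed
}

type TimeoutKind = 'ScheduleToClose' | 'ScheduleToStart' | 'StartToClose'

// Returns a configured timeout that has expired by `now` (ScheduleToClose is
// checked first), or undefined if the task may keep waiting or running.
function expiredTimeout(
  t: ActivityTimeouts,
  scheduledAt: number,
  startedAt: number | undefined,
  now: number,
): TimeoutKind | undefined {
  if (t.scheduleToCloseMs !== undefined && now - scheduledAt >= t.scheduleToCloseMs) {
    return 'ScheduleToClose'
  }
  if (startedAt === undefined) {
    // Nobody has picked the task up, so the StartToClose clock never started.
    if (t.scheduleToStartMs !== undefined && now - scheduledAt >= t.scheduleToStartMs) {
      return 'ScheduleToStart'
    }
    return undefined
  }
  if (t.startToCloseMs !== undefined && now - startedAt >= t.startToCloseMs) {
    return 'StartToClose'
  }
  return undefined
}
```

For a task on the queue of a host that has gone away, `expiredTimeout({ startToCloseMs: 60_000 }, 0, undefined, 3_600_000)` is still `undefined` an hour later, whereas with `scheduleToCloseMs: 300_000` it reports `'ScheduleToClose'` at the five-minute mark. That is why a `startToCloseTimeout` alone would not bound the wait on a dead host's queue.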
Let me know if there is other info that would be helpful.