File processing workflow with heavy CPU usage (TypeScript file processing sample)

(cross-posted to Slack)

Hey all, I'm trying to build a file processing pipeline based on the worker-specific-task-queues sample (temporalio/samples-typescript on GitHub). The pipeline is CPU intensive, so my goal is that once a particular node starts processing a file, it shouldn't take on any other files until it's done with the current ones.

With the example as-is, I'm seeing an essentially unlimited number of file processing workflows in flight on a single node, and the individual activities seem to wait a long time as "scheduled" before getting run.

That is, what's limited per host is the total number of activities (by maxConcurrentActivityTaskExecutions), not the total number of file processing pipelines. I'm sure there's something I'm missing here, but I don't see how the sample addresses this.

I’ll put some of my configuration in thread in case it is useful.

worker configuration

    workers = await Promise.all([
      // shared queue: runs the workflows and the routing activities (e.g. getHostname)
      createWorker(ctx, {
        connection,
        taskQueue: 'default',
        workflowsPath: path.join(import.meta.dirname, '../workflows/index.ts'),
        activities: activities.defaultActivities,
        maxConcurrentWorkflowTaskExecutions: 40,
        maxConcurrentActivityTaskExecutions: 100,
      }),
      // host-specific queue: runs the CPU-heavy, file-bound activities
      createWorker(ctx, {
        connection,
        taskQueue: os.hostname(),
        activities: activities.hostSpecificActivities,
        maxConcurrentActivityTaskExecutions: 8,
      }),
    ])

workflow

import { proxyActivities, log } from '@temporalio/workflow'
// the activities module path is illustrative
import type { defaultActivities, hostSpecificActivities } from '../activities'

const { getHostname } = proxyActivities<typeof defaultActivities>({
  startToCloseTimeout: '1 minute',
  taskQueue: 'default',
  retry: { maximumAttempts: 3 },
})

const maxAttempts = 5

export async function analyze(workflow: {}): Promise<{}> {
  // perform manual retries so that we have a chance to find a new hostname on each failure
  for (let attempt = 1; ; ++attempt) {
    const hostname = await getHostname()

    const activities = proxyActivities<typeof hostSpecificActivities>({
      taskQueue: hostname,
      // use scheduleToCloseTimeout because this task queue is tied to this host
      // if it goes away, there won't be a way for these activities to progress.
      scheduleToCloseTimeout: '5 minute',
      retry: { maximumAttempts: 3 },
    })

    const workDir = await activities.hostMakeWorkDir()
    try {
      // actual pipeline here (downloads asset, analyzes asset)
      return {}
    } catch (e) {
      if (attempt >= maxAttempts) {
        throw e
      }
      log.error(`Attempt ${attempt} failed with error, retrying...`)
    } finally {
      await activities.hostRemoveWorkDir(workDir)
    }
  }
}

Let me know if there is other info that would be helpful.

This seems unfortunately complicated for what I'm hoping to achieve, and I'm not even 100% sure I understand it well enough to implement it.

Each process creates a separate worker with fixed parallelism (for example 1). Let’s call its task queue “process-lock”.
You mean a worker with roughly these options?

taskQueue: 'process-lock',
activities: { lockProcess },
maxConcurrentActivityTaskExecutions: 1,

Each process listens on a host specific task queue. For example, “host1”.

So a second worker?

taskQueue: os.hostname(), // assumes this is unique
activities: activities.hostSpecificActivities,

To start the sequence of activities invoke “lock-process” activity at “process-lock” task queue.
This activity starts executing at one of the free processes. While this activity is executing the process is considered locked by the caller.
The lock activity is expected to run until it is canceled or an “unlock” activity is executed at the host specific task queue.
When started the “lock” activity sends a “lock granted” signal to the workflow that invoked it. The signal argument contains the name of the host specific task queue to use.
The workflow upon receiving the signal starts dispatching activities to the host specific task queue as long as needed.

import * as wf from '@temporalio/workflow';

export const lockGrantedSignal = wf.defineSignal<[{ hostname: string }]>('lockGranted');

async function fileProcessingWorkflow() {
  let hostname: string | undefined
  wf.setHandler(lockGrantedSignal, ({ hostname: hn }) => {
    hostname = hn
  });

  // lockProcess is a proxyActivities handle bound to the 'process-lock' task
  // queue (not shown). It will trigger lockGrantedSignal with the hostname it
  // ran on, but we're not awaiting it since it needs to keep running to hold
  // this worker's slot on the process-lock queue.
  const lockHandle = lockProcess()

  // wait for the lock to be granted
  await wf.condition(() => hostname !== undefined);

The workflow upon receiving the signal starts dispatching activities to the host specific task queue as long as needed.

  // now we can run the hostname-specific activities
  const activities = proxyActivities<typeof hostSpecificActivities>({
    taskQueue: hostname,
    // use scheduleToCloseTimeout because this task queue is tied to this host;
    // if it goes away, there won't be a way for these activities to progress.
    scheduleToCloseTimeout: '5 minute',
    retry: { maximumAttempts: 3 },
  })

  const workDir = await activities.hostMakeWorkDir()
  try {
    // actual pipeline here (downloads asset, analyzes asset)
    return {}
  } catch (e) {
    // attempt / maxAttempts come from the same manual retry loop as in the
    // analyze() example above (elided here)
    if (attempt >= maxAttempts) {
      throw e
    }
    log.error(`Attempt ${attempt} failed with error, retrying...`)
  } finally {
    await activities.hostRemoveWorkDir(workDir)
  }

When the whole sequence of activities is completed the “unlock” activity is dispatched. This activity notifies the “lock-process” activity to complete.
Once “lock-process” activity is completed the host is considered free and can pick up the next “lock-process” activity from another workflow.

Sadly at this point, my newness to Temporal is hurting me – I’m not clear how to stop the long-running lockProcess (or what the internal implementation of lockProcess should be).

Am I even remotely close with this? Can you give a pointer to the logic that exists in one of the other languages so I can try to translate it?

The lockProcess activity should heartbeat, waiting on some condition that is unblocked from another activity. I don't know TypeScript; a quick search found Promise, which apparently can be used for this. The promises can be kept in a map keyed by the workflow id. Here is how to share state between activities: Core application - TypeScript SDK | Temporal Platform Documentation
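
To sketch what that might look like in TypeScript (the names lockProcess/unlockProcess and the Map-based bookkeeping are just illustrative, not from any sample; it relies on the 'process-lock' worker and the host-specific worker running in the same Node.js process so the activities can share module state):

    import { Context } from '@temporalio/activity';

    // one pending release per workflow currently holding a lock on this process
    const releases = new Map<string, () => void>();

    // Runs on the 'process-lock' task queue. While it is running, this process
    // is considered locked by the calling workflow.
    export async function lockProcess(): Promise<void> {
      const ctx = Context.current();
      const { workflowId } = ctx.info.workflowExecution;

      // (In the full design this is also where lockProcess would signal the
      // calling workflow with os.hostname(), i.e. the "lock granted" signal.)

      // resolves when unlockProcess is invoked for the same workflow
      const released = new Promise<void>((resolve) => releases.set(workflowId, resolve));

      // heartbeat so the server can deliver cancellation if the calling
      // workflow closes or is terminated (set heartbeatTimeout in the activity
      // options on the workflow side so cancellation is delivered promptly)
      const heartbeats = setInterval(() => ctx.heartbeat(), 10_000);
      try {
        // ctx.cancelled rejects with CancelledFailure when cancellation is
        // requested, which ends the lock via the finally block below
        await Promise.race([released, ctx.cancelled]);
      } finally {
        clearInterval(heartbeats);
        releases.delete(workflowId);
      }
    }

    // Runs on the host-specific task queue; completes the matching lockProcess.
    export async function unlockProcess(workflowId: string): Promise<void> {
      releases.get(workflowId)?.();
    }

On the workflow side, a finally block would dispatch unlockProcess(workflowInfo().workflowId) on the host-specific task queue so the lock completes even when the pipeline fails; if the workflow is terminated instead, cancellation of lockProcess releases the slot.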

I'd like to see if I understand the functionality offered by the session approach. If I understand correctly, your requirements are as follows:

  • You want to be able to run multiple CPU intensive activities on the same worker so that they can share local state.
  • While a workflow is using one of these heavy CPU activity workers, you want that workflow to have exclusive access to that particular worker; a single heavy CPU activity worker should only be used by one workflow at a time.

Do I have that right?

(If I don’t understand something about your requirements, you can ignore the rest of this comment :wink:)

For the first one, you can create a separate task queue for each worker, and that way you can run activities on a particular worker.

For the second one, you need some way of coordinating between workflows so that only one workflow is running activities on a particular task queue at a time.

If you were unconcerned about workflow termination, and didn’t have many reserve/release requests per second, you could have a coordinating workflow which your workflow would signal when it wanted to reserve a heavy worker, and the coordinating workflow could signal back when a heavy worker was available.

However, if your workflow was terminated, it wouldn't signal the coordinating workflow that it was done, and so the reservation of the heavy worker would never be released. (There are ways around this, for example you could have your workflow set up a child workflow to signal the coordinating workflow when the workflow closed, but it's a bit awkward.) And if you had a high rate of reserve/release requests, you might run into the problem that a single workflow can only process a limited number of events per second.
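
For illustration, a minimal sketch of such a coordinating workflow, acting as a counting semaphore over signals (the signal names and capacity parameter are made up here, and it has exactly the caveats above: a terminated client workflow would leak its slot, and a high signal rate is bounded by the coordinator's event throughput):

    import * as wf from '@temporalio/workflow';

    // illustrative signal names; not part of any sample
    export const acquireSignal = wf.defineSignal<[{ workflowId: string }]>('acquire');
    export const releaseSignal = wf.defineSignal('release');
    export const grantedSignal = wf.defineSignal('granted');

    export async function coordinatorWorkflow(capacity: number): Promise<void> {
      let available = capacity;
      const waiting: string[] = [];

      wf.setHandler(acquireSignal, ({ workflowId }) => {
        waiting.push(workflowId);
      });
      wf.setHandler(releaseSignal, () => {
        available++;
      });

      while (true) {
        await wf.condition(() => waiting.length > 0 && available > 0);
        const workflowId = waiting.shift()!;
        available--;
        // tell the requesting workflow it may start using a heavy worker
        await wf.getExternalWorkflowHandle(workflowId).signal(grantedSignal);
        // a real implementation would continue-as-new periodically to keep
        // the event history bounded
      }
    }

The client workflow would signal acquire with its own workflowInfo().workflowId, wait for granted, and signal release in a finally block when done (which is exactly the step a terminated workflow would skip).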

In the session approach, the reservation is represented by a long running activity. This means that the activity is automatically canceled if the calling workflow closes (whether normally or by being terminated). And because reservation requests are posted to the “process-lock” task queue, waiting requests are simply waiting in the task queue.

You want to be able to run multiple CPU intensive activities on the same worker so that they can share local state.
While a workflow is using one of these heavy CPU activity workers, you want that workflow to have exclusive access to that particular worker; a single heavy CPU activity worker should only be used by one workflow at a time.

I'd say that's close to my requirement, but not quite (in particular, the workflow doesn't need exclusive access, it just needs a consistent worker). I'll restate my requirements in case the differences matter:

I have a workflow that needs to download a file and then analyze it

  • Because of the download step at the beginning, once the workflow starts, I need all activities in the workflow to continue on the same machine
  • Because the analysis itself is CPU intensive, I need to limit the number of these workflows running at any point in time on the machine (some small number like 4-8)

The real problem here is that I'm thinking of it as scheduling the workflow to a worker, while Temporal really wants you to think of it as scheduling activities to a worker (which makes sense in the general case).

I played a bit with trying to implement the session functionality as described above and reading through the Go SDK code for it. I quickly realized I was getting into the weeds on this one. Part of the problem I have is that my workers are EKS pods that come and go as needed (using KEDA for scaling) – so it's hard to correctly schedule one of these workflows to any particular pod when that pod might go away between scheduling and starting.

Anything I do really needs a way to say: “try to run this workflow from start to finish on an available worker, if that fails, try again on the next available worker”

At this point, I've changed this particular workflow to spawn an AWS Batch job for these steps and wait for it to finish. This isn't ideal (for a number of reasons), but it's functional far sooner for the stage of project I'm at.

Suppose you had a semaphore implementation available to your workflows. You’d initialize the semaphore to the number of workflows that you wanted to be able to use the worker simultaneously (4-8). When a workflow started to use the worker it would decrement the semaphore; when the workflow stopped using the worker it would increment the semaphore. When the semaphore value reached 0 that would mean that that worker wasn’t available for another workflow to start using.
Would that do the trick?

Regardless of how you might implement the workflows in Temporal, it sounds like you might need some way of delaying the shutdown of a worker pod until it had finished its current work? When you wanted to shut down a worker, you'd mark it as unavailable for new workflows so that it wouldn't take on new work; then, when the current workflows finished using it, it could shut down. (I don't know KEDA, so I don't know if maybe this is something KEDA could do, or if maybe you'd need to implement your own scaling system?)

Suppose you had a semaphore implementation available to your workflows. You’d initialize the semaphore to the number of workflows that you wanted to be able to use the worker simultaneously (4-8). When a workflow started to use the worker it would decrement the semaphore; when the workflow stopped using the worker it would increment the semaphore. When the semaphore value reached 0 that would mean that that worker wasn’t available for another workflow to start using.
Would that do the trick?

That sounds like what I'm looking for, combined with the ability to make sure the activities within the workflow stay on the worker (but that part seems more straightforward via having a task queue with the worker's hostname).

Regardless of how you might implement the workflows in Temporal, it sounds like you might need some way of delaying the shutdown of a worker pod until it had finished its current work?

Yeah, graceful shutdown is something I have on my to-do list for the near future. I've done it before with other services, so I'm not particularly worried about it (the overview is that you intercept SIGTERM from EKS and stop accepting new workflows at that point, then you wait for existing workflows/activities to complete and then process.exit – if you take too long, EKS will kill you, but that's configurable and expected).
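
For what it's worth, a minimal sketch of that shape with the TypeScript SDK worker (the connection address, activity module, and timeout values are illustrative; the default Runtime shutdownSignals and shutdownGraceTime behavior are worth double-checking against the SDK docs):

    import { Worker, NativeConnection } from '@temporalio/worker'
    import * as hostSpecificActivities from './activities/host' // illustrative path
    import os from 'node:os'

    const connection = await NativeConnection.connect({ address: 'temporal:7233' })
    const worker = await Worker.create({
      connection,
      taskQueue: os.hostname(),
      activities: hostSpecificActivities,
      maxConcurrentActivityTaskExecutions: 8,
      // how long in-flight activities may keep running after shutdown is
      // requested before they are cancelled
      shutdownGraceTime: '10 minutes',
    })

    // The SDK requests worker shutdown on SIGTERM by default (see the Runtime
    // shutdownSignals option); run() resolves once polling has stopped and
    // in-flight tasks have drained. Anything still queued stays on the task
    // queue for other workers/pods to pick up.
    await worker.run()
    process.exit(0)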

What is your maximum expected volume of semaphore acquire or release events (per second, or per minute, etc)?

Early startup phase, so take this with a heaping grain of salt, but I expect a low peak – 1 rps would surprise me anytime in the near future.

So that could be easily handled by a single coordinating workflow, if we had a convenient way of detecting client workflow termination. I’m just speculating here, and I’ll have to think about it, but I’m wondering if a bridge that would allow workflows to handle activities might be a useful building block for easily putting together such a system.

Oh, another random thought, if you were able to store the downloads / intermediate state in EFS or S3, that would reduce your architectural complexity because then you wouldn’t need to route activities to the same worker.

I'd need to think this through as well, but perhaps the session approach could be used for counting semaphores (i.e. where you want to allow some number, 4-8, of simultaneous accesses) by increasing the parallelism of the "process-lock" worker in the session implementation.

Each process creates a separate worker with fixed parallelism (for example 1). Let’s call its task queue “process-lock”.

I don’t understand why the proposed solution doesn’t solve this specific problem.

That was actually my previous question:

In his initial implementation @mdouglass had

taskQueue: 'process-lock',
activities: { lockProcess },
maxConcurrentActivityTaskExecutions: 1,

Given that he wants each worker to run 4-8 simultaneous heavy activities (call it 6), we’d set the process-lock maxConcurrentActivityTaskExecutions to 6?

This works because the process-lock task queue is shared (polled) by all of the heavy workers. Each heavy worker chooses how many simultaneous locks it will accept for itself simply by setting maxConcurrentActivityTaskExecutions. A workflow requesting a lock will automatically be granted one by a heavy worker that has locks available (if there are any), because only heavy workers with available locks will currently be polling activity tasks from the process-lock task queue.
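
Putting that together with the earlier snippets, the worker setup for this counting-semaphore variant of the session approach might look roughly like this (reusing the createWorker helper and activity groupings from earlier in the thread; lockProcess/unlockProcess are the illustrative lock activities sketched above, and 6 is just an example value):

    workers = await Promise.all([
      // shared lock queue, polled by every heavy worker; the concurrency here
      // is the number of pipelines this host will run at once
      createWorker(ctx, {
        connection,
        taskQueue: 'process-lock',
        activities: { lockProcess },
        maxConcurrentActivityTaskExecutions: 6,
      }),
      // host-specific queue for the pipeline activities plus unlockProcess
      createWorker(ctx, {
        connection,
        taskQueue: os.hostname(),
        activities: { ...activities.hostSpecificActivities, unlockProcess },
        maxConcurrentActivityTaskExecutions: 6,
      }),
    ])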

This is referring to step 5 of the session algorithm?

  The lock activity is expected to run until it is canceled or an "unlock" activity is executed at the host specific task queue.

So the lock activity needs to heartbeat to be able to detect that it has been canceled, and to respond to the “unlock” activity being run on the host-specific task queue (which as you say can be accomplished by using a Promise which resolves when the “unlock” activity is processed).

I think I’m up to speed now :slight_smile:


I don’t understand why the proposed solution doesn’t solve this specific problem.

TBH it may; I'm just finding that I don't understand Temporal well enough yet to be able to implement it from the description (downside of only being a week into the tool :slight_smile:)

Yeah, that was definitely one option I had considered. The files are generally large, so I was trying to avoid the overhead of moving them onto a machine multiple times (especially when doing so is transparent to the code, so it's not obvious that we're paying a bandwidth tax for it).

If you have specific questions we can help.

I took a stab at a demo session implementation in TypeScript: GitHub - awwx/temporal-typescript-session-demo

Of course, please let me know if I missed anything!