Understanding Workers with long-running Activities, and avoiding WorkflowTaskTimedOut on startToClose

Hello,

We’re running into WorkflowTaskTimedOut errors where workflow tasks exceed the default 10-second startToClose timeout. From what I’ve read, this can happen when your workers are unavailable, busy, or having some other problem. I’m wondering if our current design is the cause and we’re simply unaware of the appropriate way to redesign the worker/workflow. A little background on what’s going on:

  1. We queue up a number of jobs (let’s say 100 for an easy example) into a single task queue called Process. These show up as running workflows waiting for a worker to pick them up. Each job is a single workflow (called ProcessCall) with 5 activities that run in sequence via await. The first activity is quite fast (1-3 seconds), but the second activity can take as little as 5 seconds and as much as 30 minutes. The remaining activities are similar, ranging from fast (< 10 seconds) to slow (1-30 minutes). Currently, the worker is very basic:
    worker = Worker(
        client,
        task_queue="Process",
        workflows=[ProcessCall],
        activities=[
            activities.ingest_recording,
            activities.transcribe_recording,
            activities.create_embeddings,
            activities.add_sentiment_analysis,
            activities.reprocess_all_tags,
        ],
    )
  2. When we start a single worker, we notice that it picks up a workflow and processes activity #1. Then, instead of moving on to activity #2 within the same workflow, it usually picks up a new workflow and starts on its activity #1 as well. This can happen a handful of times before it moves on to activity #2 in the other workflows. This behavior does make sense, given the way I understand the task queue to hand out tasks.

  3. After about 10 seconds, workflows start hitting WorkflowTaskTimedOut (startToClose) warnings, presumably because the single worker is busy running a long activity (e.g. 15 minutes). Once a workflow task times out, the workflow goes back to waiting for a worker to pick it up, and waits happily until one asks to start it again.

I apologize if any of these sound foolish; I’m brand new to Temporal and appreciate that I may not understand the intended approach. But now, my questions:

  • Shouldn’t it be true that if the one worker is busy, it doesn’t attempt to process workflow tasks? Or is this happening the way I described in #2, where fast workflow tasks can be handed out from the task queue without any commitment that the worker will then do the thing the workflow is waiting for?
  • Should we be doing something differently with how workers run long activities, so that running one doesn’t leave the worker otherwise unavailable?
  • We did try running more workers (5) but still ran into the timeout, which would make sense given that each of those workers could also get “stuck” processing the same kind of long activity and all block at once. It’s also not as simple as just starting a ton of workers, because this long-running step can only be run 3 times concurrently at the moment before we incur additional speed penalties (see the sketch after this list for roughly how I picture expressing that cap).
  • After attempting to run a high number of workers and seemingly still hitting the same issue, I started to wonder whether there always needs to be one more worker than there are jobs in the queue, but that doesn’t feel scalable either.
  • It would be okay with us if every worker picked up a workflow and saw it through each activity step from start to end before picking up another workflow, effectively limiting the number of concurrent workflows it is “responsible for” to 1. This feels like it violates a core principle of how Temporal wants to work, but I wondered whether it’s possible / sane either way.
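
To make that concurrency cap concrete, here is a minimal sketch of how I picture expressing it, assuming the slow step got its own task queue (the name “Process-Transcribe” is made up) with a dedicated worker limited via the SDK’s max_concurrent_activities option:

    from temporalio.worker import Worker

    # Dedicated worker for just the slow activity, using the same client and
    # activities module as the snippet above.
    transcribe_worker = Worker(
        client,
        task_queue="Process-Transcribe",               # hypothetical dedicated queue for the slow step
        activities=[activities.transcribe_recording],
        max_concurrent_activities=3,                   # never run more than 3 of these at once
    )

If I understand the docs correctly, the workflow could then route that one activity to the dedicated queue via the task_queue argument when it schedules the activity.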

I did also try splitting each activity out into its own task queue, and that didn’t seem to help either: I still ran into timeouts that way, although I can’t recall whether the behavior was exactly the same or just slightly less common.

Any feedback or advice would be greatly appreciated - feels like I’m just missing some simple concept here.

I’m not a Python expert, but I believe this can happen if your activity blocks the asyncio loop by calling some synchronous operation. You can make the activity synchronous by removing async from its signature. See Foundations - Python SDK feature guide | Temporal Documentation
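
For example, a minimal sketch, assuming transcribe_recording is the activity that blocks (the sleep is just a stand-in for the real work): defining it as a plain def instead of async def keeps the long-running call off the worker’s event loop.

    import time

    from temporalio import activity

    @activity.defn
    def transcribe_recording(recording_path: str) -> str:
        # Plain synchronous function: it may block for a long time without
        # stalling the event loop that handles workflow tasks.
        time.sleep(60)  # stand-in for the real transcription work
        return f"transcript for {recording_path}"

Note that the Worker then needs an activity_executor to run synchronous activities; see the docs linked above.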

OK, so changing it to use multiprocessing has solved that issue; I’m pretty sure we were in fact blocking the event loop with our long-running activities.

Now, I’m not sure whether it’s something that makes sense to do, but I’m curious if there’s a way to have Temporal workers pick up and handle a single workflow from start to finish before taking on another, or some variation of this.

The reason I ask is that we create one workflow per “job”. In our case, a job is about 5 activities that run one after the other to process a call recording file and ultimately insert a bunch of extra data into a database. These jobs can take anywhere from 1 to 20 minutes depending on size and complexity. We’re currently trying to process the past few months’ worth of data, so we load in, say, 50k jobs, each as its own workflow. When the workers start picking up activities, they seem to semi-randomly pick up the first activity across all of the workflows, then progress through each activity across the entire workflow queue.

As a result, it takes a very long time before any single workflow completes, but many jobs are 30-60% done at the same time. I think I understand why this is happening, but I’m curious whether there are other strategies that would help avoid this outcome. The only strategy I’ve come up with so far is to deliberately put only 5000 jobs in at a time and wait for them to finish before queueing the rest, but maybe there’s a better way?
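
For reference, here’s a minimal sketch of that batching idea, assuming an already-connected Client, the ProcessCall workflow from earlier, and a made-up workflow id scheme:

    import asyncio

    from temporalio.client import Client

    async def run_in_batches(client: Client, jobs: list, batch_size: int = 5000) -> None:
        # Start a fixed-size batch of workflows, then wait for the entire batch
        # to finish before queueing the next, so workflows actually complete
        # instead of all sitting 30-60% done at once.
        for i in range(0, len(jobs), batch_size):
            handles = []
            for job in jobs[i : i + batch_size]:
                handle = await client.start_workflow(
                    "ProcessCall",                    # workflow from the earlier post
                    job,
                    id=f"process-call-{job['id']}",   # hypothetical id scheme
                    task_queue="Process",
                )
                handles.append(handle)
            await asyncio.gather(*(h.result() for h in handles))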

Would strongly recommend threaded activities instead of multiprocess. They have better cancellation mechanisms and don’t require pickling.
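
For anyone finding this later, a minimal sketch of the threaded setup (same client, workflow, and activities as the original snippet; the pool size is an arbitrary example): synchronous def activities run on the threads of a ThreadPoolExecutor passed as the worker’s activity_executor, so arguments and results never need to be pickled across processes.

    from concurrent.futures import ThreadPoolExecutor

    from temporalio.worker import Worker

    worker = Worker(
        client,
        task_queue="Process",
        workflows=[ProcessCall],
        activities=[
            activities.ingest_recording,
            activities.transcribe_recording,
            activities.create_embeddings,
            activities.add_sentiment_analysis,
            activities.reprocess_all_tags,
        ],
        # Required when any registered activity is a plain (non-async) def.
        activity_executor=ThreadPoolExecutor(max_workers=5),
    )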

Can you start a new thread on this general design question?