Question about grouping/batching workflows or activities based on a parameter determined between activities

Hi,

I’m beginning to use Temporal for my use case, and I’ve successfully implemented the basic scenarios. However, there’s one scenario where I’m unsure whether it is feasible and how to implement it with Temporal.

Scenario:
User 1 makes request A, which is then verified on external system K (first activity). That step retrieves some data, and the request is subsequently executed on external system X (second activity).
User 2 makes request B, which is then verified on external system K (first activity). That step retrieves some data, and the request is subsequently executed on external system X (second activity).
User 3 makes request C, which is then verified on external system K (first activity). That step retrieves some data, and the request is subsequently executed on external system Y (second activity).

However, for certain systems (X, Y, …) it is better to group requests and execute them all at once, because of throughput limitations.

How can I achieve this? The user makes a request and I check it on system K (first activity). Then, before the second activity, maybe I put it on a queue (chosen by the system returned from the first activity), wait a certain time (polling timeout), and after that group all requests for the same system X and execute them together.

I understand that I can’t initially group these workflows because I only determine the system for execution after the first activity (system K).

Right now the workflow is based on those two activities running sequentially. There is no grouping or buffering for the second activity based on the system.

You can run a single activity worker for X and, similarly, one for Y. Your activity implementation, instead of sending the request directly, would put it on an in-memory queue. A separate thread would then read from that queue and call the external service periodically. Once the batch request completes, all the requests in it are acked, which unblocks the waiting activity implementations.
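As a rough illustration of this "worker buffering" idea, here is a minimal sketch using only the Python standard library. Nothing in it is Temporal API: `BatchingSender`, `send_batch`, and `execute_on_system` are hypothetical names, and `send_batch` is assumed to return one result per request, in order. An activity body would call something like `execute_on_system`, which blocks until the background thread has sent the batch.

```python
import queue
import threading
from concurrent.futures import Future


class BatchingSender:
    """Buffers requests in memory and sends them to one system in batches."""

    def __init__(self, send_batch, interval_seconds=5.0):
        # Hypothetical callable: takes a list of requests and returns
        # a list with one result per request, in the same order.
        self._send_batch = send_batch
        self._interval = interval_seconds
        self._queue = queue.Queue()
        self._stop = threading.Event()

    def submit(self, request):
        """Enqueue a request; the Future resolves when its batch completes."""
        future = Future()
        self._queue.put((request, future))
        return future

    def flush(self):
        """Drain everything queued so far and send it as a single batch."""
        items = []
        while True:
            try:
                items.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if not items:
            return 0
        try:
            results = self._send_batch([request for request, _ in items])
            if len(results) != len(items):
                raise ValueError("send_batch must return one result per request")
            for (_, future), result in zip(items, results):
                future.set_result(result)
        except Exception as exc:
            # Fail every waiting caller so each activity can fail and be retried.
            for _, future in items:
                if not future.done():
                    future.set_exception(exc)
        return len(items)

    def run_forever(self):
        """Background thread body: flush once per interval until stopped."""
        while not self._stop.wait(self._interval):
            self.flush()

    def stop(self):
        self._stop.set()


def execute_on_system(sender, request):
    """What a blocking activity body could do: enqueue, then wait for the batch."""
    return sender.submit(request).result()
```

In a real worker you would start `run_forever` on a daemon thread once per process and make sure the activity's timeouts are longer than the batching interval, since the activity stays open while it waits.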


Also, within a workflow it is common to, say, collect signals into a list and when the list gets to a certain size or after a certain amount of time you process the list.

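import asyncio
from temporalio import workflow

# Inside a workflow method; a signal handler appends to self.pending_updates
try:
    await workflow.wait_condition(lambda: len(self.pending_updates) >= 5, timeout=10)
    # Reaching here means pending_updates has at least 5 items
except asyncio.TimeoutError:
    # Reaching here means 10s elapsed before pending_updates reached 5 items
    pass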

So what I have now in terms of structure:

A single worker, a single workflow, two activities (executed from the workflow sequentially)

  • Make request
  • Workflow created
  • Worker takes the workflow request and executes it.

Possible new structure:

A workflow (W1) receives the request and executes the first activity (A1).

Depending on a parameter returned by A1 (which could be X, Y, Z, …), W1 determines whether the request needs to be queued so that the second activity (A2) runs once for it and other requests together after a timeout. If queuing is not necessary, W1 simply continues and executes A2 itself.

If it is necessary, W1 would (maybe) make a request to another workflow (W2) that acts as a queue and groups requests. W2 starts a counter/timeout (let’s say 5 min) and during this time waits for other requests/signals (following the suggestion in @Chad_Retz’s answer). After the timeout, it calls an activity (A3) that groups those requests (according to some logic) and then calls A2.
After A2, W2 needs to send a signal to each waiting W1 so that it can finish.

The new structure will be:

  • W1 receives the request
  • Execute A1
  • Check parameter for value X
  • If X, start W2 if it is not already running, or send a signal to W2 if it is
    • W2 starts and waits for further requests/signals (from new W1 requests with the same parameter)
    • after the timeout, it executes A3 (to group all requests into a single one)
    • then it executes A2
    • it finishes and sends a signal to each W1 so that it can also finish
  • else (any other parameter value), execute A2 from W1
  • finishes W1
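The branching step in the list above could be sketched as a plain function that W1 calls after A1. This is only an illustration under assumptions: `BATCHED_SYSTEMS`, `Route`, `route_after_a1`, and the `batch-<system>` workflow ID scheme are all hypothetical names, not part of Temporal. The idea is that a deterministic ID per system lets every W1 address the same W2 instance.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical set of systems that prefer batched execution.
BATCHED_SYSTEMS = frozenset({"X"})


@dataclass(frozen=True)
class Route:
    batched: bool                      # True: hand off to W2; False: run A2 in W1
    batch_workflow_id: Optional[str]   # ID of the W2 instance to signal, if any


def route_after_a1(system, batched_systems=BATCHED_SYSTEMS):
    """Decide what W1 does next, given the system returned by A1."""
    if system in batched_systems:
        # One W2 per system, addressed by a deterministic (assumed) ID.
        return Route(True, f"batch-{system}")
    return Route(False, None)
```

Adding grouping for Y or Z would then only mean extending the set passed as `batched_systems`.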

With that, one worker will be the same as the one I have now (running W1, A1, A2) and another will run W2, A3, A2.

Grouping for parameter values Y, Z, … would also be possible.

Does that make sense? Is it aligned with best practices? Thanks for the help.

You can use “worker buffering,” as I described initially, or “workflow buffering,” as Chad proposed.

The workflow buffering doesn’t require more than one worker, but has limited throughput. I wouldn’t recommend this pattern if you plan to have more than one message per second per service on average.


I see, I’ll try to build both scenarios to see which fits better. In the case of “worker buffering”, will I need a separate worker set up in advance for each parameter value that needs to be queued? Or is it possible to create workers dynamically on demand (when new parameter values need batching)?

Like:

  • Requests are sent to an in-memory queue (or also a workflow queue?)
  • After getting the info from the external service, the parameter value decides which activity to call and how to batch.
  • Each worker has a unique activity to execute for each parameter value.

So the structure will be:

  • One workflow (W1) to queue/batch all requests (if a request doesn’t need batching, it calls the next activity directly)
  • Multiple workers: one for the workflow and the initial activities, and other workers, each handling the final activity for one parameter value.

Did I understand correctly? Thanks!

Yes. If the batching interval is not too long you can have a single activity type and batch them up based on the activity arguments if you want.
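A minimal sketch of batching by activity argument with a single activity type: requests are bucketed by the system they target, and each bucket is sent as one batch when the interval elapses. `KeyedBuffer` and its methods are hypothetical names, and this version is single-threaded; a lock would be needed if activities call it from multiple threads.

```python
from collections import defaultdict


class KeyedBuffer:
    """Collects requests per target system so each system gets one batch."""

    def __init__(self):
        self._pending = defaultdict(list)

    def add(self, system, request):
        """Buffer a request under the system taken from the activity arguments."""
        self._pending[system].append(request)

    def drain(self):
        """Return {system: [requests]} for everything buffered, then reset."""
        batches = dict(self._pending)
        self._pending = defaultdict(list)
        return batches
```

A periodic flush would call `drain()` and issue one call per key, so a new parameter value needs no new worker or activity type.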
