I’m beginning to use Temporal for my use case, and I’ve successfully implemented the basic scenarios. However, there’s one potential scenario whose feasibility I’m unsure about, and I don’t know how to implement it with Temporal.
Scenario:
User 1 makes request A, which is verified on external system K (first activity). This retrieves some data, and the request is subsequently executed on external system X (second activity).
User 2 makes request B, which is verified on external system K (first activity). This retrieves some data, and the request is subsequently executed on external system X (second activity).
User 3 makes request C, which is verified on external system K (first activity). This retrieves some data, and the request is subsequently executed on external system Y (second activity).
However, there are certain systems (X, Y, …) for which it is better to group requests and execute them all at once (due to throughput limitations).
How can I achieve this? The user makes a request, I check on system K (first activity), then perhaps put the request on a queue (chosen based on the system returned by the first activity) before the second activity, wait a certain time (polling timeout), and after that group all requests targeting the same system X and execute them together.
I understand that I can’t group these workflows up front, because I only determine the target system after the first activity (against system K).
Right now the workflow simply runs those two activities sequentially; there is no grouping or buffering for the second activity based on the target system.
You can run a single activity worker for X, and a similar one for Y. Then your activity implementation, instead of sending the request directly, would put it on an in-memory queue. A separate thread would read from that queue and call the external service periodically. Once the batch request completes, all the requests in it are acknowledged, which unblocks the corresponding activity implementations.
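The “worker buffering” idea above can be sketched in plain Python, with no Temporal APIs involved: the activity implementation hands its request to a shared buffer and blocks until a background thread flushes the whole batch to the external service. `BatchingSender` and `send_batch` are hypothetical names for this sketch, not Temporal features.

```python
import queue
import threading
import time
from concurrent.futures import Future

class BatchingSender:
    """Buffers individual requests and flushes them to the external
    service in batches. Hypothetical sketch: send_batch stands in for
    the real call to system X."""

    def __init__(self, send_batch, flush_interval=0.1):
        self._send_batch = send_batch
        self._queue = queue.Queue()
        self._interval = flush_interval
        threading.Thread(target=self._flush_loop, daemon=True).start()

    def submit(self, request):
        # Called from the activity implementation: enqueue the request
        # and block until the batch containing it has been sent.
        fut = Future()
        self._queue.put((request, fut))
        return fut.result()  # unblocks when the batch completes

    def _flush_loop(self):
        while True:
            time.sleep(self._interval)
            # Drain everything that arrived during the interval.
            batch = []
            while True:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            if batch:
                # One call to the external service for the whole batch;
                # error handling (failing the pending futures) is omitted.
                results = self._send_batch([r for r, _ in batch])
                for (_, fut), res in zip(batch, results):
                    fut.set_result(res)
```

Each blocked `submit` call corresponds to one activity execution, so Temporal still sees one activity per request; only the outbound call is batched.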
Also, within a workflow it is common to, say, collect signals into a list, and when the list reaches a certain size or after a certain amount of time, process the list.
try:
    await workflow.wait_condition(lambda: len(self.pending_updates) >= 5, timeout=10)
    # Reaching here means pending_updates has at least 5 items
except asyncio.TimeoutError:
    # Reaching here means 10s elapsed before pending_updates reached 5 items
    pass
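Outside of a real workflow, the collect-until-size-or-timeout logic behind that snippet can be sketched with plain asyncio; inside a Temporal workflow you would use `workflow.wait_condition` and `@workflow.signal` handlers rather than these primitives. All names here are illustrative:

```python
import asyncio

class SignalBatcher:
    """Plain-asyncio analogue of the workflow pattern: buffer items and
    flush when the buffer reaches max_size or after timeout seconds,
    whichever comes first."""

    def __init__(self, max_size: int = 5, timeout: float = 10.0):
        self.pending = []
        self.max_size = max_size
        self.timeout = timeout
        self._full = asyncio.Event()

    def add(self, item) -> None:
        # Corresponds to receiving a signal in the workflow.
        self.pending.append(item)
        if len(self.pending) >= self.max_size:
            self._full.set()

    async def collect(self) -> list:
        try:
            # Wake when max_size items have arrived, or after timeout.
            await asyncio.wait_for(self._full.wait(), self.timeout)
        except asyncio.TimeoutError:
            pass  # flush whatever arrived within the window
        batch, self.pending = self.pending, []
        self._full.clear()
        return batch
```

The returned batch would then be handed to a single activity call that executes all requests against the external system at once.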
A single worker, a single workflow, two activities (executed from the workflow sequentially)
Make request
Workflow created
Worker takes the workflow request and executes it.
Possible new structure:
A workflow (W1) that receives the request and executes the first activity (A1)
Depending on a parameter (which could be X, Y, Z, …) returned by A1, determine whether the request needs to be queued so that the second activity (A2) runs all at once with other requests after a timeout. If not, continue in the same workflow W1 and execute A2.
If it does, (maybe) make a request to another workflow (W2) that acts as a queue, grouping requests. W2 starts a counter/timeout (say 5 min) during which it waits for further requests/signals (following the suggestion from @Chad_Retz’s answer). After that timeout, it calls an activity (A3) that groups those requests (according to some logic) and then calls A2.
After A2, it needs to send a signal to each W1 request so it can finish.
The new structure will be:
W1 receives the request
Execute A1
Check parameter for value X
If X, start W2 if it is not already running, or signal W2 if it is
W2 starts and waits for further requests/signals (new W1 requests with the same parameter)
after the timeout, executes A3 (to group all requests into a single one)
then executes A2
finishes and sends a signal to each W1 to also finish
else (for another parameter value), execute A2 directly from W1
finishes W1
With that, one worker will be the same one I have now (running W1, A1, A2), and another will run W2, A3, A2.
It will also be possible to group for parameter values Y, Z, … if needed.
Does that make sense? Is it aligned with best practices? Thanks for the help.
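As a minimal sketch of the W1 routing step, assuming X and Y are the systems that need batching: deriving a deterministic W2 workflow ID per parameter value means “start W2 if not already initiated, or signal it if it is” can collapse into a single signal-with-start call keyed by that ID. The function name, the set of batched systems, and the ID scheme are all assumptions for illustration:

```python
# Assumption: these are the external systems with throughput limits.
BATCHED_SYSTEMS = {"X", "Y"}

def route_after_a1(parameter):
    """Decide what W1 does after A1, given the parameter A1 returned.

    Returns ("signal_batcher", workflow_id) when the request should be
    handed to the per-system batcher workflow (W2), or
    ("run_a2_inline", None) when W1 can execute A2 directly.
    """
    if parameter in BATCHED_SYSTEMS:
        # One W2 instance per system: a deterministic workflow ID lets a
        # signal-with-start either create W2 or signal the running one.
        return ("signal_batcher", f"batcher-{parameter}")
    return ("run_a2_inline", None)
```

New parameter values that need batching then only require adding them to the set; no new workflow code per system.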
You can use “worker buffering,” as I described initially, or “workflow buffering,” as Chad proposed.
The workflow buffering doesn’t require more than one worker, but has limited throughput. I wouldn’t recommend this pattern if you plan to have more than one message per second per service on average.
I see, I’ll try to build both scenarios to see which fits better. In the case of “worker buffering”, will I need a different worker in advance for each parameter value that needs to be queued? Or is it possible to create workers dynamically on demand (for new parameter values that need batching)?
Like:
Requests are sent to an in-memory queue (also a workflow queue?)
After getting the info from the external service and the parameter value, decide which activity to call and how to batch.
Each worker has a unique activity per parameter value to execute.
So the structure will be:
One workflow (W1) to queue/batch all requests (if a request doesn’t need batching, it calls the next activity directly)
Multiple workers: one for the workflow and initial activities, and other workers each handling the final activity for one parameter value.