Buffer / Group Workflow

For my application I need to build out a workflow that resolves the following scenario:

A message gets sent to my API from user A. After receiving the message, I start a timeout of N seconds to wait for any new messages from user A. If user A sends another request within those N seconds, the timeout resets and the new request is added to the message buffer for that user. When the timeout finally expires, either because only 1 message was sent and the initial timeout expired, or because X messages were sent and the timeout after the Xth message expired, the entire block is passed to an activity that processes those messages.

Once the activity has started processing a block, any messages user A sends before the activity finishes are held until the block currently being processed has completed.

Using the Python SDK for Temporal, is there a way for me to block processing on a per-user basis like this? While the timeout is running for a message, I can use signals to introduce more messages, but I am not sure what function to use to “queue” the next set of messages for a workflow.

It seems like I should have 1 workflow / workflow ID per user so that additional workflows cannot be started for the same user?

Using the Python SDK for Temporal, is there a way for me to block processing on a per-user basis like this?

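Run one workflow per user, with the workflow ID derived from the user. Use SignalWithStart to deliver each message: it starts the workflow if it is not already running and signals it either way.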

To get a general idea of how to implement this workflow, look at the hello_signal sample. In your case, change the code to call wait_condition with a timeout and a condition that unblocks when the number of pending messages changes.


Yup, ended up using SignalWithStart, thanks for the information.