I’m implementing a command queue pattern in my Temporal workflow similar to the Injecting work into the main Workflow pattern described in the docs. My workflow has a main event loop that processes commands from a queue, and an update method that adds commands to the queue.
Here’s a simplified version of my implementation:
val commandQueue: Queue<Execution> = LinkedList()
@WorkflowMethod
fun myWorkflowMethod() {
while (!currentState.isTerminal) {
Workflow.await { commandQueue.isNotEmpty() }
executeTopCommand() // polls the top of the queue and calls an activity
}
}
@UpdateMethod
fun post(command: Command): ExecutionRecord {
val execution = postCommand(command)
return Async.function {
execution.completedRecord[TIMEOUT]
}
.exceptionally { execution.record }
.get()
}
data class Execution(
val record: ExecutionRecord, // contains the command and some metadata
val completedRecord: CompletablePromise<ExecutionRecord>, // contains the result of a completed command
)
My question
Does the Async.function
in the post
method yield control back to the main workflow thread immediately? Or does it block until the completedRecord
promise resolves or times out?
I want to ensure that:
- The main workflow loop can continue processing other commands in the queue while a
post
is waiting for a result - Multiple simultaneous calls to
post
don’t block each other
Any insights or best practices around this pattern would be appreciated!