Dear Temporal support team, I need your help.
I have an order execution workflow which should handle 1…* fill requests.
My idea is to use the @WorkflowMethod to start a long-running workflow that either:
a. waits for a timeout duration, OR
b. completes successfully once all the fill requests have been received.
@WorkflowInterface
interface ExecutionWorkflow {
    /**
     * Create a new workflow instance to process
     * all fill messages related to this market/group
     * of orders until it's filled (unless cancelled)
     */
    @WorkflowMethod(name = "ExecutionWorkflow-v1")
    fun new(startObject: ExecutionWorkflowStartObject)

    /**
     * Fill message received.
     */
    @SignalMethod(name = "ExecutionWorkflow-NewFill-v1")
    fun newFill(fillExecMessage: ExecutionModel.FillExecMessage)
}
Implementation:
override fun new(startObject: ExecutionWorkflowStartObject) {
    val groupId = startObject.groupId
    val qty = startObject.qty
    // side effect not required here since the code is deterministic:
    if (this.qty == null) { this.qty = qty }
    if (this.groupId == null) { this.groupId = groupId }
    // wait until all orders are filled, or time out if not fully filled within 1 day
    Workflow.await(Duration.ofDays(1)) {
        log.info("New execution received: $groupId, $qty, $filled")
        qty <= filled
    }
}
override fun newFill(fillExecMessage: ExecutionModel.FillExecMessage) {
    // perform allocation - here we should
    // reconcile client orders with partial executions
    log.info("New fill: ${fillExecMessage.groupId} -> ${fillExecMessage.product}:${fillExecMessage.qty}")
    val allocs = executionActivity.allocation(fillExecMessage)
    // plus() returns a new value, so assign it back (assumes `filled` is declared as a var)
    filled = filled.plus(fillExecMessage.qty)
    // send allocations for booking
    executionActivity.booking(allocs)
}
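One detail of the signal handler is worth isolating: `plus` returns a new value rather than mutating its receiver, so the running total has to be assigned back for the await condition `qty <= filled` to ever become true. The sketch below shows this outside of Temporal. `FillTracker` is a hypothetical stand-in for the workflow's state, and `BigDecimal` is an assumption about the quantity type (the post does not show how `qty` and `filled` are declared).

```kotlin
import java.math.BigDecimal

// Hypothetical stand-in for the workflow's fill-tracking state.
class FillTracker(private val target: BigDecimal) {
    var filled: BigDecimal = BigDecimal.ZERO
        private set

    fun onFill(qty: BigDecimal) {
        // plus() does not modify `filled`; keep the returned value.
        filled = filled.plus(qty)
    }

    // Mirrors the await condition `qty <= filled`.
    fun isComplete(): Boolean = target <= filled
}

fun main() {
    val tracker = FillTracker(BigDecimal("15.15"))
    // Five fills of 3.03 add up to exactly 15.15 with BigDecimal.
    repeat(5) { tracker.onFill(BigDecimal("3.03")) }
    println(tracker.isComplete()) // prints "true"
}
```

With `Double` quantities the same sum can fall slightly short of the target because of rounding, which is one reason the sketch assumes `BigDecimal`.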
In my current implementation, an activity (from another workflow) starts the workflow:
val options = WorkflowOptions.newBuilder()
    .setTaskQueue(queue)
    .setWorkflowId(groupId.toString())
    .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
    .build()
val workflow = workflowClient.newUntypedWorkflowStub("ExecutionWorkflow-v1", options)
val we = workflow.start(ExecutionWorkflowStartObject(groupId, qty))
log.info("Workflow started: ${we.runId} for $groupId with $qty ($queue)")
return we.runId
Later on, another part of the service sends fill events to the long-running workflow:
log.info("Release a new execution: $element")
val options = WorkflowOptions.newBuilder()
    .setTaskQueue(queue)
    .setWorkflowId(element.groupId.toString()) // same id as in the previous step
    .build()
val workflow = workflowClient.newUntypedWorkflowStub("ExecutionWorkflow-v1", options)
workflow.signal("ExecutionWorkflow-NewFill-v1", element)
log.info("New fill sent for ${element.groupId} with ${element.qty}")
I am unfortunately getting an error while sending the signal:
2021-06-18 15:43:12.826 INFO 14151 --- [ce="default": 3] s.t.test.services.ExecutionServiceImpl : Workflow started: dd36fe9d-750f-4265-80db-5a30dedff069 for 18cdcb00-d03b-11eb-a4c2-3388fa33a035 with 15.15 (alloc_q)
2021-06-18 15:43:13.651 INFO 14151 --- [ scheduling-1] s.t.test.services.ExecutionServiceImpl : Release a new execution: FillExecMessage(fillId=204a7310-d03b-11eb-a4c2-3388fa33a035, execReqId=18cdcb00-d03b-11eb-a4c2-3388fa33a035:53f49fbb2fd9820768f25843846c7759f0af6b8238606871f145430831bb680b, groupId=18cdcb00-d03b-11eb-a4c2-3388fa33a035, product=ProductReq(assetClass=Fixed Income, isin=US03783310), qty=3.03)
2021-06-18 15:43:18.187 ERROR 14151 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
java.lang.IllegalStateException: Null workflowId. Was workflow started?
at io.temporal.internal.sync.WorkflowStubImpl.checkStarted(WorkflowStubImpl.java:388) ~[temporal-sdk-1.0.7.jar:na]
at io.temporal.internal.sync.WorkflowStubImpl.signal(WorkflowStubImpl.java:91) ~[temporal-sdk-1.0.7.jar:na]
at seb.temporal.test.services.ExecutionServiceImpl.release(ExecutionService.kt:119) ~[classes/:na]
at seb.temporal.test.services.ExecutionServiceImpl.release(ExecutionService.kt:27) ~[classes/:na]
at seb.temporal.test.services.ChaosUnit.releaseMessages(Chaos.kt:68) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.6.jar:5.3.6]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.6.jar:5.3.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
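The `WorkflowStubImpl.checkStarted` frame in the trace indicates that the stub used for `signal` has no workflow execution bound to it. A stub created from `WorkflowOptions` is intended for starting a new run, and it only learns its execution when `start` is called on that same stub instance. A minimal sketch of one alternative, assuming the workflow is already running: `WorkflowClient.newUntypedWorkflowStub(workflowId, runId, workflowType)` binds a stub to an existing execution by ID. The helper name `signalNewFill` and its parameters are hypothetical; the snippet needs the Temporal Java SDK on the classpath and has not been run against this service.

```kotlin
import io.temporal.client.WorkflowClient
import java.util.Optional

// Hypothetical helper (not part of the original service): sends the
// fill signal to an already-running workflow identified by its workflow ID.
fun signalNewFill(workflowClient: WorkflowClient, workflowId: String, fill: Any) {
    // Bind the stub to an existing execution instead of building it from options.
    val stub = workflowClient.newUntypedWorkflowStub(
        workflowId,
        Optional.empty(), // runId: empty targets the current run of this workflow ID
        Optional.empty()  // workflowType: not needed to send a signal
    )
    stub.signal("ExecutionWorkflow-NewFill-v1", fill)
}
```

In the release code this would be called as `signalNewFill(workflowClient, element.groupId.toString(), element)`, with no `WorkflowOptions` involved on the signalling side.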
I can also see the workflow instance in the Temporal GUI so it seems to be started properly:
Workflow Name: ExecutionWorkflow-v1
Started At: Friday June 18th, 3:43:01 pm
Status: running
Workflow Id: 18cdcb00-d03b-11eb-a4c2-3388fa33a035
Run Id: dd36fe9d-750f-4265-80db-5a30dedff069
Input:
[
  {
    "groupId": "18cdcb00-d03b-11eb-a4c2-3388fa33a035",
    "qty": 15.15
  }
]
Task Queue: alloc_q
History Events: 5
Pending Activities: (none listed)
Could you please help me to understand what’s wrong with my approach or implementation?
Thanks, Seb.