Awaiting workflow with signals - Null workflowId. Was workflow started?

Dear Temporal support team, I need your help.

I have an order execution workflow which should handle 1…* fill requests.
My idea is to use the workflow's @WorkflowMethod to start a long-running workflow that either:
a. waits for a timeout duration, OR
b. completes successfully once all the fill requests have been received

@WorkflowInterface
interface ExecutionWorkflow {
    /**
     * Create a new workflow instance to process
     * all fill messages related to this market/group
     * of orders until it's filled (unless cancelled)
     */
    @WorkflowMethod(name = "ExecutionWorkflow-v1")
    fun new(startObject: ExecutionWorkflowStartObject)
    /**
     * Fill message received.
     */
    @SignalMethod(name = "ExecutionWorkflow-NewFill-v1")
    fun newFill(fillExecMessage: ExecutionModel.FillExecMessage)
}

Implementation:

override fun new(startObject: ExecutionWorkflowStartObject) {
  val groupId = startObject.groupId
  val qty = startObject.qty
  // side effect not required here since the code is deterministic:
  if (this.qty == null) { this.qty = qty }
  if (this.groupId == null) { this.groupId = groupId }
  // wait until all orders are filled, or time out if not fully filled within 1 day
  Workflow.await(Duration.ofDays(1)) {
      log.info("New execution received: $groupId, $qty, $filled")
      qty <= filled
  }
}
override fun newFill(fillExecMessage: ExecutionModel.FillExecMessage) {
  // perform allocation - here we should
  // reconcile client orders with partial executions
  log.info("New fill: ${fillExecMessage.groupId} -> ${fillExecMessage.product}:${fillExecMessage.qty}")
  val allocs = executionActivity.allocation(fillExecMessage)
  // plus() returns a new value, so assign it back
  filled = filled.plus(fillExecMessage.qty)
  // send allocations for booking
  executionActivity.booking(allocs)
}

In my current implementation, an activity (from another workflow) starts the workflow:

val options = WorkflowOptions.newBuilder()
      .setTaskQueue(queue)
      .setWorkflowId(groupId.toString())
      .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
      .build()
val workflow = workflowClient.newUntypedWorkflowStub("ExecutionWorkflow-v1", options)
val we = workflow.start(ExecutionWorkflowStartObject(groupId, qty))
log.info("Workflow started: ${we.runId} for $groupId with $qty ($queue)")
return we.runId

Later on, another part of the service sends fill events to the long-running workflow:

log.info("Release a new execution: $element")
val options = WorkflowOptions.newBuilder()
        .setTaskQueue(queue)
        .setWorkflowId(element.groupId.toString()) // same id as in the previous step
        .build()
val workflow = workflowClient.newUntypedWorkflowStub("ExecutionWorkflow-v1", options)
workflow.signal("ExecutionWorkflow-NewFill-v1", element);
log.info("New fill sent for ${element.groupId} with ${element.qty}")

I am unfortunately getting an error while sending the signal:

2021-06-18 15:43:12.826  INFO 14151 --- [ce="default": 3] s.t.test.services.ExecutionServiceImpl   : Workflow started: dd36fe9d-750f-4265-80db-5a30dedff069 for 18cdcb00-d03b-11eb-a4c2-3388fa33a035 with 15.15 (alloc_q)
2021-06-18 15:43:13.651  INFO 14151 --- [   scheduling-1] s.t.test.services.ExecutionServiceImpl   : Release a new execution: FillExecMessage(fillId=204a7310-d03b-11eb-a4c2-3388fa33a035, execReqId=18cdcb00-d03b-11eb-a4c2-3388fa33a035:53f49fbb2fd9820768f25843846c7759f0af6b8238606871f145430831bb680b, groupId=18cdcb00-d03b-11eb-a4c2-3388fa33a035, product=ProductReq(assetClass=Fixed Income, isin=US03783310), qty=3.03)
2021-06-18 15:43:18.187 ERROR 14151 --- [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

java.lang.IllegalStateException: Null workflowId. Was workflow started?
	at io.temporal.internal.sync.WorkflowStubImpl.checkStarted(WorkflowStubImpl.java:388) ~[temporal-sdk-1.0.7.jar:na]
	at io.temporal.internal.sync.WorkflowStubImpl.signal(WorkflowStubImpl.java:91) ~[temporal-sdk-1.0.7.jar:na]
	at seb.temporal.test.services.ExecutionServiceImpl.release(ExecutionService.kt:119) ~[classes/:na]
	at seb.temporal.test.services.ExecutionServiceImpl.release(ExecutionService.kt:27) ~[classes/:na]
	at seb.temporal.test.services.ChaosUnit.releaseMessages(Chaos.kt:68) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.6.jar:5.3.6]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.6.jar:5.3.6]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

I can also see the workflow instance in the Temporal GUI so it seems to be started properly:

Workflow Name
    ExecutionWorkflow-v1
Started At
    Friday June 18th, 3:43:01 pm
Status
    running 
Workflow Id
    18cdcb00-d03b-11eb-a4c2-3388fa33a035
Run Id
    dd36fe9d-750f-4265-80db-5a30dedff069
Input
    [
      {
        "groupId": "18cdcb00-d03b-11eb-a4c2-3388fa33a035",
        "qty": 15.15
      }
    ]
Task Queue
    alloc_q 
History Events
    5
Pending Activities

Could you please help me to understand what’s wrong with my approach or implementation?

Thanks, Seb.

Later on, another piece of service sends fill events to the long running workflow…

WorkflowClient newUntypedWorkflowStub(String workflowType, WorkflowOptions options)
is used to start a single workflow execution.

In your code that sends the signal, if you have the workflow interface class available, you could use the
WorkflowClient newWorkflowStub(Class<T> workflowInterface, String workflowId, Optional<String> runId) method and then call the signal method to send the signal.
Or, if you want to use the workflow type (string) as specified in your
@WorkflowMethod, use WorkflowClient
newUntypedWorkflowStub(String workflowId, Optional<String> runId, Optional<String> workflowType)
and then call signal on the returned stub.
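
A minimal Kotlin sketch of the untyped option, assuming a WorkflowClient is already configured. The helper name signalFill is hypothetical; the workflow type and signal name are the ones from the question.

```kotlin
import io.temporal.client.WorkflowClient
import java.util.Optional

// Hypothetical helper: sends a fill to a workflow execution that is already running.
fun signalFill(workflowClient: WorkflowClient, groupId: String, fill: Any) {
    // This overload attaches to an existing execution by workflow ID.
    // It takes no WorkflowOptions and is meant for signaling, not for starting.
    val workflow = workflowClient.newUntypedWorkflowStub(
        groupId,                             // workflow ID used when the workflow was started
        Optional.empty(),                    // no run ID: target the current run
        Optional.of("ExecutionWorkflow-v1")  // workflow type from @WorkflowMethod
    )
    workflow.signal("ExecutionWorkflow-NewFill-v1", fill)
}
```

With the question's code, the call site would be something like signalFill(workflowClient, element.groupId.toString(), element).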

val allocs = executionActivity.allocation(fillExecMessage)

You should not call your Activities in signal methods. Move them to your workflow method code and invoke them after the signal is received.
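
One way this could look, as a rough Kotlin sketch rather than a definitive implementation: the signal method only records the fill, and the workflow method wakes up and runs the activities. The types Fill and ExecutionActivity are hypothetical stand-ins for the ones in the question, annotations are omitted, and the activity stub is passed in for brevity (in a real workflow it would typically come from Workflow.newActivityStub).

```kotlin
import io.temporal.workflow.Workflow
import java.math.BigDecimal
import java.time.Duration

// Hypothetical stand-ins for the question's message and activity types.
data class Fill(val groupId: String, val qty: BigDecimal)

interface ExecutionActivity {
    fun allocation(fill: Fill): List<String>
    fun booking(allocs: List<String>)
}

class ExecutionWorkflowSketch(private val executionActivity: ExecutionActivity) {
    private val pending = ArrayDeque<Fill>()
    private var filled = BigDecimal.ZERO

    // Body of the workflow method: all activity calls happen here.
    fun run(qty: BigDecimal) {
        // Workflow time is deterministic, unlike System.currentTimeMillis().
        val deadline = Workflow.currentTimeMillis() + Duration.ofDays(1).toMillis()
        while (filled < qty) {
            val remaining = Duration.ofMillis(deadline - Workflow.currentTimeMillis())
            if (remaining.isNegative || remaining.isZero) break
            // Blocks until a signal has queued a fill or the overall timeout expires.
            val gotFill = Workflow.await(remaining) { pending.isNotEmpty() }
            if (!gotFill) break
            while (pending.isNotEmpty()) {
                val fill = pending.removeFirst()
                val allocs = executionActivity.allocation(fill)
                filled = filled.plus(fill.qty)
                executionActivity.booking(allocs)
            }
        }
    }

    // Body of the signal method: only record the fill, no activity calls.
    fun newFill(fill: Fill) {
        pending.addLast(fill)
    }
}
```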

If your workflow is long running, you should also look into Continue-As-New so that you do not run into history size limitations. That means periodically continuing as a new workflow, for example based on elapsed time or on the number of fills received.
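
A small Kotlin sketch of the bookkeeping that could sit around Continue-As-New, under stated assumptions: StartObject is a hypothetical stand-in for ExecutionWorkflowStartObject, quantities are assumed to be BigDecimal, and the threshold of 500 fills is arbitrary. The actual Workflow.continueAsNew call is shown only as a comment because it must run inside workflow code.

```kotlin
import java.math.BigDecimal

// Hypothetical stand-in for ExecutionWorkflowStartObject from the question.
data class StartObject(val groupId: String, val qty: BigDecimal)

// Assumed threshold; tune it against the history size you actually observe.
const val MAX_FILLS_PER_RUN = 500

// True once this run has handled enough fills that its history should be reset.
fun shouldContinueAsNew(fillsInThisRun: Int): Boolean =
    fillsInThisRun >= MAX_FILLS_PER_RUN

// Input for the next run: same group, only the quantity still outstanding.
fun nextRunInput(current: StartObject, filled: BigDecimal): StartObject =
    current.copy(qty = current.qty - filled)

// Inside the workflow method, after handling a fill, this could be used as:
//   if (shouldContinueAsNew(fillsInThisRun)) {
//       Workflow.continueAsNew(nextRunInput(startObject, filled))
//   }
```

Fills that were already received but not yet processed should be handled before continuing, otherwise they would not be carried into the next run by this sketch.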

There are two newUntypedWorkflowStub overloads. One is used to start a workflow and the other to connect to an already running workflow. It was a mistake to give them the same name; you are not the first one to run into this issue.

Use the one that takes the workflow ID as an argument.


Thanks a lot for your prompt responses.

I am now using the same approach as in io.temporal.samples.hello.HelloSignal, and it works like a charm.

@tihomir Just for the sake of documentation for potential future readers: I deliberately do not use workflow interfaces, in order to decouple services as described in springboot-microservices-managed-by-temporal-io-rabbitmq, since multiple microservices will rely on Temporal queues to communicate with NO shared code.

Thanks, Seb.

@maxim We have run into this issue multiple times (of mistakenly using the wrong workflow stub type). Would it be possible to deprecate the ambiguous and error-prone types and replace them with something more explicit? And if not the types, then at least the factory methods.
