Thank you, Tihomir. The demo looks interesting; it also covers another aspect we aim to address soon, the reverts. I adapted the approach to our code, but I still have some blockers.
What we needed to do differently is to communicate the retry/fail signals to particular workflow executions, rather than applying them to all workflows, as your demo does. I have a similar listener registered on the interceptor:
public interface PauseResumeInterceptorListener {
    @SignalMethod
    void retry();

    @SignalMethod
    void fail();

    @QueryMethod(name = "awaitsManualRetry")
    Boolean awaitsManualRetry();
}
Communication via signal requests is working fine. When I spin up two workflow instances and use a particular workflow ID, this code seems to correlate correctly with the corresponding interceptor instance:
private void sendSignal(String workflowId, String signalName) {
    WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceBlockingStub =
        workflowClient.getWorkflowServiceStubs().blockingStub();
    SignalWorkflowExecutionRequest req = SignalWorkflowExecutionRequest.newBuilder()
        .setNamespace(workflowClient.getOptions().getNamespace())
        .setWorkflowExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId))
        .setSignalName(signalName)
        .build();
    workflowServiceBlockingStub.signalWorkflowExecution(req);
}
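To make the per-execution behaviour I am after concrete, here is a toy, in-memory model of it. It does not use the Temporal SDK at all; `SignalRouter`, `PauseState`, `register` and `lastSignal` are hypothetical names invented for illustration, and the map lookup only stands in for whatever the server does to correlate a workflow ID with its interceptor instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: stands in for one interceptor instance per workflow execution.
class PauseState {
    private volatile String lastSignal; // null until a signal arrives

    void retry() { lastSignal = "retry"; }
    void fail() { lastSignal = "fail"; }
    String lastSignal() { return lastSignal; }
}

// Hypothetical router: delivers a named signal to exactly one workflow ID.
class SignalRouter {
    private final Map<String, PauseState> byWorkflowId = new ConcurrentHashMap<>();

    // Creates the state for a workflow ID on first use and returns it.
    PauseState register(String workflowId) {
        return byWorkflowId.computeIfAbsent(workflowId, id -> new PauseState());
    }

    // Returns false when no execution with that ID is known.
    boolean sendSignal(String workflowId, String signalName) {
        PauseState state = byWorkflowId.get(workflowId);
        if (state == null) {
            return false;
        }
        switch (signalName) {
            case "retry": state.retry(); return true;
            case "fail": state.fail(); return true;
            default: throw new IllegalArgumentException("Unknown signal: " + signalName);
        }
    }
}
```

The point of the sketch is only that a signal sent to one ID must leave every other execution untouched, which is what I observe with the real `sendSignal` above.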
Now, the additional questions.
1) finding out that human retry/fail intervention is needed
We need to detect that a manual retry/fail decision is required. Again, we need this per workflow execution. The workflow event history does not show any error once the workflow ends up “hanging” in the PauseResumeWorkflowOutboundCallsInterceptor. I guess this is conceptually expected, as the activity hasn't technically failed (yet). FYI, I'm posting a separate question on activity histories too.
So instead, I added the awaitsManualRetry query method (see the code snippet above). The documentation on messages does not describe our case: it demonstrates messages via stubs. We do not have a workflow stub available, because we need to interact with a running workflow over multiple UI sessions (so we persist the workflow ID, and the ID is always our starting point). I tried invoking the query like this:
WorkflowStub ws = workflowClient.newUntypedWorkflowStub(workflowId);
Boolean b = ws.query("awaitsManualRetry", Boolean.class);
OR
WorkflowQuery query = WorkflowQuery.newBuilder().setQueryType("awaitsManualRetry").build();
QueryWorkflowRequest queryWorkflowRequest = QueryWorkflowRequest.newBuilder()
    .setNamespace(workflowClient.getOptions().getNamespace())
    .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId))
    .setQuery(query)
    .build();
QueryWorkflowResponse resp = workflowServiceBlockingStub.queryWorkflow(queryWorkflowRequest);
But I always get:
2024-11-14T15:06:48.926 [scheduling-1] DEBUG i.t.internal.retryer.GrpcRetryer - Final exception, throwing mdc=[traceId=ba34bd2c5b610a3632e4455890a049e8, spanId=23aff953699b78de]
io.grpc.StatusRuntimeException: NOT_FOUND: sql: no rows in result set
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.queryWorkflow(WorkflowServiceGrpc.java:4821)
at io.temporal.internal.client.external.GenericWorkflowClientImpl.lambda$query$10(GenericWorkflowClientImpl.java:208)
at io.temporal.internal.retryer.GrpcSyncRetryer.retry(GrpcSyncRetryer.java:69)
at io.temporal.internal.retryer.GrpcRetryer.retryWithResult(GrpcRetryer.java:60)
at io.temporal.internal.client.external.GenericWorkflowClientImpl.query(GenericWorkflowClientImpl.java:203)
at io.temporal.internal.client.RootWorkflowClientInvoker.query(RootWorkflowClientInvoker.java:424)
at io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase.query(WorkflowClientCallsInterceptorBase.java:68)
at io.temporal.opentracing.internal.OpenTracingWorkflowClientCallsInterceptor.query(OpenTracingWorkflowClientCallsInterceptor.java:116)
at io.temporal.client.WorkflowStubImpl.query(WorkflowStubImpl.java:317)
at io.temporal.client.WorkflowStubImpl.query(WorkflowStubImpl.java:307)
Any idea what is going wrong? Remember, our listener is registered on the interceptor rather than on the workflow (same as in your demo). Could this be the cause?
2) resilience
The human-intervention retry/fail logic is implemented in the interceptor. It waits for the signal via Promises. Is my understanding correct that this state won’t survive a JVM restart? The same applies to the list of saga compensation activities. What will happen if the workflow is paused on the Promise because the retry/fail signal has not been sent yet, and the JVM is restarted? Will the workflow instance be reloaded, and will the activity that was waiting for the human interaction be re-executed automatically?
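For reference, the in-memory state I am worried about looks roughly like the following toy model. It uses a plain `java.util.concurrent.CompletableFuture` in place of the Temporal Promise, so it says nothing about how Temporal itself restores state; `ManualDecisionGate`, `Decision` and `pause` are hypothetical names used only for this sketch.

```java
import java.util.concurrent.CompletableFuture;

// Toy model of the pause point; a plain CompletableFuture stands in for the Promise.
class ManualDecisionGate {
    enum Decision { RETRY, FAIL }

    // Null when the workflow is not paused; held only in JVM memory.
    private volatile CompletableFuture<Decision> pending;

    // Called when an activity fails and a human decision is needed.
    CompletableFuture<Decision> pause() {
        CompletableFuture<Decision> created = new CompletableFuture<>();
        pending = created;
        return created;
    }

    void retry() { complete(Decision.RETRY); }
    void fail() { complete(Decision.FAIL); }

    // Mirrors the awaitsManualRetry query: true only while a decision is outstanding.
    boolean awaitsManualRetry() {
        CompletableFuture<Decision> current = pending;
        return current != null && !current.isDone();
    }

    private void complete(Decision decision) {
        CompletableFuture<Decision> current = pending;
        if (current != null) {
            current.complete(decision); // no-op if a decision was already made
        }
    }
}
```

In this plain-Java version the `pending` field is simply lost when the JVM exits, which is exactly the behaviour I would like to confirm or rule out for the real interceptor.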
Thank you!