Batch signal workflows from Java SDK

Since it was announced that tctl will be discontinued, I am searching a way to start a batch operation of type signal to notify many workflows based on a visibility query. It seems this is not really well documented as of now and therefore could anybody validate following theoretical code snippet?

var startBatchOperationResponse = WorkflowServiceStubs.newServiceStubs(
                WorkflowServiceStubsOptions.newBuilder()
                        .setTarget("temporal.url")
                        .validateAndBuildWithDefaults())
        .blockingStub()
        .startBatchOperation(StartBatchOperationRequest.newBuilder()
                .setVisibilityQuery("WorkflowType = \"namespace.MyWorkflowType\" and ExecutionStatus = \"Running\"")
                .setSignalOperation(BatchOperationSignal.newBuilder()
                        .setSignal("MySignalMethod")
                        .setInput(Payloads.newBuilder()
                                .addPayloads(new JacksonJsonPayloadConverter()
                                        .toData(myObjectPassedAsFirstParameter)
                                        .orElse(null))
                                .build()))
                .build());

Assumptions:

  1. To be run from an activity
  2. myObjectPassedAsFirstParameter must be serialisable using the JacksonJsonPayloadConverter

All this seems a bit too much of boilerplate, so if there is any other technique to send a signal to many workflows, I am in.

Typical use case and motivation:

Temporal allows to replace event sourcing and may therefore simplify architecture. There are situations where this becomes difficult, typically when several clients need to receive a notification that some state changed. A possible workaround is to represent a client subscription to some state by a workflow that does nothing except waiting to receive a signal in case the state changes. Instead of explicitly registering such observers in whatever collection, the idea is to rely on temporal visibility and distribute a signal to all workflows based on a specific query. There are of course downsides such as need of cleaning the observer workflows and possible performance hit in case of larger number of those observers.

Hello @vaclav

you can use the new cli to do the same:
temporal workflow signal --query "(ExecutionStatus=\"Running\")" --name "signalName " -i "input" -reason "reason"

The code looks good. The only thing you might need to add namespace, jobId, and reason to StartBatchOperationRequest

Keep in mind that the visibility store is eventually consistent.

Hi, thanks for you hint.

Agree with the eventual consistency of the visibility store. That’s not perfect of course and one need to be aware of that.

I have in the meanwhile done some tests and indeed namespace, jobId and reason were missing.

I get the signal delivered to the workflows but the input is not deserialised as expected. Probably there some kind of unwanted double serialisation. This is what I get in the visibility UI when looking in the signal input:

image

I am unable to get to to correctly provide an arbitrary object to the the method setInput() above. It expects Payloads or Builder. In the code above I have tried to construct Payloads using available builders. Either there is a bug or I don’t see some obvious flaw in my code. It would actually be much easier to have another version of setInput as follows: setInput(Object ... vars) because this is actually the first place in the whole SDK where serialisation of inputs is expected to be done manually.