Aggregate data in workflow

Hello!
I want to implement a workflow that aggregates future events, grouped by day (UTC). Aggregation continues until the end of the day or until 10 minutes before the earliest event. Once aggregation completes, the workflow sends a report built from the aggregated events to an external service. What is the best way to implement this system?
Is it possible to implement this workflow using Signals? For example:

//receive `event` somewhere in code
WorkflowStub workflowStub = workflowClient.newUntypedWorkflowStub(
    "EventAggregator",
    WorkflowOptions
          .newBuilder()
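          // Key the workflow ID on the UTC calendar day so that all events of one day reach the same workflow
          .setWorkflowId("report;date=" + event.time().atZone(ZoneOffset.UTC).toLocalDate())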
          .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
          .build()
);

workflowStub.signalWithStart(
    "addEvent",
    new Object[]{event},
    new Object[]{}
);

and then in EventAggregator:

@WorkflowImplementation
public class EventAggregator {
    private final List<Event> events = new LinkedList<>();
    private Optional<Event> earliestEvent = Optional.empty();
    private final SendReportActivity sendReportActivity = Workflow.newActivityStub(SendReportActivity.class);

    @SignalMethod
    public void addEvent(Event event) {
        events.add(event);
        if (earliestEvent.stream().noneMatch(e -> e.time().isBefore(event.time()))) {
            earliestEvent = Optional.of(event);
        }
    }

    @WorkflowMethod()
    void sendReport() {
        while (needWait()) {
            var duration = durationBeforeEarliestEvent().orElse(Duration.ofHours(24));
            Workflow.await(duration, () -> !needWait());
        }

        sendReportActivity.send(events);
    }

    private boolean needWait() {
        return durationBeforeEarliestEvent().stream().noneMatch(d -> d.compareTo(Duration.ofMinutes(10)) <= 0);
    }

    private Optional<Duration> durationBeforeEarliestEvent() {
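        // Workflow.currentTimeMillis() returns a long, so convert it to an Instant before comparing
        return earliestEvent.stream().map(e -> Duration.between(Instant.ofEpochMilli(Workflow.currentTimeMillis()), e.time())).findFirst();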
    }
}

Can I use this while loop in the workflow method?
How can I guarantee that no events will be lost to race conditions?

Having a while loop is fine when you use Workflow.await. I’m a bit confused as to why you need a while loop here at all though; after the call to await returns you know that needWait() is false; so why would you need to check it again?

Using signals is fine. Once a signal has been recorded by the Temporal service, it will be delivered to the running workflow in a reliable and durable fashion.
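
For signalWithStart to collect a whole day's events in one workflow, every event of that day has to map to the same workflow ID. A minimal sketch of deriving such an ID, assuming `event.time()` is a `java.time.Instant`; the class and method names here are hypothetical and only for illustration:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

final class ReportWorkflowIds {
    private ReportWorkflowIds() {}

    // The UTC calendar day an event falls on.
    static LocalDate utcDay(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Same prefix as in the question; LocalDate prints as ISO yyyy-MM-dd.
    static String workflowIdFor(Instant eventTime) {
        return "report;date=" + utcDay(eventTime);
    }
}
```

Two events on the same UTC day then share one ID regardless of their time of day, while an event at midnight UTC belongs to the next day's workflow.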

The Temporal Java SDK uses multiple threads so that one part of a workflow execution can be paused (such as waiting on an await) while other parts of the workflow run (such as responding to signals). However, within any particular workflow execution the Java SDK only runs one thread at a time.

If you're familiar with async and await in JavaScript: JavaScript is single-threaded and you can't have concurrent execution, but you can have multiple execution stacks (e.g. function A can call B which calls C which pauses with await, and then the event loop can run other code which then has its own execution stack).

The Temporal Java SDK provides the same facility (you never need to deal with concurrent code execution but you do get multiple execution stacks) in a language where the only way to have multiple execution stacks is by using threads.

Thus, when you call await, this allows a waiting signal to be handled by calling your signal method, but that execution either completes or itself pauses to wait on something before the await returns. You don’t get the signal method being called while the sendReport method is executing.

This means you don’t need to worry about race conditions like you would in standard Java. You don’t need (and shouldn’t use) locks, synchronized methods, volatiles, or atomic variables to ensure that a write by one thread is seen by another thread that’s using it. This is taken care of for you in workflow execution.
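
To make the ordering concrete, here is a toy model in plain Java. It is not the Temporal SDK and uses no threads at all; `CooperativeModel`, `signal`, `step` and `await` are hypothetical names invented for this sketch. It only imitates the rule described above: queued signal handlers run while the workflow method is paused in await, never in the middle of its other code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

final class CooperativeModel {
    private final Deque<Runnable> pendingSignals = new ArrayDeque<>();
    private final List<String> trace = new ArrayList<>();

    // A signal arrives: it is only queued, its handler does not run yet.
    void signal(String name, Runnable handler) {
        pendingSignals.add(() -> {
            trace.add("signal:" + name);
            handler.run();
        });
    }

    // Ordinary workflow code between awaits.
    void step(String name) {
        trace.add("workflow:" + name);
    }

    // The workflow yields here: every queued handler runs to completion,
    // then the condition is checked. The toy throws instead of blocking.
    void await(BooleanSupplier condition) {
        while (!pendingSignals.isEmpty()) {
            pendingSignals.poll().run();
        }
        if (!condition.getAsBoolean()) {
            throw new IllegalStateException("toy model: condition still false and no signals left");
        }
    }

    List<String> trace() {
        return List.copyOf(trace);
    }

    static List<String> demo() {
        CooperativeModel model = new CooperativeModel();
        List<String> events = new ArrayList<>();
        model.signal("a", () -> events.add("a"));
        model.step("before await");            // handler for "a" has not run yet
        model.signal("b", () -> events.add("b"));
        model.await(() -> !events.isEmpty());  // both handlers run here, in order
        model.step("after await, events=" + events);
        return model.trace();
    }
}
```

`CooperativeModel.demo()` returns the trace `workflow:before await`, `signal:a`, `signal:b`, `workflow:after await, events=[a, b]`: the handlers only ever run at the await point, so plain fields shared between them and the workflow method need no locking.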

(Just to be clear, activities are written in standard Java, can use Java threads if they want to, and have the usual concerns about race conditions if they do).

Another consideration is the volume of signals any particular workflow instance might need to handle. Temporal supports millions of workflows executing in parallel, but any one workflow instance can only process a limited number of events per second. If you have a lot of events being aggregated into a single report, you might need to do that aggregation in an event streaming system (e.g. Kafka Streams).

Each time a new event arrives at the aggregator, we need to recalculate earliestEvent; that's why I use a loop. I found an example of a loop with a queue for signals in HelloSignal.java (core/src/main/java/io/temporal/samples/hello/HelloSignal.java on the main branch of temporalio/samples-java on GitHub).
But you are right, I have a mistake in the loop logic. It should be something like this:

    @WorkflowImplementation
    public class EventAggregator {
        private final List<Event> eventsToAdd = new LinkedList<>();
        private final SendReportActivity sendReportActivity = Workflow.newActivityStub(SendReportActivity.class);

        @SignalMethod
        public void addEvent(Event event) {
            eventsToAdd.add(event);
        }

        @WorkflowMethod()
        void sendReport() {
            Workflow.await(() -> !eventsToAdd.isEmpty());
            
            PriorityQueue<Event> events = new PriorityQueue<>(10, Comparator.comparing(e -> e.time()));
            
            while (true) {
                events.addAll(eventsToAdd);
                eventsToAdd.clear();
                
                var sendReportTime = events.peek().time().minus(Duration.ofMinutes(10));
                var currentTime = Instant.ofEpochMilli(Workflow.currentTimeMillis());
                
                if (currentTime.isAfter(sendReportTime)) {
                    sendReportActivity.send(events);
                    return;
                } else {
                    Workflow.await(Duration.between(currentTime, sendReportTime), () -> !eventsToAdd.isEmpty());
                }
            }
        }
    }
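
The loop above waits only for "earliest event minus 10 minutes", while the original question also stops aggregation at the end of the UTC day. A minimal sketch of computing the send time with both limits as a pure function; it assumes event times are `java.time.Instant` values and that the earlier of the two limits wins, and `ReportDeadline` with its methods is a hypothetical helper, not part of Temporal:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Collection;

final class ReportDeadline {
    // The report goes out this long before the earliest event.
    static final Duration LEAD_TIME = Duration.ofMinutes(10);

    private ReportDeadline() {}

    // End of a UTC day, expressed as the first instant of the following day.
    static Instant endOfDay(LocalDate day) {
        return day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant();
    }

    // Earlier of: end of the UTC day, or LEAD_TIME before the earliest event.
    // With no events yet, the end of the day is the only limit.
    static Instant sendTime(LocalDate day, Collection<Instant> eventTimes) {
        Instant deadline = endOfDay(day);
        for (Instant eventTime : eventTimes) {
            Instant candidate = eventTime.minus(LEAD_TIME);
            if (candidate.isBefore(deadline)) {
                deadline = candidate;
            }
        }
        return deadline;
    }

    // How long to wait from 'now'; never negative, so it is safe as a timeout.
    static Duration waitFrom(Instant now, Instant sendTime) {
        Duration remaining = Duration.between(now, sendTime);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

Inside the workflow, `now` would come from `Instant.ofEpochMilli(Workflow.currentTimeMillis())`, and the result of `waitFrom` could serve as the timeout passed to `Workflow.await`. Because the function depends only on its arguments, it stays deterministic across replays.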