Hello!
I want to implement a workflow that aggregates future events, grouped by day (UTC). Aggregation continues until the end of the day or until 10 minutes before the earliest event. Once aggregation completes, the workflow sends a report built from the aggregated events to an external service. What is the best way to implement this system?
Is it possible to implement this workflow using Signals? For example:
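To make the stopping rule concrete, here is a minimal plain-Java sketch of the deadline computation. It assumes that "end of the day or 10 minutes before the earliest event" means whichever comes first, and that event times are `java.time.Instant` values. `ReportDeadline`, `LEAD_TIME`, and `compute` are hypothetical names used only for illustration, and no Temporal APIs are involved.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Optional;

public class ReportDeadline {
    // Assumption: the report goes out this long before the earliest event.
    static final Duration LEAD_TIME = Duration.ofMinutes(10);

    /** Returns the instant at which aggregation for the given UTC day should stop. */
    static Instant compute(LocalDate day, Optional<Instant> earliestEvent) {
        // The end of the UTC day is the start of the following day.
        Instant endOfDay = day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant();
        return earliestEvent
                .map(t -> t.minus(LEAD_TIME))        // 10 minutes before the earliest event
                .filter(t -> t.isBefore(endOfDay))   // never later than the end of the day
                .orElse(endOfDay);                   // no events yet: wait out the day
    }
}
```

With no events the deadline is midnight UTC at the end of the day; once an event is known, the deadline moves to 10 minutes before it.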
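import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Receive `event` somewhere in code.
// One workflow per UTC day: the ID is keyed by the event's UTC date, not its full timestamp.
WorkflowStub workflowStub = workflowClient.newUntypedWorkflowStub(
        "EventAggregator",
        WorkflowOptions.newBuilder()
                .setWorkflowId("report;date=" + LocalDate.ofInstant(event.time(), ZoneOffset.UTC))
                .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
                .setTaskQueue(TASK_QUEUE) // assumed constant; a task queue is needed to start a workflow
                .build());
// Start the workflow if it is not running yet, then deliver the signal.
workflowStub.signalWithStart(
        "addEvent",
        new Object[] {event}, // signal arguments
        new Object[] {});     // workflow method arguments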
And then the EventAggregator workflow:
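import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

// The Temporal annotations go on the workflow interface; there is no @WorkflowImplementation.
@WorkflowInterface
public interface EventAggregator {
    @WorkflowMethod
    void sendReport();
    @SignalMethod
    void addEvent(Event event);
}

public class EventAggregatorImpl implements EventAggregator {
    private final List<Event> events = new LinkedList<>();
    private Optional<Event> earliestEvent = Optional.empty();
    private final SendReportActivity sendReportActivity = Workflow.newActivityStub(SendReportActivity.class);

    @Override
    public void addEvent(Event event) {
        events.add(event);
        // Keep the tracked event only if it is strictly earlier than the new one.
        if (earliestEvent.isEmpty() || !earliestEvent.get().time().isBefore(event.time())) {
            earliestEvent = Optional.of(event);
        }
    }

    @Override
    public void sendReport() {
        while (needWait()) {
            // Wake up 10 minutes before the earliest known event, or sooner if an earlier event arrives.
            Optional<Event> seen = earliestEvent;
            Duration timeout = durationBeforeEarliestEvent()
                    .map(d -> d.minus(Duration.ofMinutes(10)))
                    .orElse(Duration.ofHours(24));
            Workflow.await(timeout, () -> !needWait() || !earliestEvent.equals(seen));
        }
        sendReportActivity.send(events);
    }

    private boolean needWait() {
        return durationBeforeEarliestEvent().map(d -> d.compareTo(Duration.ofMinutes(10)) > 0).orElse(true);
    }

    private Optional<Duration> durationBeforeEarliestEvent() {
        // Workflow.currentTimeMillis() returns a long, so convert it before calling Duration.between.
        Instant now = Instant.ofEpochMilli(Workflow.currentTimeMillis());
        return earliestEvent.map(e -> Duration.between(now, e.time()));
    }
}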
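The loop's timeout depends on which event is currently the earliest, so it has to be recomputed whenever an earlier event arrives. A plain-Java sketch of that bookkeeping follows, with no Temporal APIs involved. `WaitPlanner`, `LEAD_TIME`, and `IDLE_WAIT` are hypothetical names, event times are assumed to be `Instant` values, and the 24-hour idle wait mirrors the fallback used in the loop above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class WaitPlanner {
    static final Duration LEAD_TIME = Duration.ofMinutes(10);
    static final Duration IDLE_WAIT = Duration.ofHours(24);

    private Optional<Instant> earliest = Optional.empty();

    /** Records an event time; returns true when the earliest time changed. */
    boolean add(Instant eventTime) {
        if (earliest.isPresent() && !eventTime.isBefore(earliest.get())) {
            return false; // the known earliest event still wins
        }
        earliest = Optional.of(eventTime);
        return true; // any pending wait is now stale and should be recomputed
    }

    /** Time left until LEAD_TIME before the earliest event, never negative. */
    Duration remainingWait(Instant now) {
        return earliest
                .map(t -> Duration.between(now, t.minus(LEAD_TIME)))
                .map(d -> d.isNegative() ? Duration.ZERO : d)
                .orElse(IDLE_WAIT);
    }
}
```

For example, at 08:00 with a single event at 20:00 the wait is 11 hours 50 minutes; if an event for 09:00 then arrives, `add` returns true and the wait shrinks to 50 minutes, so a timer started with the old value would fire far too late.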
Can I use this while loop in a workflow method?
How can I guarantee that no events will be lost to race conditions?