To give you some context: at the moment, I’m building a web app for my company to manage marketing campaigns with complicated, long-running workflows. My high-level plan is to use Temporal to build the backend, which reads a workflow definition written in my own DSL and then executes it.
For each campaign, we may have a population of 10mil+ users who are eligible to enter the campaign all at once when it is launched. Due to a bunch of requirements, I need to start one Temporal workflow for each user, meaning we may have 10mil+ workflows running.
Let’s say that after a delay of 7 days, we need to send a marketing email to those users. It means 10mil+ workflows may wake up around the same time and start calling my Worker service to send out emails. When my Worker service receives those requests, I don’t want to call an external API 10mil+ times to send out the emails one by one.
Since I expect a lot of workflows to wake up around the same time, I want to implement a batching mechanism in my Worker service that collects up to 10,000 requests within a 500 ms interval into a single network call to the external API. When I get a result back from the API provider, I need to return a result or an error to each individual request. For this purpose, I’m exploring Spring Integration, which I’ve never used before, to build the batching layer.
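To make the requirement concrete, here is a minimal plain-Java sketch of the behaviour I'm after, independent of Spring Integration. `MicroBatcher`, `submit` and `bulkCall` are hypothetical names I made up for illustration, and the sketch assumes the bulk API returns one result per input in the same order. It uses a periodic timer, so it only approximates a per-batch 500 ms window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical micro-batcher: flushes when the buffer is full or when the timer ticks. */
final class MicroBatcher<I, O> implements AutoCloseable {
    /** One caller's input together with the future that caller is waiting on. */
    private static final class Pending<I, O> {
        final I input;
        final CompletableFuture<O> future = new CompletableFuture<>();
        Pending(I input) { this.input = input; }
    }

    private final int maxBatchSize;
    private final Function<List<I>, List<O>> bulkCall; // assumed: one result per input, same order
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private List<Pending<I, O>> buffer = new ArrayList<>();

    MicroBatcher(int maxBatchSize, long maxWaitMillis, Function<List<I>, List<O>> bulkCall) {
        this.maxBatchSize = maxBatchSize;
        this.bulkCall = bulkCall;
        timer.scheduleWithFixedDelay(this::flush, maxWaitMillis, maxWaitMillis, TimeUnit.MILLISECONDS);
    }

    /** Queues one request; the returned future completes when its batch has been processed. */
    CompletableFuture<O> submit(I input) {
        Pending<I, O> pending = new Pending<>(input);
        List<Pending<I, O>> full = null;
        synchronized (this) {
            buffer.add(pending);
            if (buffer.size() >= maxBatchSize) { // size-based release
                full = buffer;
                buffer = new ArrayList<>();
            }
        }
        if (full != null) {
            dispatch(full); // runs on the caller's thread in this sketch
        }
        return pending.future;
    }

    /** Time-based release: sends whatever has accumulated since the last flush. */
    private void flush() {
        List<Pending<I, O>> batch;
        synchronized (this) {
            if (buffer.isEmpty()) {
                return;
            }
            batch = buffer;
            buffer = new ArrayList<>();
        }
        dispatch(batch);
    }

    /** Makes the single bulk call and fans the results (or the error) back out. */
    private void dispatch(List<Pending<I, O>> batch) {
        try {
            List<I> inputs = batch.stream().map(p -> p.input).collect(Collectors.toList());
            List<O> results = bulkCall.apply(inputs);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (RuntimeException ex) {
            batch.forEach(p -> p.future.completeExceptionally(ex));
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

With `new MicroBatcher<>(10_000, 500, api::sendBulk)` (where `api::sendBulk` is a placeholder for the provider call), each activity would call `submit` and wait on its own future.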
To test out the batching layer, I created the following MathService and MathGateway.
@Service
public class MathService {
    public int sum(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).sum();
    }

    public int sumExceptionally(List<Integer> numbers) {
        throw new RuntimeException("Exception thrown from MathService.sumExceptionally()");
    }
}

@MessagingGateway
public interface MathGateway {
    @Gateway(requestChannel = "sumChannel")
    CompletableFuture<Integer> sumAggregate(
        @Header("correlationId") int correlationId,
        @Payload Integer number
    );
}
My goal is to batch calls to MathGateway.sumAggregate before making a single call to MathService.sum, using the following IntegrationConfig.
@Slf4j
@Configuration
@EnableIntegration
@IntegrationComponentScan("com.james.playground")
public class IntegrationConfig {
    @Autowired
    MathService mathService;

    @Bean
    public MessageChannel sumChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow processSumAggregate() {
        return IntegrationFlows.from(this.sumChannel())
            .aggregate(aggregator -> aggregator
                .correlationStrategy(message -> message.getHeaders().get("correlationId"))
                .releaseStrategy(group -> group.size() == 5) // Max batch size
                .groupTimeout(500) // Timeout for releasing the batch
                .expireGroupsUponCompletion(true)
                .sendPartialResultOnExpiry(true)
                .outputProcessor(group -> {
                    try {
                        int sum = StreamEx.of(group.getMessages())
                            .map(message -> (Integer) message.getPayload())
                            .toListAndThen(mathService::sum);
                        List<Pair<Object, Integer>> payload = group.getMessages()
                            .stream()
                            .map(message -> Pair.of(message.getHeaders().getReplyChannel(), sum))
                            .collect(Collectors.toList());
                        return MessageBuilder.withPayload(payload).build();
                    } catch (Exception ex) {
                        List<Pair<Object, Exception>> payload = StreamEx.of(group.getMessages())
                            .map(message -> Pair.of(message.getHeaders().getReplyChannel(), ex))
                            .toList();
                        return MessageBuilder.withPayload(payload).build();
                    }
                })
            )
            .split()
            .<Pair<?, ?>, Message<?>>transform(
                payload -> MessageBuilder.withPayload(payload.getRight())
                    .setReplyChannel((MessageChannel) payload.getLeft())
                    .setErrorChannel((MessageChannel) payload.getLeft())
                    .build()
            )
            // .log() // LOG LINE 1
            .handle((payload, headers) -> payload)
            // .log() // LOG LINE 2
            .get();
    }
}
Below is my testing endpoint.
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
    @Autowired
    MathGateway mathGateway;

    @PostMapping("/batch/sum/aggregate")
    public String sumAggregate(@RequestParam(required = false) Integer correlationId) {
        log.info("TestController.sumAggregate start");
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            int input = i;
            int finalCorrelationId = correlationId == null ? i % 2 : correlationId;
            CompletableFuture<Void> future = mathGateway.sumAggregate(finalCorrelationId, input)
                .thenAccept(result -> {
                    log.info("TestController.sumAggregate result, input: {}, output: {}", input, result);
                })
                .exceptionally(throwable -> {
                    log.info("TestController.sumAggregate failed, input: {}, error: {}", input, throwable.getMessage());
                    return null;
                });
            futures.add(future);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        log.info("TestController.sumAggregate finished");
        return "DONE";
    }
}
At the moment, the above code works as I expect. If I send in a correlationId, I get the sum of all 5 numbers as the output of all 5 CompletableFutures. Otherwise, if correlationId is missing, I get 2 separate sums, one for the even inputs and one for the odd inputs, as the output of the CompletableFutures at the even and odd indices respectively. When I switch the code to use MathService.sumExceptionally, I get the proper exception out of each CompletableFuture.
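To spell out the numbers I expect from the test endpoint, here is a small standalone sketch that groups the inputs 0..4 the same way the correlation strategy does. `ExpectedSums` and `sumsByKey` are hypothetical helper names used only for this illustration, and the fixed key 7 is an arbitrary example value.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ExpectedSums {
    /** Sums the inputs 0..count-1 per correlation key; a null fixedKey means "use i % 2". */
    static Map<Integer, Integer> sumsByKey(int count, Integer fixedKey) {
        return IntStream.range(0, count).boxed().collect(Collectors.groupingBy(
            i -> fixedKey == null ? i % 2 : fixedKey,
            TreeMap::new,
            Collectors.summingInt(Integer::intValue)));
    }

    public static void main(String[] args) {
        System.out.println(sumsByKey(5, 7));    // {7=10}: one group of 5
        System.out.println(sumsByKey(5, null)); // {0=6, 1=4}: evens 0+2+4, odds 1+3
    }
}
```

Note that without a correlationId the two groups have sizes 3 and 2, so the `group.size() == 5` release strategy never fires for them; they are released by the 500 ms group timeout together with `sendPartialResultOnExpiry(true)`.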
Now here are my questions for the Temporal and Spring Integration masters.
- At a high level, are there any problems with batching API calls from Temporal activities in this way? I plan to use Asynchronous Activity Completion to return the result from the batching layer via CompletableFuture.thenAccept.
- Is Spring Integration the right tool to build this batching layer? If not, why, and is there a better alternative?
- The code works from my point of view, but I may be missing problems I don’t know to look for. Do you see any problems with my code?
- Although the code works, it looks quite complicated to me. Is there a simpler approach/syntax in Spring Integration for what I’m doing in the code?
- When I enable LOG LINE 1 and LOG LINE 2, I see exactly the same thing printed on the console. Hence, I initially thought the handle call between the 2 log lines was useless. However, if I remove it, the code stops working: my TestController gets stuck because only 1 CompletableFuture resolves. I tried reading the documentation but I still cannot explain this behavior. Can you help me with this?
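For the first question, this is roughly the bridge I have in mind between the batching layer and asynchronous activity completion. It is only a sketch: `CompletionHandle` is a hypothetical interface standing in for whatever completion client the Temporal SDK provides, not a real Temporal type. I use `whenComplete` here rather than `thenAccept`, because `thenAccept` only runs on success and the failure case also has to be reported.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncCompletionBridge {
    /** Hypothetical stand-in for the SDK's manual activity completion client. */
    interface CompletionHandle<T> {
        void complete(T result);
        void fail(Throwable error);
    }

    /** Reports the outcome of the batched call, success or failure, to the handle. */
    static <T> void bridge(CompletableFuture<T> batched, CompletionHandle<T> handle) {
        batched.whenComplete((result, error) -> {
            if (error != null) {
                handle.fail(error);
            } else {
                handle.complete(result);
            }
        });
    }
}
```

The activity method would call `bridge(mathGateway.sumAggregate(...), handle)` and return immediately, leaving the handle to complete the activity once the batch result arrives.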
I’d be very grateful if you could help me with my questions above.