Using Spring Integration to batch API requests from Temporal activities on the client side into a single network call

To give you some context: I'm currently building a web app for my company to manage marketing campaigns with complicated, long-running workflows. My high-level plan is to use Temporal to build the backend, which reads a workflow definition written in my own DSL and then executes it.

For each campaign, 10 million or more users may be eligible to enter the campaign all at once when it is launched. Due to several requirements, I need to start one Temporal workflow per user, which means 10 million or more workflows may be running.

Let's say that, after a delay of 7 days, we need to send a marketing email to those users. This means 10 million or more workflows may wake up around the same time and start calling my Worker service to send out emails. When my Worker service receives those requests, I don't want to call the external email API 10 million or more times to send the emails one by one.

Since I expect many workflows to wake up around the same time, I want to implement a batching mechanism in my Worker service that combines up to 10,000 requests arriving within a 500 ms interval into a single network call to the external API. When I get a result back from the API provider, I need to return a result or an error to each individual request. For this purpose, I'm exploring Spring Integration, which I've never used before, to build the batching layer.
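To make the intended semantics concrete independently of Spring Integration, here is a minimal JDK-only sketch of a size-or-time micro-batcher: each caller gets its own CompletableFuture, and a batch is released when it reaches maxBatchSize or when maxDelayMs has elapsed since the first request in that batch, whichever comes first. The class MicroBatcher and the batchCall function (a stand-in for the external API) are hypothetical, and the sketch assumes the external API returns exactly one result per request, in request order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical size-or-time micro-batcher built only on the JDK.
class MicroBatcher<Req, Res> implements AutoCloseable {
  private static final class Pending<Req, Res> {
    final Req request;
    final CompletableFuture<Res> future = new CompletableFuture<>();
    Pending(Req request) { this.request = request; }
  }

  private final int maxBatchSize;
  private final long maxDelayMs;
  // Stand-in for the external API: one call per batch, one result per request, same order.
  private final Function<List<Req>, List<Res>> batchCall;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  private List<Pending<Req, Res>> current = new ArrayList<>();
  private long generation = 0; // incremented every time a batch is taken

  MicroBatcher(int maxBatchSize, long maxDelayMs, Function<List<Req>, List<Res>> batchCall) {
    this.maxBatchSize = maxBatchSize;
    this.maxDelayMs = maxDelayMs;
    this.batchCall = batchCall;
  }

  CompletableFuture<Res> submit(Req request) {
    Pending<Req, Res> pending = new Pending<>(request);
    List<Pending<Req, Res>> full = null;
    synchronized (this) {
      current.add(pending);
      if (current.size() == 1) {
        // The first request of a batch starts that batch's timeout clock.
        long gen = generation;
        scheduler.schedule(() -> flushIfStillOpen(gen), maxDelayMs, TimeUnit.MILLISECONDS);
      }
      if (current.size() >= maxBatchSize) {
        full = takeBatch(); // size limit reached: release immediately
      }
    }
    if (full != null) {
      dispatch(full); // runs on the submitting thread
    }
    return pending.future;
  }

  private void flushIfStillOpen(long gen) {
    List<Pending<Req, Res>> batch;
    synchronized (this) {
      // Ignore a timer whose batch was already released by the size limit.
      if (gen != generation || current.isEmpty()) return;
      batch = takeBatch();
    }
    dispatch(batch); // runs on the scheduler thread
  }

  // Caller must hold the lock.
  private List<Pending<Req, Res>> takeBatch() {
    List<Pending<Req, Res>> batch = current;
    current = new ArrayList<>();
    generation++;
    return batch;
  }

  private void dispatch(List<Pending<Req, Res>> batch) {
    List<Req> requests = new ArrayList<>();
    for (Pending<Req, Res> p : batch) requests.add(p.request);
    try {
      List<Res> results = batchCall.apply(requests); // the single network call
      for (int i = 0; i < batch.size(); i++) batch.get(i).future.complete(results.get(i));
    } catch (RuntimeException ex) {
      // A failed call fails every request in the batch that is not already completed.
      for (Pending<Req, Res> p : batch) p.future.completeExceptionally(ex);
    }
  }

  @Override
  public void close() { scheduler.shutdownNow(); } // requests still queued are not flushed
}
```

With maxBatchSize = 10_000 and maxDelayMs = 500 this matches the numbers above, so 10 million requests would turn into at least 1,000 external calls (more if timeouts release smaller batches). One design caveat of this sketch: timeout-released batches call the external API on the single scheduler thread, so a slow call delays other batches' timeouts; a real implementation would likely hand dispatch off to a separate executor.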

To test the batching layer, I created the following MathService and MathGateway.

```java
@Service
public class MathService {
  public int sum(List<Integer> numbers) {
    return numbers.stream().mapToInt(Integer::intValue).sum();
  }

  public int sumExceptionally(List<Integer> numbers) {
    throw new RuntimeException("Exception thrown from MathService.sumExceptionally()");
  }
}

@MessagingGateway
public interface MathGateway {
  @Gateway(requestChannel = "sumChannel")
  CompletableFuture<Integer> sumAggregate(
      @Header("correlationId") int correlationId,
      @Payload Integer number
  );
}
```

My goal is to batch calls to MathGateway.sumAggregate and then make a single call to MathService.sum, using the following IntegrationConfig.

```java
@Slf4j
@Configuration
@EnableIntegration
@IntegrationComponentScan("com.james.playground")
public class IntegrationConfig {
  @Autowired
  MathService mathService;

  @Bean
  public MessageChannel sumChannel() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow processSumAggregate() {
    return IntegrationFlows.from(this.sumChannel())
        .aggregate(aggregator -> aggregator
            .correlationStrategy(message -> message.getHeaders().get("correlationId"))
            .releaseStrategy(group -> group.size() == 5) // Max batch size
            .groupTimeout(500) // Timeout for releasing the batch
            .expireGroupsUponCompletion(true)
            .sendPartialResultOnExpiry(true)
            .outputProcessor(group -> {
              try {
                int sum = StreamEx.of(group.getMessages())
                    .map(message -> (Integer) message.getPayload())
                    .toListAndThen(mathService::sum);

                List<Pair<Object, Integer>> payload = group.getMessages()
                    .stream()
                    .map(message -> Pair.of(message.getHeaders().getReplyChannel(), sum))
                    .collect(Collectors.toList());

                return MessageBuilder.withPayload(payload).build();

              } catch (Exception ex) {
                List<Pair<Object, Exception>> payload = StreamEx.of(group.getMessages())
                    .map(message -> Pair.of(message.getHeaders().getReplyChannel(), ex))
                    .toList();

                return MessageBuilder.withPayload(payload).build();
              }
            })
        )
        .split()
        .<Pair<?, ?>, Message<?>>transform(
            payload -> MessageBuilder.withPayload(payload.getRight())
                .setReplyChannel((MessageChannel) payload.getLeft())
                .setErrorChannel((MessageChannel) payload.getLeft())
                .build()
        )
        // .log() // LOG LINE 1
        .handle((payload, headers) -> payload)
        // .log() // LOG LINE 2
        .get();
  }
}
```

Below is my testing endpoint.

```java
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
  @Autowired
  MathGateway mathGateway;

  @PostMapping("/batch/sum/aggregate")
  public String sumAggregate(@RequestParam(required = false) Integer correlationId) {
    log.info("TestController.sumAggregate start");

    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (int i = 0; i < 5; i++) {
      int input              = i;
      int finalCorrelationId = correlationId == null ? i % 2 : correlationId;

      CompletableFuture<Void> future = mathGateway.sumAggregate(finalCorrelationId, input)
          .thenAccept(result -> {
            log.info("TestController.sumAggregate result, input: {}, output: {}", input, result);
          })
          .exceptionally(throwable -> {
            log.info("TestController.sumAggregate failed, input: {}, error: {}", input, throwable.getMessage());

            return null;
          });

      futures.add(future);
    }

    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    log.info("TestController.sumAggregate finished");

    return "DONE";
  }
}
```

At the moment, the code above works as I expected. If I pass in a correlationId, all 5 CompletableFutures complete with the sum of all 5 numbers. If correlationId is missing, I get 2 separate sums, one for the even numbers and one for the odd numbers, and each CompletableFuture receives the sum for its own group (even indices get the even sum, odd indices get the odd sum). When I switch the code to use MathService.sumExceptionally, I get the proper exception out of my CompletableFutures.
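As a cross-check of those expected numbers, here is a small hypothetical helper (ExpectedSums, not part of the app) that applies the same grouping rule as TestController to inputs 0 to 4. With a fixed correlationId every future should get 0+1+2+3+4 = 10; without one, the even group {0, 2, 4} sums to 6 and the odd group {1, 3} sums to 4. Note that with the configuration above only the fixed-correlationId group ever satisfies the size-5 release strategy; the even and odd groups (3 and 2 messages) are released by the 500 ms groupTimeout via sendPartialResultOnExpiry(true).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: reproduces the expected output per input, not the real flow.
class ExpectedSums {
  // Input i is correlated to (fixedCorrelationId != null ? fixedCorrelationId : i % 2),
  // and every future in a group completes with that group's sum.
  static Map<Integer, Integer> expectedOutputs(int count, Integer fixedCorrelationId) {
    Map<Integer, Integer> groupSums = new TreeMap<>();
    for (int i = 0; i < count; i++) {
      int id = fixedCorrelationId == null ? i % 2 : fixedCorrelationId;
      groupSums.merge(id, i, Integer::sum);
    }
    Map<Integer, Integer> outputByInput = new LinkedHashMap<>();
    for (int i = 0; i < count; i++) {
      int id = fixedCorrelationId == null ? i % 2 : fixedCorrelationId;
      outputByInput.put(i, groupSums.get(id));
    }
    return outputByInput;
  }
}
```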

Now here are my questions for the Temporal and Spring Integration masters.

  1. At a high level, are there any problems with batching API calls from Temporal activities in this way? I plan to use Asynchronous Activity Completion to return the result from the batching layer via CompletableFuture.thenAccept.
  2. Is Spring Integration the right tool to build this batching layer? If not, why not, and is there a better alternative?
  3. The code works from my point of view, but there may be problems I can't see. Do you see any problems with my code?
  4. Although the code works, it looks quite complicated to me. Is there a simpler approach or syntax in Spring Integration for what I'm doing?
  5. When I enable LOG LINE 1 and LOG LINE 2, I see exactly the same output on the console, so I initially thought the handle call between the two log lines was useless. However, if I remove it, the code stops working: my TestController gets stuck because only 1 CompletableFuture resolves. I tried reading the documentation, but I still cannot explain this behavior. Can you help me with this?
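To make question 1 concrete, here is a minimal sketch of how the gateway's future could be bridged to Asynchronous Activity Completion. In the Temporal Java SDK, to my understanding, the activity would call Activity.getExecutionContext(), keep getTaskToken(), call doNotCompleteOnReturn(), and later report the outcome through an ActivityCompletionClient obtained from WorkflowClient.newActivityCompletionClient(). So that the sketch compiles without the SDK, CompletionSink is a hypothetical interface that mirrors the shape of ActivityCompletionClient's complete and completeExceptionally methods, and AsyncCompletionBridge is likewise hypothetical. It uses whenComplete rather than thenAccept so that both the success and the failure path complete the activity, and it unwraps CompletionException, which dependent stages wrap around the original error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for Temporal's ActivityCompletionClient (assumption, not the SDK type).
interface CompletionSink {
  <R> void complete(byte[] taskToken, R result);
  void completeExceptionally(byte[] taskToken, Exception failure);
}

// Hypothetical bridge: completes the activity identified by taskToken once the batch finishes.
class AsyncCompletionBridge {
  static <R> void bridge(CompletableFuture<R> batchedResult, byte[] taskToken, CompletionSink sink) {
    batchedResult.whenComplete((result, error) -> {
      if (error == null) {
        sink.complete(taskToken, result);
        return;
      }
      // Dependent stages (thenApply, thenAccept, ...) wrap the original error.
      Throwable cause = error instanceof CompletionException && error.getCause() != null
          ? error.getCause() : error;
      sink.completeExceptionally(taskToken,
          cause instanceof Exception ? (Exception) cause : new RuntimeException(cause));
    });
  }
}
```

One thing the sketch glosses over: an asynchronously completed activity is still subject to its start-to-close timeout, so that timeout has to cover the time a request spends waiting for its batch plus the external call.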

I’d be very grateful if you could help me with my questions above.