Springboot microservices managed by Temporal.io + RabbitMQ

Hi all!

I would like to create a microservices architecture based on Springboot + RabbitMQ + Temporal. The idea is to create a Temporal microservice that, during the application startup phase, records the workers that will be used by the entire ecosystem. Subsequently, in the same microservice, all application workflows are created and launched.
These workflows will refer to activities that will have the task of writing to the queues and delegating the business logic to other microservices.
When the microservices finish their work, they write on another topic that will be read by the Temporal microservice and will trigger the next methods present in the workflow
The architectural scheme that i want create is like this:

  • Temporal Microservice:

    • Register workers
    • Manage workflows and activities
    • Write/read only messages and params to different topic managed by RabbitMQ
  • N Microservices: contains some business logic and write result on specific topic

An example (with 3 microservices) can be represented like this:

  1. Workflow Interface

     @WorkflowMethod
     void workflowExecution();
    
     @SignalMethod
     void firstEnded(Object someResult);
    
     @SignalMethod
     void secondEnded(Object someResult);
    
  2. Workflow Implementation

     ActivityOptions options = ActivityOptions.newBuilder()
                 .setScheduleToCloseTimeout(Duration.ofSeconds(30))
                 .build();
    
     private final MyActivity activity = Workflow.newActivityStub(MyActivity.class, options);
     private boolean firstEnded, secondEnded;
     private Object someResult;
    
     @Override
     public void workflowExecution() {
         while (true) {
              //write in a RabbitMQ  topic, a microservice listener execute the business logic and then write 
                 on another topic (called like firstTopic)
              Async.function(activity::callFirstMethod, someResult);
              Workflow.await(() -> firstEnded);
              //write in a RabbitMQ  topic, a microservice listener execute the business logic and then write 
                 on another topic (called like secondTopic)
              Async.function(activity::callSecondMethod, someResult);
              Workflow.await(() -> secondEnded);
              Async.function(activity::callThirdMethod, someResult);
          }
     }
    
     @Override
     public void firstEnded(Object someResult) {
         this.someResult = someResult;
         this.firstEnded= true;
     }
    
     @Override
     public void secondEnded(Object someResult) {
         this.someResult = someResult;
         this.secondEnded = true;
     }
    
  3. RabbitMQ Listener

    Define TopicExchange

     private MyWorkflow wf;
     @PostConstruct
     public attachOnWorkflow() {
          WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
          WorkflowClient client = WorkflowClient.newInstance(service);
          WorkflowOptions options = WorkflowOptions.newBuilder()
                     .setTaskQueue("MY_WORKFLOW_TASK_QUEUE")
                     .setWorkflowId("MyWorkflow")
                     .build();
          wf = client.newWorkflowStub(MyWorkflow.class, options);
     }
    
     public listenerOnFirstTopic(Object someResult) {
          wf.firstEnded(someResult);
     }
    
     public listenerOnSecondTopic(Object someResult) {
          wf.secondEnded(someResult);
     }
    

Is the structure correct? Any suggestions on how to improve it, if is correct?

Thanks in advance!

Ideally, the other services would implement activities directly instead of relying on RabbitMQ. It would simplify your workflows, give more visibility into activity executions and eliminate additional RabbitMQ dependency. If for legacy reasons you have to stick with RabbitMQ your proposal looks fine.

Some nits:

  1. You want to have a workflow instance per business entity. So the rabbitMQ listener should extract business entity ID from the message and use it as workflow ID to signal:
 private WorklfowClient client;
 @PostConstruct
 public attachOnWorkflow() {
      WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
      WorkflowClient client = WorkflowClient.newInstance(service);
 }

 public listenerOnFirstTopic(Object someResult) {
      WorkflowOptions options = WorkflowOptions.newBuilder()
                 .setTaskQueue("MY_WORKFLOW_TASK_QUEUE")
                 .setWorkflowId(getWorkflowIdFromResult(someResult))
                 .build();
      wf = client.newWorkflowStub(MyWorkflow.class, options);
      wf.firstEnded(someResult);
 }

 public listenerOnSecondTopic(Object someResult) {
      WorkflowOptions options = WorkflowOptions.newBuilder()
                 .setTaskQueue("MY_WORKFLOW_TASK_QUEUE")
                 .setWorkflowId(getWorkflowIdFromResult(someResult))
                 .build();
      wf = client.newWorkflowStub(MyWorkflow.class, options);
      wf.secondEnded(someResult);
 }
  1. It is not clear why your workflow has an infinite loop.
  2. As the activity sends a message to the rabbitMQ topic there is no reason to call it asynchronously. In your case, as you call it asynchronously and ignore the resulting Promise all exceptions thrown from the activity will be ignored.
  3. Consider using await that takes timeout as the first argument to deal with replies not coming in time.
2 Likes

Thanks @maxim for the reply.
Regarding what is recommended:

  1. it’s not a legacy reason to use RabbitMQ, it was mostly an idea. From what you told me, we could use Temporal for this purpose, not inserting an unnecessary dependency for writing on specific queue. To manage the getWorkflowIdFromResult do you mean something like this flow?

    MyActivitity → WorkflowExecution execution = Activity.getWorkflowExecution();
    then write on topic execution.getWorkflowId() or something different?

  2. correct, it will not be handled this way

  3. excellent observation, I’ll move on to sync management

  4. here too, thanks for the advice

EDIT:
I did a test as indicated in point 1., but when I call the SignalMethod I get the following error: Null workflowid. Was workflow started?
Instead if I initialize the workflow in the @PostConstruct phase and use that instance in the signal invocation, everything works correctly. Any indication about this?

If there are no legacy requirements I would strongly recommend not introducing RabbitMQ or any other additional queue into your architecture. Temporal already dispatches activities through internal task queues. Here are some benefits of not using an additional queue:

  • Simpler and more available system to install and maintain as it has fewer moving parts.
  • Timeouts. Temporal has separate timeouts for how long an activity task can stay in a task queue before picked up (ScheduleToStartTimeout) and how long it can execute after received by an activity implementation (StartToCloseTimeout). If an external queue is used then it is not possible to control these timeouts separately. So with native task queues, you can specify very long ScheduleToStartTimeout and a short StartToCloseTimeout which would allow you to not fail the task if a downstream service is down for a long time.
  • Retries. Activity invocations are automatically retried according to configured exponential retry options. Each activity type or even invocation can have its own retry options. You are not getting such auto retries when using an external queue. Auto retries do not write anything to the workflow history. So they can have practically unlimited duration. It is OK to retry for days or even months. RabbitMQ has built in retry mechanism, but it uses the same timeout for all activity types, doesn’t support exponential backoff, and is limited in duration.
  • Simpler workflow code. Your workflow code becomes much simpler as no signals are needed. Just activity invocations:
  activities.callFirst(...);
  activities.callSecond(...);
  activities.callThird(...);
  • Rate Limiting. Temporal task queue support rate-limiting. So if there is a need it is possible to limit rate of dispatch of tasks even in the presence of multiple downstream consumers. I believe RabbitMQ doesn’t support such a feature.
  • Heartbeating. You can have long-running operations implemented through RabbitMQ, but they would require a long timeout to detect their failures. Temporal supports heartbeating for long-running activities. This allows quick detection of process outages and other issues even for very long activities. It is also possible to include some application specific information into the heartbeat. If an activity fails and is retried the information from the last heartbeat can be used to continue activity work from the position recorded in the last heartbeat.
  • Cancellation. Temporal activities support cancellation (if heartbeating). It is not possible to cancel a task dispatched to RabbitMQ.

WorkflowID

Would you explain the use case. Are you going to have a single workflow instance only? Or you will have an instance per business entity?

Thanks so much for the detailed explanation Maxim.
As mentioned earlier, using RabbitMQ isn’t a legacy choice, but it’s the only thing that came to my mind for the architectural design to be implemented.
I have drawn a draft of a subsection of what will be the architectural part for the management of workflows

I need to decouple the components, in order to have:

  • Temporal Microservice (called WFManager) which has the task of managing only and exclusively the wokflows related to the business. We may need to create some workflow, both basic and cron.
  • First Business Logic Microservices (called FirstMS) which has the task of reading large files and saving their content, normalized, on a Mongo database.
  • Second Business Logic Microservices (called SecondMS) which has the task of using what was previously normalized and aggregating it, doing some operations, on a relational database.

Now I’ll explain what I intended to achieve on a practical level, assuming the creation of one of the workflows (the workflows will be very similar to each other):

  1. an operation is launched on the UI that triggers a workflow, present in WFManager
  2. the workflow within it will be as follows:
    activity.callElaboration (someParams);
    Workflow.await ((Duration.ofHours (6)) → elaborationEnded);
    activity.callAggregation (someParams);
    Workflow.await (() → secondEnded);
    Async.function(activity::generateReport, someParams));
  3. activity.callElaboration will have the task of writing to the RabbitMQ queue passing the parameters and references of the workflow
  4. FirstMS has inside the listener that will read on the previously defined queue and will use the parameters to read the files and normalize the data (operation that can last up to 6 hours)
  5. at the end of the operation, FirstMS will have the task of writing to another RabbitMQ queue passing the results and references of the workflow
  6. WFManager has inside the listener that will read on the previously defined queue and will trigger a SignalMethod to resume the workflow
  7. activity.callAggregation will have the task of writing to the RabbitMQ queue passing the parameters and the workflow references
  8. SecondMS has inside the listener that will read on the previously defined queue and will use the parameters to read the parameters sent and aggregate the data (operation that can last a maximum of a few hours)
  9. at the end of the operation, SecondMS will have the task of writing to another RabbitMQ queue passing the results and references of the workflow
  10. WFManager has inside the listener that will read on the previously defined queue and will trigger a SignalMethod to terminate the workflow, calling the activity.generateReport (someParams) method which will trigger a call via RestTemplate on an api present on another microservice

I hope I have made an idea of ​​what we want to achieve.
If I want to eliminate the procedures that write/read from the queues, process and generate a Signal, what could I do?
I miss the link to launch all the various ActivityMethods directly from Temporal, without an orchestration.
Thanks in advance

Here is an alternative much simpler architecture that uses Temporal to its full potential.


Your worklfow becomes:

firstService.callElaboration(someParams);
secondService.callAggregation(someParams);
secondService.generateReport(someParams);
1 Like

Great solution!
Fits perfectly with what I want to achieve.
I have only one last doubt.
How can I call an activity present on another from one microservice?
When I register my workflow implementations in the worker, I also need to define the activities associated with it.
Being decoupled, how could I make it happen?
In my POC, I realized this with only a Springboot Application and I’ve all the references inside my prj:

Worker

@Configuration
public class WorkerManager {

    public WorkerManager() {
      WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
      WorkflowClient client = WorkflowClient.newInstance(service);
      WorkerFactory factory = WorkerFactory.newInstance(client);

      Worker firstWorker = factory.newWorker("FIRST_TASK_QUEUE");
      firstWorker .registerWorkflowImplementationTypes(FirstWorkflowImpl.class);
      firstWorker .registerActivitiesImplementations(new FirstActivityImpl());

      Worker secondWorker = factory.newWorker("SECOND_TASK_QUEUE");
      secondWorker .registerWorkflowImplementationTypes(SecondWorkflowImpl.class);
      secondWorker .registerActivitiesImplementations(new SecondActivityImpl());

      factory.start();
  }
}

Workflow Impl

public class FirstWorkflowImplimplements FirstWorkflow {

ActivityOptions options = ActivityOptions.newBuilder()
        .setScheduleToCloseTimeout(Duration.ofSeconds(2))
        .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(5).setInitialInterval(Duration.ofSeconds(5)).build())
        .build();

private final FirstActivity activity= Workflow.newActivityStub(FirstActivity.class, options);

@Override
public void executionWorkflow(Object someParams) {
    activity.firstActivity(someParams);
    activity.secondActivity(someParams);
}

firstWorker .registerActivitiesImplementations(new FirstActivityImpl());
private final FirstActivity activity= Workflow.newActivityStub(FirstActivity.class, options);

How can i implement this kind of highlighted initialization in the architecture definited above?
Could you give me an example?
Thank you so much for your help, it was very valuable!

Only activities and workflows that should be executed by a process should be registered. So your first service should have something like:

      Worker worker = factory.newWorker("FIRST_TASK_QUEUE");
      worker.registerActivitiesImplementations(new FirstActivityImpl());

Second service should have

      Worker worker = factory.newWorker("SECOND_TASK_QUEUE");
      worker.registerActivitiesImplementations(new SecondActivityImpl());

And the workflow service should have:

      Worker firstWorker = factory.newWorker("WORKFLOW_TASK_QUEUE");
      firstWorker.registerWorkflowImplementationTypes(FirstWorkflowImpl.class);

Then you have to use the appropriate task queues when creating a workflow stub:

      WorkflowOptions options = WorkflowOptions.newBuilder()
                 .setTaskQueue("WORKFLOW_TASK_QUEUE")
                 .setWorkflowId("MyWorkflow")
                 .build();
      wf = client.newWorkflowStub(MyWorkflow.class, options);

and activity stubs:

 ActivityOptions options1 = ActivityOptions.newBuilder()
             .setStartToCloseTimeout(Duration.ofSeconds(30))
             .setTaskQueue("FIRST_TASK_QUEUE")
             .build();

MyFirstServiceActivities activity = Workflow.newActivityStub(MyFirstServiceActivities.class, options1);

 ActivityOptions options2 = ActivityOptions.newBuilder()
             .setStartToCloseTimeout(Duration.ofSeconds(30))
             .setTaskQueue("SECOND_TASK_QUEUE")
             .build();

MySecondServiceActivities activity = Workflow.newActivityStub(MySecondServiceActivities.class, options2);

Also unless activities are CPU intensive you can collocate the workflow implementation with one of these services.

Perfect, as I imagined the implementation when you shared the Temporal-based architecture with me.
In the Activity stub section, I imagine it should be inserted within the worflow implementation in the WorkflowMicroservice.
But how do I initialize MyFirstServiceActivities activity = Workflow.newActivityStub (MyFirstServiceActivities.class, options1); if this class is present inside FirstMicroservice?
Should I use a sort of FeignClient or can I initialize only the interface on the WorkflowMicroservice and then define the implementation on the FirstMicroservices?
Also here, since there is decoupling, should I have a reference or can I overcome this problem?
Thank you very much Maxim

Awesome question!

I see two main options.

  1. Separate activity and workflow interfaces into separate libraries and make both workflow and activity implementation depend on them.
  2. Use untyped API to invoke activities and workflows.
ActivityStub untyped = Workflow.newUntypedActivityStub(options);
<returnType> result = untyped.execute("activityTypeName", <returnType>, <args>...);

Creating a library was the only idea that came to my mind, but that I quickly put aside so as not to create a strong coupling with microservices. Point 2, on the other hand, seems to me an excellent compromise to create the abstraction I was looking for! Seeing the snippet you wrote me, I believe it is possible to call the Activity directly. Can I also call a specific method within an Activity, using untyped? Or calling activityTypeName calls me directly the only method inside it?

Can I also call a specific method within an Activity, using untyped? Or calling activityTypeName calls me directly the only method inside it?

By default, each method of an activity interface is registered as a separate activity type with the method name. So you can call each specific method using untyped.

Sorry for the stress :slight_smile: but having found no documentation about using untype, I checked the java SDK test source (WorkflowTest.java).
Everything works, but only if I define in the worker the real implementation of the Activity.
If I don’t insert it, it timeout after the time defined in ScheduleToCloseTimeout.

WORKER

Worker untypedWorker = factory.newWorker(Constants.UNTYPED_COMPLEX_TASK_QUEUE);
untypedWorker.registerWorkflowImplementationTypes(UntypedWorkflowImpl.class);

WORKFLOW

public class UntypedWorkflowImpl implements UntypedWorkflow {

    ActivityOptions options = ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build();
    final ActivityStub untyped = Workflow.newUntypedActivityStub(options);

    @Override
    public void simpleMethodWorkflow(String param) {
        System.out.println("Running at: " + new Date());
        String result = untyped.execute("MyMethod", String.class, param);
        System.out.println("Calling First Activity and return: " + result);
    }

}

ACTIVITY

public class UntypedImpl implements Untyped {

    @Override
    public String myMethod(String param) {
        return "** Inside UNTYPED with param: " + param + "! **";
    }

}

If I launch the application like this, I get
io.temporal.failure.TimeoutFailure: message = ‘activity timeout’, timeoutType = TIMEOUT_TYPE_SCHEDULE_TO_START
If instead I insert
untypedWorker.registerActivitiesImplementations(new UntypedImpl()); on the WORKER everything works perfectly, but I return to a coupling level that I would like to avoid in the architecture phase.
Do I have to enter this or can I implicitly register the activity, or do I have to register the activity on the workflow?
I would like not to register the activity and make sure that everything can work, in order to decouple the Workflow microservice from the various Business microservices, which should have all the implementations of the activities inside them.

Make sure that you specify the task queue in the activity options:

 ActivityOptions options = ActivityOptions.newBuilder()
             .setStartToCloseTimeout(Duration.ofSeconds(30))
             .setTaskQueue("UNTYPED_ACTIVITY_TASK_QUEUE")
             .build();
    final ActivityStub untyped = Workflow.newUntypedActivityStub(options);

And then run your activity implementation in a worker that listens on that task queue name.

public class UntypedWorkflowImpl implements UntypedWorkflow {

    ActivityOptions options = ActivityOptions.newBuilder()
            .setScheduleToCloseTimeout(Duration.ofSeconds(30))
            .setTaskQueue(Constants.UNTYPED_COMPLEX_TASK_QUEUE)
            .build();
    final ActivityStub untyped = Workflow.newUntypedActivityStub(options);

    @Override
    public void simpleMethodWorkflow(String param) {
        System.out.println("Running at: " + new Date());
        String result = untyped.execute("MyMethod", String.class, param);
        System.out.println("Calling First Activity and return: " + result);
    }

}

I added the TaskQueue part, but I get the same error.
As if the listener was not triggered, it times out.
For completeness of information, I also attach the way in which I trigger the execution of the workflow, in case there was any error.

@GetMapping("untyped")
 public void untypedWorkflow() {
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        WorkflowClient client = WorkflowClient.newInstance(service);
        WorkflowOptions options = WorkflowOptions.newBuilder()
                .setTaskQueue(Constants.UNTYPED_COMPLEX_TASK_QUEUE)
                .build();
        UntypedWorkflow workflow = client.newWorkflowStub(UntypedWorkflow.class, options);
        workflow.simpleMethodWorkflow("call workflow untyped");
 }

Is there something wrong that doesn’t start the Untyped ActivityStub?

How do you initialize workflow and activity worker? Make sure that different workers use different task queue names.

Workflow

WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker untypedWorker = factory.newWorker(Constants.UNTYPED_COMPLEX_TASK_QUEUE);
untypedWorker.registerWorkflowImplementationTypes(UntypedWorkflowImpl.class);
factory.start()

Activity

Worker untypedActivity = factory.newWorker(Constants.UNTYPED_ACTIVITY_TASK_QUEUE);
untypedActivity.registerActivitiesImplementations(new UntypedImpl());

So it works, but I don’t have access to the activity implementation (untypedActivity.registerActivitiesImplementations (new UntypedImpl ())) which is in the other microservice. If I remove this line of code, it timeout because the activity is not registered.Can I not implement the Activity implicitly and do it at runtime, without having the implementation? (because present on a microservice connected to the workflow microservice)
The implementation of Workflow and Activity is in the previous post. I updated the TaskQueue because it was the same.

You specify UNTYPED_ACTIVITY_TASK_QUEUE when starting the activity worker.

You specify UNTYPED_COMPLEX_TASK_QUEUE when creating the activity stub.

Make sure that the same name is used to host and invoke the activity. Otherwise you put the activity task in one queue and listen on another one.

I tried to create a prj with just this test case, to show you the problem. In case you could look at it I would be grateful.
In the temporal-server folder there is the compose for the server.
The application is a springboot application which, by calling the localhost:8080/worker/untyped api, calls the flow we are talking about.
In the WorkerManager class, line 22 there is the line of code that, if uncommented, makes the flow work but it is what I would not want to put for the decoupling I would like to create.

Thanks a lot as always

I’m confused.

In your original post, you said that you want to run your activities in a separate process. So I assumed that you have a separate process that starts a worker listening on its own task queue.

In the demo.tar.gz there is only one worker configured through WorkerManager. In this case, you have to register the activity with the worker of that process as at least one implementation of the activity is needed.

Would you explain what are you trying to achieve at this point?