Hello World for multiple microservices (saga pattern)

I’m trying to create a “Hello Saga” application that consists of two Temporal clients.
This is basically a POC on how to replace our current saga implementation, which relies heavily on pub-sub request-response through RabbitMQ.

Say I have two services (serviceOne and serviceTwo), where serviceOne is the orchestrator.
serviceOne and serviceTwo are deployed independently, on different machines. Each has its own codebase.
serviceOne triggers the Temporal workflow and the first activity. In other words, serviceOne is the first saga participant.
serviceTwo needs to run the second activity after serviceOne, so serviceTwo is the second saga participant.

The idea seems to be to have two task queues in Temporal:

  • CLIENT_ONE_TASK_QUEUE : serviceOne (the orchestrator) triggers the saga and ‘publishes’ (not sure if this is the correct Temporal term) to this queue.
  • CLIENT_TWO_TASK_QUEUE : serviceTwo ‘listens’ on this queue, then runs its own activity.

In the saga pattern, serviceOne must publish to somewhere, let’s say CLIENT_TWO_TASK_QUEUE. This means serviceTwo needs to ‘listen’ there (CLIENT_TWO_TASK_QUEUE). I think this is achieved via the serviceTwo Temporal worker. But I’m not sure if this is the correct approach and, if it is, how serviceOne actually ‘publishes’ to CLIENT_TWO_TASK_QUEUE.

So on serviceOne:

ClientOneSagaActivityImpl.java

public class ClientOneSagaActivityImpl implements ClientOneSagaActivity {

	@Override
	public String triggerClientOneActivity_seq1(String identifier) {
		String res = "Entering triggerClientOneActivity_seq1 for " + identifier;

		System.out.println(res);
		return res;
	}

}

ClientOneSagaWorkflowImpl.java

public class ClientOneSagaWorkflowImpl implements ClientOneSagaWorkflow {

	private ActivityOptions options = ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2))
			.build();

	private final ClientOneSagaActivity clientOneSagaActivity = Workflow.newActivityStub(ClientOneSagaActivity.class,
			options);

	@Override
	public String triggerClientOneWorkflow(String identifier) {
		System.out.println("Entering triggerClientOneWorkflow for " + identifier);

		clientOneSagaActivity.triggerClientOneActivity_seq1(identifier);

		return "Workflow done for " + identifier;
	}

}

ClientOneSagaWorker.java

public class ClientOneSagaWorker {

	public static void main(String[] args) {
		WorkflowServiceStubs service = WorkflowServiceStubs
				.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(Shared.TEMPORAL_SERVER).build());
		WorkflowClient client = WorkflowClient.newInstance(service);
		WorkerFactory factory = WorkerFactory.newInstance(client);
		Worker worker = factory.newWorker(Shared.CLIENT_ONE_TASK_QUEUE);
		worker.registerWorkflowImplementationTypes(ClientOneSagaWorkflowImpl.class);
		worker.registerActivitiesImplementations(new ClientOneSagaActivityImpl());
		factory.start();
	}

}

The code for serviceTwo is essentially identical; apart from the class names, the only difference is the queue name.

ClientTwoSagaActivityImpl.java

public class ClientTwoSagaActivityImpl implements ClientTwoSagaActivity {

	@Override
	public String triggerClientTwoActivity_seq2(String identifier) {
		String res = "Entering triggerClientTwoActivity_seq2 for " + identifier;

		System.out.println(res);
		return res;
	}

}

ClientTwoSagaWorkflowImpl.java

public class ClientTwoSagaWorkflowImpl implements ClientTwoSagaWorkflow {

	private ActivityOptions options = ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2))
			.build();

	private final ClientTwoSagaActivity clientTwoSagaActivity = Workflow.newActivityStub(ClientTwoSagaActivity.class,
			options);

	@Override
	public String triggerClientTwoWorkflow(String identifier) {
		System.out.println("Entering triggerClientTwoWorkflow for " + identifier);

		clientTwoSagaActivity.triggerClientTwoActivity_seq2(identifier);

		return "Workflow done for " + identifier;
	}

}

ClientTwoSagaWorker.java

public class ClientTwoSagaWorker {

	public static void main(String[] args) {
		WorkflowServiceStubs service = WorkflowServiceStubs
				.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(Shared.TEMPORAL_SERVER).build());
		WorkflowClient client = WorkflowClient.newInstance(service);
		WorkerFactory factory = WorkerFactory.newInstance(client);
		Worker worker = factory.newWorker(Shared.CLIENT_TWO_TASK_QUEUE);
		worker.registerWorkflowImplementationTypes(ClientTwoSagaWorkflowImpl.class);
		worker.registerActivitiesImplementations(new ClientTwoSagaActivityImpl());
		factory.start();
	}

}

Then the main app of serviceOne calls the workflow.

Service One main app

@SpringBootApplication
public class Application implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		WorkflowServiceStubs service = WorkflowServiceStubs
				.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(Shared.TEMPORAL_SERVER).build());
		WorkflowClient client = WorkflowClient.newInstance(service);
		WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(Shared.CLIENT_ONE_TASK_QUEUE).build();

		ClientOneSagaWorkflow workflow = client.newWorkflowStub(ClientOneSagaWorkflow.class, options);
		String res = workflow.triggerClientOneWorkflow(RandomStringUtils.randomNumeric(6));
		System.out.println(res);
	}

}

The same goes for the Service Two main app

@SpringBootApplication
public class Application implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		WorkflowServiceStubs service = WorkflowServiceStubs
				.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(Shared.TEMPORAL_SERVER).build());
		WorkflowClient client = WorkflowClient.newInstance(service);
		WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(Shared.CLIENT_TWO_TASK_QUEUE).build();

		ClientTwoSagaWorkflow workflow = client.newWorkflowStub(ClientTwoSagaWorkflow.class, options);
		String res = workflow.triggerClientTwoWorkflow(RandomStringUtils.randomNumeric(6));
		System.out.println(res);
	}

}

Now when I run the application and the worker on both serviceOne and serviceTwo, there are two different workflows (different type, different ID). I can’t find any example of how to combine serviceOne and serviceTwo into a single saga.

Any example here? Some posts seem to cover this use case, but I can’t find sample code that actually uses two services.

Thanks in advance

This might be like the example in the Temporal booking saga. But instead of having all activities in one service, we have one service for booking a car and a different service for booking a hotel. So how do we assemble those two without sharing code or exposing them through a synchronous API? It seems possible with Temporal (judging from several other forum posts), but there is no visible sample.

You don’t really need the workflow in service2, just the activity and worker that registers it.
In your service1 workflow you can invoke the activity in service2, for example:

ActivityOptions options =
        ActivityOptions.newBuilder()
                .setTaskQueue("ServiceTwoTaskQueueName")
                // more options if needed here (a StartToClose or ScheduleToClose timeout is required)
                .build();
ActivityStub stub = Workflow.newUntypedActivityStub(options);
stub.execute("myServiceTwoActivityType", String.class, input1, input2);

where String.class is the expected result type of the activity, and input1, input2 can be inputs required by the activity.
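
To make the routing idea concrete, here is a rough plain-Java model of what happens. It is not Temporal SDK code: the class TaskQueueSketch, its helper methods, and the "one:"/"two:" return values are all made up for illustration. It only shows the principle that the caller names a queue and an activity type, and whichever worker polls that queue and has a handler registered under that type name runs it, so the two sides share nothing but those two strings.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Plain-Java model of task-queue routing. NOT Temporal SDK code; all names are hypothetical.
final class TaskQueueSketch {

    // One scheduled activity: its type name, its input, and a slot for the result.
    static final class Task {
        final String activityType;
        final String input;
        final CompletableFuture<String> result = new CompletableFuture<>();

        Task(String activityType, String input) {
            this.activityType = activityType;
            this.input = input;
        }
    }

    // Stand-in for the server: it only holds named queues and knows nothing about service code.
    static final Map<String, BlockingQueue<Task>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Task> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Stand-in for a worker: polls one queue and runs the handler registered under the task's type name.
    static void startWorker(String queueName, Map<String, Function<String, String>> activities) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Task task = queue(queueName).take();
                    Function<String, String> handler = activities.get(task.activityType);
                    if (handler == null) {
                        task.result.completeExceptionally(
                                new IllegalStateException("Unknown activity type: " + task.activityType));
                    } else {
                        task.result.complete(handler.apply(task.input));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // let the JVM exit once main returns
        worker.start();
    }

    // Stand-in for an activity stub call: enqueue by queue name and type name, then wait for the result.
    static String executeActivity(String queueName, String activityType, String input) {
        Task task = new Task(activityType, input);
        queue(queueName).add(task);
        return task.result.join();
    }

    // Stand-in for the serviceOne workflow: step 1 runs on its own queue, step 2 on serviceTwo's queue.
    static String runSaga(String identifier) {
        Function<String, String> stepOne = id -> "one:" + id;
        Function<String, String> stepTwo = id -> "two:" + id;
        startWorker("CLIENT_ONE_TASK_QUEUE", Map.of("triggerClientOneActivity", stepOne));
        startWorker("CLIENT_TWO_TASK_QUEUE", Map.of("triggerClientTwoActivity", stepTwo));

        String first = executeActivity("CLIENT_ONE_TASK_QUEUE", "triggerClientOneActivity", identifier);
        String second = executeActivity("CLIENT_TWO_TASK_QUEUE", "triggerClientTwoActivity", identifier);
        return first + "," + second;
    }

    public static void main(String[] args) {
        System.out.println(runSaga("123456")); // prints "one:123456,two:123456"
    }
}
```

In the real setup the two startWorker calls would live in different processes (the serviceOne and serviceTwo workers), and the Temporal server plays the role of the QUEUES map.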

Some other possible options: start the service2 workflow from a service1 activity, use signals, invoke the service2 workflow as a child workflow in service1, etc.

Works well. Here is the code, in case anyone else needs it.

on serviceTwo : ClientTwoSagaActivity.java

@ActivityInterface
public interface ClientTwoSagaActivity {

	@ActivityMethod(name = "triggerClientTwoActivity")
	String triggerClientTwoActivity_seq2(String identifier);

}

on serviceOne : ClientOneSagaWorkflowImpl.java

public class ClientOneSagaWorkflowImpl implements ClientOneSagaWorkflow {

	private ActivityOptions optionsClientOne = ActivityOptions.newBuilder().setTaskQueue(Shared.CLIENT_ONE_TASK_QUEUE)
			.setScheduleToCloseTimeout(Duration.ofMinutes(5)).build();
	private ActivityOptions optionsClientTwo = ActivityOptions.newBuilder().setTaskQueue(Shared.CLIENT_TWO_TASK_QUEUE)
			.setScheduleToCloseTimeout(Duration.ofMinutes(5)).build();

	private final ClientOneSagaActivity clientOneSagaActivity = Workflow.newActivityStub(ClientOneSagaActivity.class,
			optionsClientOne);
	private final ActivityStub clientTwoSagaActivity = Workflow.newUntypedActivityStub(optionsClientTwo);

	@Override
	public String triggerClientOneWorkflow(String identifier) {
		System.out.println("Entering triggerClientOneWorkflow for " + identifier);

		clientOneSagaActivity.triggerClientOneActivity_seq1(identifier);
		clientTwoSagaActivity.execute("triggerClientTwoActivity", String.class, identifier);

		return "Workflow done for " + identifier;
	}

}

There is no need to use ActivityStub in your case. Use the strongly typed interface if it is available to the workflow code:

	private final ClientTwoSagaActivity clientTwoSagaActivity = Workflow.newActivityStub(ClientTwoSagaActivity.class, optionsClientTwo);

	@Override
	public String triggerClientOneWorkflow(String identifier) {
		System.out.println("Entering triggerClientOneWorkflow for " + identifier);

		clientOneSagaActivity.triggerClientOneActivity_seq1(identifier);
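		// Call the Java method declared on the interface; the name set in @ActivityMethod is only the activity type.
		clientTwoSagaActivity.triggerClientTwoActivity_seq2(identifier);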

		return "Workflow done for " + identifier;
	}