Springboot microservices managed by Temporal.io + RabbitMQ

Sorry, maybe I expressed myself badly or the example I gave you is not in line with what I would like to do.
I would like to have the microservice that manages only the workflows and the various microservices that have the implementation of the activities to be called from the workflow microservice. The prj I sent you was just a test to figure out how to call an untype activity from a workflow, without register or in the workflow, if possible. Maybe not the correct example, but I needed it for the creation of a poc

The only problem with your sample is that it uses the same task queue name as the workflow one. As you want your activity to run in a separate worker use a different task queue name for invoking the activity. Otherwise your code looks fine.

This assumes that the activity implementation will be registered with a different worker that listens on that different task queue name.

I think misunderstanding is in what activity registration means.

There is no need to register an activity with a worker to invoke it. The registration is needed only to associate an activity type with an actual implementation to be able to execute activities of the given type.

Ok, great! thank you very much for the clarification, it was still not very clear to me. I thought the activities always had to be recorded too. Taking the example I attached to you, I changed the value of .setTaskQueue (“NEW_NAME_TASK_QUEUE”) on UntypedWorkflowImp. The error I get is the one that haunts me for a few days now :exploding_head:

io.temporal.failure.ActivityFailure: scheduledEventId=5, startedEventId=0, activityType='MyMethod', activityId='c04293ce-de95-3e1b-bfcd-84dcb8404923', identity='', retryState=RETRY_STATE_NON_RETRYABLE_FAILURE
	at java.lang.Thread.getStackTrace(Thread.java:1559) ~[na:1.8.0_275]
	at io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:48) ~[temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:33) ~[temporal-sdk-1.0.5.jar:na]
	at com.example.demo.workflow.UntypedWorkflowImpl.simpleMethodWorkflow(UntypedWorkflowImpl.java:24) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:319) ~[temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:277) [temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:53) [temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:126) [temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101) ~[temporal-sdk-1.0.5.jar:na]
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107) ~[temporal-sdk-1.0.5.jar:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_275]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_275]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_275]
Caused by: io.temporal.failure.TimeoutFailure: message='activity timeout', timeoutType=TIMEOUT_TYPE_SCHEDULE_TO_START

2021-02-16 01:00:32.146 ERROR 5179 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is io.temporal.client.WorkflowFailedException: workflowId='eea282d6-11a0-4319-b556-2c4559cec01e', runId='245aa5aa-03fe-4434-ac17-de166cd36e52', workflowType='UntypedWorkflow', retryState=RETRY_STATE_RETRY_POLICY_NOT_SET, workflowTaskCompletedEventId=9] with root cause

io.temporal.failure.TimeoutFailure: message='activity timeout', timeoutType=TIMEOUT_TYPE_SCHEDULE_TO_START

Starting from the example, could you tell me how to overcome this point? It is the last open point I have for the definition of the definitive architecture.
I couldn’t have done it without you

So this message means that you scheduled an activity to a task queue and it wasn’t picked up by a worker within ScheduleToStartTimeout.

could you tell me how to overcome this point?

Start another worker that listens on the NEW_NAME_TASK_QUEUE and has the activity implementation registered with it.

Watch this video. Starting from 9:50 it explains how task queues are used to dispatch tasks to activities. It talks about Cadence (which Temporal was forked from), but the idea is still the same. Cadence used name “task list” for task queues.

2 Likes

Thanks for the video, it gave me a very large detail about the activities.
I restructured the poc correctly, using 2 springboot apps where the first manages only the workflow on one queue, while the second manages only the activity on another queue. The communication between the 2 happens directly via untypedActivity and everything works great!
Thank you so much for all the support these days, Temporal is at the top :bomb: !

1 Like

For the communication between two services the untypedActivity that’s used, is the registerActivitiesImplementations is residing in your second service where your activity implementation is, since the param requires to reference the class? Any code snippet if possible to share would be great to illustrate the communication happening in two services. Thank you.

Worker untypedActivity = factory.newWorker(Constants.UNTYPED_ACTIVITY_TASK_QUEUE);
untypedActivity.registerActivitiesImplementations(new UntypedImpl());

For communication both services would need to talk to the same Temporal service, and know the namespace and task queue (endpoint) of the other. Each service would need a worker that has the workflows and activities registered that the service supports.

You don’t have to use untyped approach if you don’t want, typed approach is also possible.
For service A to invoke an workflow on service B, or to invoke an activity inside its workflow code of service B you only need the workflow/activity interface, not the actual impl which resides on service B.

See this demo that shows this, specifically the “game” service which only has the workflow interface and uses it to start a workflow execution in the java positioning service that has the impl. Note that this works with Temporal even across programming languages, for example the game service uses the exact same java workflow interface to invoke Go, TS, as well as PHP workflows running on different services. Same can be achieved with activities as well (only interface needed for typed approach).

Hope this helps.

2 Likes

Thank you for your explanation, that was helpful.

I’m following this tutorial and have multiple services containing activities. I invoke these using a central service containing workflow and untyped activityStub. I want an endpoint in my central service which can schedule a workflow having a sequence of activities. But this requires the central service to have the name of these activities from the user. Is there a better design than passing raw string of function names?

You can create a library with activity interfaces that is shared by the services.