Sorry, maybe I expressed myself badly or the example I gave you is not in line with what I would like to do.
I would like one microservice that manages only the workflows, and separate microservices that hold the implementations of the activities called from the workflow microservice. The project I sent you was just a test to figure out how to call an untyped activity from a workflow without registering it in the workflow service, if possible. Maybe it was not the right example, but I needed it to build a PoC.
The only problem with your sample is that it uses the same task queue name for the activity as for the workflow. Since you want your activity to run in a separate worker, use a different task queue name when invoking the activity. Otherwise your code looks fine.
This assumes that the activity implementation will be registered with a different worker that listens on that different task queue name.
I think the misunderstanding is about what activity registration means.
There is no need to register an activity with a worker to invoke it. The registration is needed only to associate an activity type with an actual implementation to be able to execute activities of the given type.
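As a rough illustration of the two points above, here is a minimal sketch of a workflow that invokes an activity through an untyped stub on a separate task queue. It assumes the Temporal Java SDK is on the classpath. The names UntypedWorkflow, UntypedWorkflowImpl, simpleMethodWorkflow, MyMethod and NEW_NAME_TASK_QUEUE appear elsewhere in this thread; the method signature, the String argument and the timeout value are assumptions made for the example.

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

public class WorkflowServiceSketch {

  // Assumed shape of the workflow interface: only the type and method names
  // come from the thread, the String parameter and result are illustrative.
  @WorkflowInterface
  public interface UntypedWorkflow {
    @WorkflowMethod
    String simpleMethodWorkflow(String input);
  }

  public static class UntypedWorkflowImpl implements UntypedWorkflow {

    @Override
    public String simpleMethodWorkflow(String input) {
      ActivityOptions options =
          ActivityOptions.newBuilder()
              // Not the workflow's own task queue: a worker in another
              // service is expected to poll this one.
              .setTaskQueue("NEW_NAME_TASK_QUEUE")
              // Illustrative value, not taken from the thread.
              .setStartToCloseTimeout(Duration.ofSeconds(30))
              .build();

      // Untyped stub: no activity interface or implementation is needed
      // on this service's classpath.
      ActivityStub activity = Workflow.newUntypedActivityStub(options);

      // The activity is addressed by its type name only.
      return activity.execute("MyMethod", String.class, input);
    }
  }
}
```

Nothing in this service registers an activity. Only the workflow type is registered with the worker that polls the workflow task queue.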
Ok, great! Thank you very much for the clarification; it was still not very clear to me. I thought the activities always had to be registered too. Taking the example I attached, I changed the value of .setTaskQueue("NEW_NAME_TASK_QUEUE") in UntypedWorkflowImpl. The error I get is the one that has been haunting me for a few days now:
io.temporal.failure.ActivityFailure: scheduledEventId=5, startedEventId=0, activityType='MyMethod', activityId='c04293ce-de95-3e1b-bfcd-84dcb8404923', identity='', retryState=RETRY_STATE_NON_RETRYABLE_FAILURE
at java.lang.Thread.getStackTrace(Thread.java:1559) ~[na:1.8.0_275]
at io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:48) ~[temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:33) ~[temporal-sdk-1.0.5.jar:na]
at com.example.demo.workflow.UntypedWorkflowImpl.simpleMethodWorkflow(UntypedWorkflowImpl.java:24) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_275]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_275]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_275]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_275]
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:319) ~[temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:277) [temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:53) [temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:126) [temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101) ~[temporal-sdk-1.0.5.jar:na]
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107) ~[temporal-sdk-1.0.5.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_275]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_275]
Caused by: io.temporal.failure.TimeoutFailure: message='activity timeout', timeoutType=TIMEOUT_TYPE_SCHEDULE_TO_START
2021-02-16 01:00:32.146 ERROR 5179 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is io.temporal.client.WorkflowFailedException: workflowId='eea282d6-11a0-4319-b556-2c4559cec01e', runId='245aa5aa-03fe-4434-ac17-de166cd36e52', workflowType='UntypedWorkflow', retryState=RETRY_STATE_RETRY_POLICY_NOT_SET, workflowTaskCompletedEventId=9] with root cause
io.temporal.failure.TimeoutFailure: message='activity timeout', timeoutType=TIMEOUT_TYPE_SCHEDULE_TO_START
Starting from the example, could you tell me how to overcome this point? It is the last open point I have for the definition of the definitive architecture.
I couldn't have done it without you.
So this message means that you scheduled an activity to a task queue and it wasn't picked up by a worker within ScheduleToStartTimeout.
could you tell me how to overcome this point?
Start another worker that listens on NEW_NAME_TASK_QUEUE and has the activity implementation registered with it.
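A minimal sketch of such a second worker, assuming the Temporal Java SDK and a Temporal service reachable on the default local address. MyActivities, MyActivitiesImpl and the method body are hypothetical; only the task queue name and the activity type MyMethod (the default type name for a method called myMethod) come from this thread.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

public class ActivityWorkerSketch {

  // Hypothetical activity contract. By default the activity type name is the
  // method name with its first letter capitalized, here "MyMethod".
  @ActivityInterface
  public interface MyActivities {
    String myMethod(String input);
  }

  // Hypothetical implementation that lives only in the activity service.
  public static class MyActivitiesImpl implements MyActivities {
    @Override
    public String myMethod(String input) {
      return "processed: " + input;
    }
  }

  public static void main(String[] args) {
    // newInstance() matches SDK 1.0.5 from the stack trace; later releases
    // appear to prefer newLocalServiceStubs() for the same purpose.
    WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    WorkflowClient client = WorkflowClient.newInstance(service);
    WorkerFactory factory = WorkerFactory.newInstance(client);

    // This worker polls the queue that the workflow schedules activities on.
    Worker worker = factory.newWorker("NEW_NAME_TASK_QUEUE");
    worker.registerActivitiesImplementations(new MyActivitiesImpl());

    // Start polling; from now on scheduled "MyMethod" tasks get picked up.
    factory.start();
  }
}
```

Once a process like this is running, activity tasks on NEW_NAME_TASK_QUEUE should be picked up before the schedule-to-start timeout fires.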
Watch this video. Starting from 9:50 it explains how task queues are used to dispatch tasks to activities. It talks about Cadence (which Temporal was forked from), but the idea is still the same. Cadence used the name "task list" for task queues.
Thanks for the video, it gave me a lot of detail about the activities.
I restructured the PoC correctly, using two Spring Boot apps: the first manages only the workflow on one queue, while the second manages only the activity on another queue. The communication between the two happens directly via untypedActivity and everything works great!
Thank you so much for all the support these days, Temporal is at the top!
For the communication between the two services through the untypedActivity, does the registerActivitiesImplementations call reside in the second service, where the activity implementation is, since the parameter requires a reference to the class? If possible, a code snippet illustrating the communication between the two services would be great. Thank you.
Worker untypedActivity = factory.newWorker(Constants.UNTYPED_ACTIVITY_TASK_QUEUE);
untypedActivity.registerActivitiesImplementations(new UntypedImpl());
To communicate, both services need to talk to the same Temporal service, and each needs to know the namespace and task queue (the endpoint) of the other. Each service needs a worker that has the workflows and activities it supports registered.
You don't have to use the untyped approach if you don't want to; the typed approach is also possible.
For service A to invoke a workflow on service B, or to invoke an activity of service B from inside its own workflow code, you only need the workflow/activity interface, not the actual implementation, which resides on service B.
See this demo that shows this, specifically the "game" service, which only has the workflow interface and uses it to start a workflow execution in the Java positioning service that has the implementation. Note that this works with Temporal even across programming languages; for example, the game service uses the exact same Java workflow interface to invoke Go, TS, as well as PHP workflows running on different services. The same can be achieved with activities as well (only the interface is needed for the typed approach).
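For the activity case, the typed approach could look roughly like the sketch below. It is not taken from the demo: ServiceBActivities, ServiceAWorkflow, the method names and the task queue name are all hypothetical, and the Temporal Java SDK is assumed to be on the classpath. Service A compiles against the activity interface only; the implementing class exists only in service B.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

public class TypedServiceASketch {

  // Hypothetical contract. Service A needs only this interface; the class
  // that implements it is registered with a worker in service B.
  @ActivityInterface
  public interface ServiceBActivities {
    String locate(String playerId);
  }

  @WorkflowInterface
  public interface ServiceAWorkflow {
    @WorkflowMethod
    String run(String playerId);
  }

  public static class ServiceAWorkflowImpl implements ServiceAWorkflow {

    // Typed stub built from the interface alone.
    private final ServiceBActivities activities =
        Workflow.newActivityStub(
            ServiceBActivities.class,
            ActivityOptions.newBuilder()
                // Assumed name of the queue polled by service B's worker.
                .setTaskQueue("SERVICE_B_ACTIVITY_TASK_QUEUE")
                // Illustrative value.
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    @Override
    public String run(String playerId) {
      // Reads like a local call; the task is dispatched to service B.
      return activities.locate(playerId);
    }
  }
}
```

Compared with the untyped stub, the compiler checks the method name, argument types and result type, at the cost of both services depending on the same interface.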
Hope this helps.
Thank you for your explanation, that was helpful.
I'm following this tutorial and have multiple services containing activities. I invoke these from a central service that contains the workflow and an untyped ActivityStub. I want an endpoint in my central service that can schedule a workflow consisting of a sequence of activities. But this requires the central service to get the names of these activities from the user. Is there a better design than passing raw strings of function names?
You can create a library with activity interfaces that is shared by the services.
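If the endpoint still has to accept the activity sequence from the user, one possible complement to shared interfaces is to publish a small catalog of allowed activities in that same library, so the central service validates request keys instead of forwarding arbitrary strings. This is only a sketch of that idea in plain Java; ActivityCatalog, its entries, the activity type names and the task queue names are all made up for illustration.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical catalog shipped in the shared library. Each entry maps a
// request key to the activity type name and the task queue that serves it.
enum ActivityCatalog {
  RESERVE_STOCK("ReserveStock", "INVENTORY_TASK_QUEUE"),
  CHARGE_CARD("ChargeCard", "PAYMENT_TASK_QUEUE");

  private final String activityType;
  private final String taskQueue;

  ActivityCatalog(String activityType, String taskQueue) {
    this.activityType = activityType;
    this.taskQueue = taskQueue;
  }

  String activityType() {
    return activityType;
  }

  String taskQueue() {
    return taskQueue;
  }

  // Resolves a user-supplied key (case-insensitive, surrounding whitespace
  // ignored). Unknown or null keys yield an empty Optional instead of being
  // passed on to the workflow.
  static Optional<ActivityCatalog> fromRequest(String key) {
    if (key == null) {
      return Optional.empty();
    }
    String normalized = key.trim();
    return Arrays.stream(values())
        .filter(entry -> entry.name().equalsIgnoreCase(normalized))
        .findFirst();
  }
}
```

The central service could then resolve each requested step with ActivityCatalog.fromRequest(...) and pass activityType() and taskQueue() to the untyped stub, rejecting the request when a key is unknown.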