Temporal with springboot

Hello,

I am trying to run temporal with spring boot. I have annotated my activities in temporal workflow with @Component. However have used no annotation on workflow.

When I try to run activity from workflow, I run into this exception which says
IllegalStateException("Operation allowed only while eventLoop is running");

@WorkflowInterface
public interface SampleWorkflow {

    @WorkflowMethod
    void startWorkflow(final Request workflowRequest);
}
public void startWorkflow(final Request workflowRequest) {

        WorkflowOptions workflowOptions = WorkflowOptions
            .newBuilder()
            .setWorkflowId("SampleWorkflow")
            .setTaskQueue("SampleQueue")
            .build();

        InvestCryptoReconWorkflow workflow = workflowClient.newWorkflowStub(SampleWorkflow.class, workflowOptions);
        WorkflowClient.start(workflow::startWorkflow, workflowRequest);
    }

PersistenceActivity is persisting a new record in database.

@Override
    public void startWorkflow(final Request workflowRequest) {
        try {
            persistenceActivity = Workflow.newLocalActivityStub(PersistenceActivity.class, LocalActivityOptions
                .newBuilder()
                .setScheduleToCloseTimeout(Duration.ofSeconds(5))
                .build()
            );
            persistenceActivity.saveRecord("hello", "world");

        } catch (Exception e) {
            log.error("Unable to run workflow due to " + e.getMessage());
        }
    }

I am not sure if this is due to spring boot or due to workflow. What are the best practices when using temporal with springboot ?

Exception Stacktrace

java.lang.RuntimeException: LocalActivity: failure executing MARKER_COMMAND_CREATED->RECORD_MARKER, transition history is [CREATED->CHECK_EXECUTION_STATE, EXECUTING->SCHEDULE, REQUEST_PREPARED->MARK_AS_SENT, REQUEST_SENT->HANDLE_RESULT] 
io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:140)
io.temporal.internal.statemachines.StateMachine.handleCommand(StateMachine.java:101)
io.temporal.internal.statemachines.EntityStateMachineBase.handleCommand(EntityStateMachineBase.java:51)
io.temporal.internal.statemachines.CancellableCommand.handleCommand(CancellableCommand.java:63)
io.temporal.internal.statemachines.WorkflowStateMachines.prepareImpl(WorkflowStateMachines.java:339)
io.temporal.internal.statemachines.WorkflowStateMachines.prepareCommands(WorkflowStateMachines.java:322)
io.temporal.internal.statemachines.WorkflowStateMachines.handleLocalActivityCompletion(WorkflowStateMachines.java:693)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.waitAndProcessLocalActivityCompletion(ReplayWorkflowRunTaskHandler.java:316)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.processLocalActivityRequests(ReplayWorkflowRunTaskHandler.java:292)
io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:151)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201)
io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319)
io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279)
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Caused By: io.temporal.internal.sync.PotentialDeadlockException: Potential deadlock detected: workflow thread "workflow-method" didn't yield control for over a second. Other workflow threads:


 
com.fasterxml.jackson.databind.ser.impl.ReadOnlyClassToSerializerMap.<init>(ReadOnlyClassToSerializerMap.java:32)
com.fasterxml.jackson.databind.ser.impl.ReadOnlyClassToSerializerMap.from(ReadOnlyClassToSerializerMap.java:55)
com.fasterxml.jackson.databind.ser.SerializerCache._makeReadOnlyLookupMap(SerializerCache.java:62)
com.fasterxml.jackson.databind.ser.SerializerCache.getReadOnlyLookupMap(SerializerCache.java:54)
com.fasterxml.jackson.databind.SerializerProvider.<init>(SerializerProvider.java:233)
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.<init>(DefaultSerializerProvider.java:70)
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.<init>(DefaultSerializerProvider.java:614)
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:628)
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:606)
com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:4389)
com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4407)
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3685)
io.temporal.common.converter.JacksonJsonPayloadConverter.toData(JacksonJsonPayloadConverter.java:58)
io.temporal.common.converter.DefaultDataConverter.toPayload(DefaultDataConverter.java:117)
io.temporal.common.converter.DefaultDataConverter.toPayloads(DefaultDataConverter.java:157)
io.temporal.internal.sync.SyncWorkflowContext.executeLocalActivityOnce(SyncWorkflowContext.java:267)
io.temporal.internal.sync.SyncWorkflowContext.lambda$executeLocalActivity$8761989b$1(SyncWorkflowContext.java:260)
io.temporal.internal.sync.SyncWorkflowContext$Lambda$1034/1041228117.apply(Unknown Source)
io.temporal.internal.sync.WorkflowRetryerInternal.retryAsync(WorkflowRetryerInternal.java:232)
io.temporal.internal.sync.SyncWorkflowContext.executeLocalActivity(SyncWorkflowContext.java:258)
io.temporal.internal.sync.LocalActivityStubImpl.executeAsync(LocalActivityStubImpl.java:50)
io.temporal.internal.sync.ActivityStubBase.execute(ActivityStubBase.java:38)
io.temporal.internal.sync.LocalActivityStubImpl.execute(LocalActivityStubImpl.java:29)
io.temporal.internal.sync.LocalActivityInvocationHandler.lambda$getActivityFunc$0(LocalActivityInvocationHandler.java:72)
io.temporal.internal.sync.LocalActivityInvocationHandler$Lambda$1033/1050736163.apply(Unknown Source)
io.temporal.internal.sync.ActivityInvocationHandlerBase.invoke(ActivityInvocationHandlerBase.java:70)
com.sun.proxy.$Proxy155.saveTradeRecon(Unknown Source)
com.sofi.invest.crypto.recon.workflows.InvestCryptoReconWorkflowImpl.startTradeReconWorkflow(InvestCryptoReconWorkflowImpl.java:52)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:338)
io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:296)
io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:53)
io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:133)
io.temporal.internal.sync.SyncWorkflow$Lambda$1032/1288652219.run(Unknown Source)
io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:101)
io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Hello @nkatre, you should not tie your workflows and activities to any lifecycle management of the framework (spring boot/quarkus/…).

If you need to pass in any framework components (injection) to your activities, pass them via their constructor when you register the activity with the Temporal Worker.

Hope this helps.

@tihomir I have injected the dependencies in activities.

@nkatre that won’t work as the lifecycle of the activity execution is not controlled by the framework.
Take a look at this demo - temporal-patient-onboarding/WorkflowApplicationObserver.java at main · tsurdilo/temporal-patient-onboarding · GitHub
It’s quarkus, but same concepts can apply for spring boot at well.
As the workflow workers can be created by a bean managed by your framework (typically done on framework start) you can do your injections there, and pass them in as parameters to the Activity constructor when you register it with the worker.

@tihomir The activities are annotated as @Component and managed by spring

They should not be.
Workflows and Activities should not be annotated with any spring boot component annotations.

Why activities should not be annotated is unclear. Before starting workflow, we register activity implementations & for workflows we register the workflow definitions

Annotating with @Component tells Spring that your bean is available to be auto-discovered, meaning that Spring will discover and instantiate them. Within the default Spring scope @Component beans are also Singletons.
You can overwrite that default setting for specific beans with for example,
@Scope(ConfigurableBeanFactory. SCOPE_PROTOTYPE) and can define your custom scope.

Temporal Activity stubs are per Workflow object instance, and cannot be shared across Workflow instances.
They have a different scope than the one Spring provides for beans annotated with @Component.

In order for this to work we need to create a custom Spring scope (see Custom Scope in Spring | Baeldung for example) for both Workflow and Activities which fits also the Temporal model.

This should be doable and there are plans to add SpringBoot integration for Temporal, but there are no timelines for this.

At the moment best way is to not annotate your Workflows and Activities with any Spring scope annotations and like mentioned, pass the data to Activities constructor when you register it with the Worker.

Hope this helps.

@tihomir Can you share what the temporal model is to define the custom scope ?

@nkatre i was able to actually use the @Component annotation on Activities by autowiring it before registering it with a worker, see activities-component/DemoApplication.java at main · tsurdilo/activities-component · GitHub

This way it seems that the autowiring is first resolved inside the Activity impl - activities-component/DemoActivitiesImpl.java at main · tsurdilo/activities-component · GitHub

before its registered. Havent tested all possible cases but hope this helps. Is this the way you have set it up as well ?

You can clone that repo and run
mvn clean install spring-boot:run

We use SpringBoot. To handle SB-managed workflow and activity beans, I did the following:

Create annotation interfaces for workflow and activity implementations:

@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Qualifier
@Component
public @interface ActivityImplementation {}


@Qualifier
@Component
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public @interface WorkflowImplementation {}

I annotate all my workflow and activity implementations with these interfaces, e.g.

@Component
@ActivityImplementation
public class MyActivityImpl { }
@Component
@WorkflowImplementation
public class MyWorkflowImpl { }
// Use an empty (no dependencies) constructor for the workflow
// The workflow code will create activity stubs at runtime
// The workflow instance has no state 
// (so no queries are possible, but we can re-use the same workflow bean for each temporal run)

Then I use a configuration class to set everything up:


@Configuration
@Profile("temporal")
public class TemporalConfig {
    public static final String ACTIVITY_QUEUE = "activities";
    public static final String WORKFLOW_QUEUE = "workflows";

    @Value("${temporal.serviceAddress}")
    private String serviceAddress;

    @Value("${temporal.namespace}")
    private String namespace;

    @Bean
    public WorkerFactory workerFactory(WorkflowClient workflowClient) {
        return WorkerFactory.newInstance(workflowClient);
    }

    @Bean
    public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
        var mapper = TemporalJsonConfig.objectMapper();

        return WorkflowClient.newInstance(
                workflowServiceStubs,
                WorkflowClientOptions.newBuilder()
                        .setNamespace(namespace)
                        // Be sure to keep this up-to-date with the default
                        // configuration in io.temporal.common.converter.DefaultDataConverter
                        .setDataConverter(new DefaultDataConverter(
                                new NullPayloadConverter(),
                                new ByteArrayPayloadConverter(),
                                new ProtobufJsonPayloadConverter(),
                                new JacksonJsonPayloadConverter(mapper)
                        ))
                        .build()
        );
    }

    @Bean
    public WorkflowServiceStubs workflowServiceStubs() {
        return WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder()
                .setTarget(serviceAddress)
                .build());
    }

    @Slf4j
    @Component
    @Profile("temporal")
    public static class TemporalConnection implements ApplicationContextAware {
        private ApplicationContext applicationContext;

        @Value("${temporal.namespace}")
        private String namespace;

        @Value("${temporal.workflowRetentionDays}")
        private int workflowRetentionDays;

        private final WorkflowServiceStubs service;
        private final WorkerFactory factory;
        private final Set<Class<?>> workflowImplementationTypes;
        private final List<Object> activityImplementations;

        public TemporalConnection(
                WorkflowServiceStubs workflowServiceStubs,
                WorkerFactory factory,
                // Spring will auto-wire these automatically
                @ActivityImplementation List<Object> activityImplementations
        ) {
            this.factory = factory;
            this.service = workflowServiceStubs;
            this.activityImplementations = activityImplementations;

            String packageName = this.getClass().getPackage().getName();

            // I scan for types, not beans, because I need to get a new bean each time
            AnnotatedTypeScanner annotatedTypeScanner = new AnnotatedTypeScanner(false, WorkflowImplementation.class);
            workflowImplementationTypes = annotatedTypeScanner.findTypes(packageName);
        }

        public void init() {
            initNamespace();
            startWorkers();
            log.info("Temporal connection initialized");
        }

        @SuppressWarnings({ "rawtypes", "unchecked" })
        private void startWorkers() {
            Worker workflowWorker = factory.newWorker(WORKFLOW_QUEUE);
            for (Class workImplType : workflowImplementationTypes) { 
                Class iface = getAnnotatedInterface(workImplType, WorkflowInterface.class);  
                log.info("Registering workflow implementation {}", workImplType.getSimpleName());
                workflowWorker.addWorkflowImplementationFactory(
                        iface,
                        () -> applicationContext.getBean(workImplType)
                );
            }

            Worker activityWorker = factory.newWorker(ACTIVITY_QUEUE);
            Object[] activities = activityImplementations.toArray();

            log.info("Registering activity implementations {}",
                    Arrays.stream(activities)
                            .map(Object::getClass)
                            .map(Class::getSimpleName)
                            .collect(Collectors.joining(", "))
            );
            activityWorker.registerActivitiesImplementations(activities);

            factory.start();
        }

        private Class<?> getAnnotatedInterface(Class<?> implType, Class<? extends Annotation> annotationType) {
            for (Class<?> iface : implType.getInterfaces()) {
                if (iface.getAnnotation(annotationType) != null) {
                    return iface;
                }
            }
            throw new IllegalStateException(
                    implType.getSimpleName() + " does not implement an interface annotated with " + annotationType.getSimpleName());
        }

        @SneakyThrows
        private void initNamespace() {
            Duration retention = Duration.newBuilder().setSeconds(60L * 60 * 24 * workflowRetentionDays).build();
            boolean connected = false;
            int counter = 0;
            while (!connected && counter < 20) { // ~20 minutes max
                try {
                    log.info("Registering namespace \"{}\" with a retention period of {} days",
                            namespace,
                            workflowRetentionDays
                    );
                    RegisterNamespaceRequest request =
                            RegisterNamespaceRequest.newBuilder()
                                    .setNamespace(namespace)
                                    .setWorkflowExecutionRetentionPeriod(retention)
                                    .build();
                    RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);

                    connected = response.isInitialized();
                    if (!response.isInitialized()) {
                        log.warn(response.getInitializationErrorString());
                    }

                } catch (StatusRuntimeException ex) {
                    if (ex.getStatus().getCode() == Status.ALREADY_EXISTS.getCode()) { // Need to compare codes for successful equality
                        log.info("Domain \"{}\" already exists", namespace);
                        UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                                .setNamespace(namespace)
                                .setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
                                .build();
                        UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
                        connected = response.isInitialized();
                    } else {
                        log.error("Cannot connect to Temporal service.  Waiting for 1 minute...", ex);
                        Thread.sleep(60000);
                    }
                } catch (Exception e) {
                    log.error("Cannot connect to Temporal service.  Waiting for 1 minute...", e);
                    Thread.sleep(60000);
                }
                counter++;
            }

            if (!connected) {
                throw new IllegalStateException("Unable to connect to temporal server");
            }
        }

        @Override
        public void setApplicationContext(@NotNull ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
    }
}
1 Like

Thanks for the great example! How do you ensure that workflow instances by mistake don’t get shared mutable dependencies?

@maxim - good question.

I think the issue of mutable dependencies in a workflow is not unique to Spring Boot. While the injection framework introduces a possibility of the wrong dependency getting injected when multiple candidates are available, ultimately it is up to the developer to ensure that mutable dependencies don’t exist in the first place. Regardless of the context, mutable dependencies make a piece of software harder to comprehend and manage, so I avoid them in general.

In my case, my workflows have no dependencies, let alone mutable ones. :slight_smile:

But perhaps to your point: SB defaults to singleton instances for the beans. In the context of a temporal workflow, this is only ok if the workflow has NO INSTANCE STATE. In this case, you won’t have any workflow queries because there is nothing to query.

If you need to have workflow instance state, then it is critical that you ensure SB returns a new bean instance for each workflow run (e.g. use the Prototype bean scope). Similarly, you would need to ensure that if any workflow dependencies have instance state, that they are also have Prototype scope.

I’m not sure if there is a way to enforce this practice. I typically adhere very closely to a functional design pattern, so none of my components have instance state, and therefore using singletons everywhere is fine. This pattern fits very nicely with temporal, where a method invocation can be long-lived and fault-tolerant, allowing the state of the workflow to reside in the call stack.

If you are going to work with mutable state, you’ll have to be careful.

As your message outlined the number of caveats a user has to understand when instantiating workflows through Spring is pretty large. My experience is that most users don’t try to understand them and just use Spring with Temporal as it is a normal application. This leads to numerous hard-to-catch bugs. So we need to find a way to preclude invalid usage before making this a supported way to work with Temporal Java SDK.

@maxim - what are your criteria for “valid usage”? Besides ensuring workflows with state are freshly instantiated for each usage, are there other concerns to address?

The issue of having mutable dependencies is not a SB issue so much as a risky design pattern in general that matters even more in the context of temporal. Assuming temporal users are aware of the mutability constraints around workflows and their dependencies (and assuming we address the bean instantiation constraints discussed above), are there any other SB-related gotchas to address?

The whole idea of using Spring as I understand is to inject dependencies. What dependencies are you interested in injecting into a workflow object? Most people get confused about what can and cannot be injected. I’ve seen a lot of attempts to inject activity implementations directly into workflow code for example.

BTW. Workflows are absolutely OK to have mutable dependencies. It is not OK to share a mutable dependency across multiple workflow objects which is a Spring default.

While the dependency injection framework is a big part of Spring, as I don’t use any workflow dependencies, the primary value-add that I see is the automatic configuration of workflows: i.e. automatically setting up the namespace and registering the workflow and activity workers.

It is convenient to simply annotate a new workflow with @WorkflowImplementation, or an activity with @ActivityImplementation, and have it auto-register with temporal.

This could be done without Spring: one could scan the package for annotated types and register them. The code I shared above pretty much does that, and the parts that rely on Spring are not too difficult to replace. For convenience, this functionality could be wrapped in a Spring @Configuration class so that users of the Java SDK need only set an attribute in the properties and start annotating classes.

But once you start auto-instantiating workflows, you need to handle workflow dependencies. This is where the dependency injection comes in.

So, the “brilliant idea” of Spring is that you annotate-and-go and don’t have to think about registering your classes in the respective frameworks (API controllers, configuration, DB repositories, etc.). But to accomplish that end, you need dependency injection.

I see. We certainly will take your ideas into account while designing the Java SDK Spring Boot support.