Integration with Spring and self registration of workflows and activities

Hi. I have three main questions.

1. How would I “selfregister” workflows? For example, instead of doing

worker.registerWorkflowImplementationTypes(HelloWorldImpl.class);

in the main function for every single workflow, there would be an annotation that can handle the registration automatically. E.g.

@WorkflowInterface
public interface HelloWorld {
    @WorkflowMethod
    String sayHello(String semanticId, Worker worker);
}

// SelfRegister in substitution of worker.registerWorkflowImplementationTypes
@SelfRegisterWorkflow(taskqueue = "TaskQueue1",  /* other options */)
public class HelloWorldImpl implements HelloWorld {
    private final SayHiActivity activities;

    public HelloWorldImpl() {
        ActivityOptions options = ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(20)).build();
        this.activities = Workflow.newActivityStub(SayHiActivity.class, options);
    }

    @Override
    public String sayHello() {
      return activities.sayHi();
    }
}

I know this can be done manually with Reflections, but I don’t know if Temporal provides (maybe not now but in the future) a way to register workflows like this.

2. Is there a way to use an activity (like the one in the previous example) in a way that does not require to be registered? Or that can be “self registered” like the previous example.

My goal is not to have the main function of my app bloated with worker.registerWorkflowImplementationTypes() and worker.registerActivitiesImplementations().

3. There’s also the issue with Spring. Let’s say I want to execute a workflow on a POST request and return the result from the execution. I would do this in the Spring controller (or a service), but first I need access to the WorkflowClient (so I can call client.newWorkflowStub and then the workflow method; sayHello() in this case). What is the ideal way to proceed? Making WorkflowClient a singleton, for example? Since autowiring it is not possible (at least for know, since I’ve read that Temporal aims to integrate with Spring).

Thank you.

1 Like

We have plans to add Spring Boot integration. Until then you will have to write your own factories.

2 Likes

Hi @maxim, thank you. Do you have an ETA on this?

No ETA yet. But it is a very frequently asked feature. So it is pretty high in the priority list.

1 Like

Here’s the beginnings of my Spring configuration (below).

I take an @Autowired(required=false) dependency on the TemporalConnection class in my Spring AppRunner class in order to initialize the temporal instance with the namespace, etc.

My controller takes a dependency on the WorkflowClient, which is auto-wired with the bean declared in the configuration.

I’ve had the same question about how to better plug into the Spring framework for auto-discovery of workflows and activities. While I haven’t attempted it yet, my guess is the implementation will involve a mix of declaring @Beans for the activity and workflow stubs and using the Spring API for finding all beans that implement the relevant interfaces (or maybe that have a custom annotation like @TemporalActivity or @TemporalWorkflowImplementation).

So, while this doesn’t solve all of your wish list, it’s a start. If I do end up building something fancier, I’ll share it.

@Configuration
@Profile("temporal")
public class TemporalConfig {
    public static final String ACTIVITY_QUEUE = "activities";
    public static final String WORKFLOW_QUEUE = "workflows";

    @Value("${temporal.serviceAddress}")
    private String serviceAddress;

    @Value("${temporal.namespace}")
    private String namespace;

    @Bean
    public WorkerFactory workerFactory(WorkflowClient workflowClient) {
        return WorkerFactory.newInstance(workflowClient);
    }

    @Bean
    public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
        return WorkflowClient.newInstance(
                workflowServiceStubs,
                WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
        );
    }

    @Bean
    public WorkflowServiceStubs workflowServiceStubs() {
        return WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(serviceAddress).build());
    }

    @Slf4j
    @Component
    @Profile("temporal")
    public static class TemporalConnection {
        @Value("${temporal.namespace}")
        private String namespace;

        @Value("${temporal.workflowRetentionDays}")
        private int workflowRetentionDays;

        private final WorkflowServiceStubs service;
        private final WorkerFactory factory;

        public TemporalConnection(
                WorkflowServiceStubs workflowServiceStubs,
                WorkerFactory factory
        ) {
            this.factory = factory;
            this.service = workflowServiceStubs;
        }

        public void init() {
            initNamespace();
            startWorkers();
            log.info("Temporal connection initialized");
        }

        private void startWorkers() {
            Worker workflowWorker = factory.newWorker(WORKFLOW_QUEUE);
            workflowWorker.registerWorkflowImplementationTypes(HelloWorldImpl.class);

            Worker activityWorker = factory.newWorker(ACTIVITY_QUEUE);
            activityWorker.registerActivitiesImplementations(new HelloActivityImpl("Hello"));

            factory.start();
        }

        @SneakyThrows
        private void initNamespace() {
            Duration retention = Duration.newBuilder().setSeconds(60 * 60 * 24 * 7L).build();
            boolean connected = false;
            int counter = 0;
            while (!connected && counter < 600) { // ~20 minutes max
                try {
                    log.info("Registering namespace \"{}\" with a retention period of {} days",
                            namespace,
                            workflowRetentionDays
                    );
                    RegisterNamespaceRequest request =
                            RegisterNamespaceRequest.newBuilder()
                                    .setName(namespace)
                                    .setWorkflowExecutionRetentionPeriod(retention)
                                    .build();
                    RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);

                    connected = response.isInitialized();
                    if (!response.isInitialized()) {
                        log.warn(response.getInitializationErrorString());
                    }

                } catch (StatusRuntimeException ex) {
                    if (ex.getStatus().getCode() == Status.ALREADY_EXISTS.getCode()) { // Need to compare codes for successful equality
                        log.info("Domain \"{}\" already exists", namespace);
                        UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                                .setName(namespace)
                                .setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
                                .build();
                        UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
                        connected = response.isInitialized();
                    } else {
                        log.error("Cannot connect to Temporal service.  Waiting for 2 seconds...", ex);
                    }
                } catch (Exception e) {
                    log.error("Cannot connect to Temporal service.  Waiting for 2 seconds...", e);
                }
                Thread.sleep(2000);
                counter++;
            }
        }
    }
}

The controller:

@Profile("temporal")
@RestController
@RequestMapping("/message")
@Api("Message")
public class HelloController {

    private final WorkflowClient workflowClient;

    public HelloController(WorkflowClient workflowClient) {this.workflowClient = workflowClient;}

    @ApiOperation(value = "Ask the server to say something to you", nickname = "hello")
    @GetMapping()
    public Map<String, String> message(@ApiParam @RequestParam String name) {

        HelloWorld helloWorld = workflowClient.newWorkflowStub(
                HelloWorld.class,
                WorkflowOptions.newBuilder()
                        .setTaskQueue(TemporalConfig.WORKFLOW_QUEUE)
                        .setWorkflowRunTimeout(Duration.ofMinutes(1))
                        .setWorkflowExecutionTimeout(Duration.ofMinutes(1))
                        .build()
        );

        String message = helloWorld.tssWorkflow(name);

        Map<String, String> status = new HashMap<>();
        status.put("message", message);
        return status;
    }
}
1 Like

@maxim

No ETA yet. But it is a very frequently asked feature. So it is pretty high in the priority list.

is it available to use? If not , is it advised to use temporal-java sdk with spring , considering your comments here .
Temporal with springboot - Temporal

We are analyzing various workflows for an urgent and high priority requirements , please suggest

I believe those comments target a native integration of the Temporal Java SDK and Spring, as in allowing workflow lifecycles to be managed by the Spring container, have out of box support for dependency injection inside workflows, etc.
You can still use Temporal with different frameworks such as SpringBoot and / or Quarkus(sample here), but until a native integration with these frameworks is awailable, you have to follow some restrictions such as not being able to use the framework lifecycle management for your workflows.

Hope this helps.

For anyone looking for a generic Spring way to locate WorkflowInterface and ActivityInterface implementation classes to register, I use this Spring bean. It works pretty well.

import com.google.protobuf.Duration;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.activity.ActivityInterface;
import io.temporal.api.namespace.v1.NamespaceConfig;
import io.temporal.api.workflowservice.v1.RegisterNamespaceRequest;
import io.temporal.api.workflowservice.v1.RegisterNamespaceResponse;
import io.temporal.api.workflowservice.v1.UpdateNamespaceRequest;
import io.temporal.api.workflowservice.v1.UpdateNamespaceResponse;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

@Component
public class TemporalConnection {

    @Autowired
    private WorkflowServiceStubs service;

    @Autowired
    private WorkerFactory factory;

    @Autowired
    private WorkflowConfig config;

    @Autowired
    private ApplicationContext context;

    @PostConstruct
    public void init() throws InterruptedException {
        initNamespace();
        startWorkers();
    }

    private void startWorkers() {
        String workflowQueue = config.getWorkflowQueue();

        Worker workflowWorker = factory.newWorker(workflowQueue);

        //Scan the classpath to all classes that are workflow implementations
        // We can't use the context.getBeansWithAnnotation method as
        // workflow implementation classes can't be spring bean instances
        // ... i.e. they can't have @Component annotations
        Class[] workflowImplementations = findWorkflowImplementations();

        for (Class<?> workflowClass : workflowImplementations) {
            logger.info("Registering workflow implementation: {}", workflowClass);
            workflowWorker.registerWorkflowImplementationTypes(workflowClass);
        }

        //Get any beans annotated with @ActivityInterface and register them
        Map<String, Object> activities = context.getBeansWithAnnotation(ActivityInterface.class);
        for (Object activity : activities.values()) {
            workflowWorker.registerActivitiesImplementations(activity);
        }

        factory.start();
    }

    public Class<?>[] findWorkflowImplementations() {
        final List<Class<?>> result = new LinkedList<Class<?>>();
        final ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                false, context.getEnvironment());
        provider.addIncludeFilter(new AnnotationTypeFilter(WorkflowInterface.class, false, true));
        for (String location : config.getContextConfigLocations()) {
            for (BeanDefinition beanDefinition : provider
                    .findCandidateComponents(location)) {
                try {
                    String beanClassName = beanDefinition.getBeanClassName();
                    logger.info("Found workflow implementation: {}", beanClassName);
                    result.add(Class.forName(beanClassName));
                } catch (ClassNotFoundException e) {
                    logger.warn(
                            "Could not resolve class object for bean definition", e);
                }
            }
        }

        return result.toArray(new Class<?>[result.size()]);
    }

    private void initNamespace() throws InterruptedException {
        Duration retention = Duration.newBuilder().setSeconds(60 * 60 * 24 * 7L).build();
        boolean connected = false;
        int counter = 0;
        while (!connected && counter < 600) { // ~20 minutes max
            try {
                RegisterNamespaceRequest request =
                        RegisterNamespaceRequest.newBuilder()
                                .setNamespace(config.getNamespace())
                                .setWorkflowExecutionRetentionPeriod(retention)
                                .build();
                RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);

                connected = response.isInitialized();

            } catch (StatusRuntimeException ex) {
                if (ex.getStatus().getCode() == Status.ALREADY_EXISTS.getCode()) { // Need to compare codes for successful equality
                    UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                            .setNamespace(config.getNamespace())
                            .setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
                            .build();
                    UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
                    connected = response.isInitialized();
                }
            } catch (Exception e) {
                throw e;
            }
            Thread.sleep(2000);
            counter++;
        }
    }
}
3 Likes

Hi @arnesenfamily
Is your activities class marked with @Component? I tried your startWorkers method, but got an error:

Caused by: java.lang.IllegalArgumentException: Class doesn't implement any non empty interface annotated with @ActivityInterface: xxxxxxxxActivityImpl$$EnhancerBySpringCGLIB$$40723738
	at io.temporal.common.metadata.POJOActivityImplMetadata.<init>(POJOActivityImplMetadata.java:109) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]
	at io.temporal.common.metadata.POJOActivityImplMetadata.newInstance(POJOActivityImplMetadata.java:53) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]
	at io.temporal.internal.sync.POJOActivityTaskHandler.registerActivityImplementation(POJOActivityTaskHandler.java:110) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]
	at io.temporal.internal.sync.POJOActivityTaskHandler.registerActivityImplementations(POJOActivityTaskHandler.java:179) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]
	at io.temporal.internal.sync.SyncActivityWorker.registerActivityImplementations(SyncActivityWorker.java:55) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]
	at io.temporal.worker.Worker.registerActivitiesImplementations(Worker.java:326) ~[temporal-sdk-0.11.0+1-0-7.jar:0.11.0+1-0-7]

Yes, activities are annotated with @Component and implement an interface that is annotated with @ActivityInterface