Integration with Spring and self-registration of workflows and activities

For anyone looking for a generic Spring way to locate and register WorkflowInterface and ActivityInterface implementation classes: I use the Spring bean below, and it works pretty well.

import com.google.protobuf.Duration;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.activity.ActivityInterface;
import io.temporal.api.namespace.v1.NamespaceConfig;
import io.temporal.api.workflowservice.v1.RegisterNamespaceRequest;
import io.temporal.api.workflowservice.v1.RegisterNamespaceResponse;
import io.temporal.api.workflowservice.v1.UpdateNamespaceRequest;
import io.temporal.api.workflowservice.v1.UpdateNamespaceResponse;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Spring bean that wires the application to Temporal at startup.
 *
 * <p>After dependency injection it (1) registers the configured namespace, or updates
 * its retention if the namespace already exists, (2) scans the configured packages for
 * workflow implementation classes and registers them on a worker, (3) registers every
 * Spring bean carrying {@link ActivityInterface} as an activity implementation, and
 * (4) starts the {@link WorkerFactory}.
 */
@Component
public class TemporalConnection {

    private static final Logger logger = LoggerFactory.getLogger(TemporalConnection.class);

    /** Workflow execution retention requested for the namespace: seven days, in seconds. */
    private static final long RETENTION_SECONDS = 60 * 60 * 24 * 7L;

    /** Upper bound on namespace registration attempts (~20 minutes with the delay below). */
    private static final int MAX_NAMESPACE_ATTEMPTS = 600;

    /** Pause between two failed namespace registration attempts. */
    private static final long NAMESPACE_RETRY_DELAY_MILLIS = 2000L;

    @Autowired
    private WorkflowServiceStubs service;

    @Autowired
    private WorkerFactory factory;

    @Autowired
    private WorkflowConfig config;

    @Autowired
    private ApplicationContext context;

    /**
     * Registers the namespace and then starts the workers.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between
     *         namespace registration attempts
     * @throws IllegalStateException if the namespace could not be registered within
     *         {@link #MAX_NAMESPACE_ATTEMPTS} attempts
     */
    @PostConstruct
    public void init() throws InterruptedException {
        initNamespace();
        startWorkers();
    }

    /**
     * Creates a worker on the configured task queue, registers all discovered workflow
     * implementation classes and all activity beans on it, and starts the factory.
     */
    private void startWorkers() {
        String workflowQueue = config.getWorkflowQueue();

        Worker workflowWorker = factory.newWorker(workflowQueue);

        // Scan the classpath for all classes that are workflow implementations.
        // We can't use the context.getBeansWithAnnotation method as
        // workflow implementation classes can't be spring bean instances
        // ... i.e. they can't have @Component annotations
        Class<?>[] workflowImplementations = findWorkflowImplementations();

        for (Class<?> workflowClass : workflowImplementations) {
            logger.info("Registering workflow implementation: {}", workflowClass);
            workflowWorker.registerWorkflowImplementationTypes(workflowClass);
        }

        // Get any beans annotated with @ActivityInterface and register them.
        Map<String, Object> activities = context.getBeansWithAnnotation(ActivityInterface.class);
        for (Object activity : activities.values()) {
            logger.info("Registering activity implementation: {}", activity.getClass());
            workflowWorker.registerActivitiesImplementations(activity);
        }

        factory.start();
    }

    /**
     * Scans every location returned by {@code config.getContextConfigLocations()} for
     * candidate classes matching an {@link AnnotationTypeFilter} on
     * {@link WorkflowInterface} (meta-annotations ignored, interfaces considered) and
     * loads them.
     *
     * <p>Candidates whose class cannot be loaded are logged and skipped.
     *
     * @return the loaded workflow implementation classes; empty if none were found
     */
    public Class<?>[] findWorkflowImplementations() {
        final List<Class<?>> result = new ArrayList<>();
        // Default filters are disabled so that only the include filter below applies.
        final ClassPathScanningCandidateComponentProvider provider =
                new ClassPathScanningCandidateComponentProvider(false, context.getEnvironment());
        provider.addIncludeFilter(new AnnotationTypeFilter(WorkflowInterface.class, false, true));
        for (String location : config.getContextConfigLocations()) {
            for (BeanDefinition beanDefinition : provider.findCandidateComponents(location)) {
                String beanClassName = beanDefinition.getBeanClassName();
                try {
                    logger.info("Found workflow implementation: {}", beanClassName);
                    result.add(Class.forName(beanClassName));
                } catch (ClassNotFoundException e) {
                    logger.warn("Could not resolve class object for bean definition", e);
                }
            }
        }

        return result.toArray(new Class<?>[0]);
    }

    /**
     * Ensures the configured namespace exists with the desired retention, retrying up to
     * {@link #MAX_NAMESPACE_ATTEMPTS} times with a fixed delay between failed attempts.
     * Returns as soon as one attempt succeeds (no trailing sleep).
     *
     * @throws InterruptedException if interrupted while sleeping between attempts
     * @throws IllegalStateException if every attempt failed
     */
    private void initNamespace() throws InterruptedException {
        Duration retention = Duration.newBuilder().setSeconds(RETENTION_SECONDS).build();
        for (int attempt = 1; attempt <= MAX_NAMESPACE_ATTEMPTS; attempt++) {
            if (registerOrUpdateNamespace(retention, attempt)) {
                return;
            }
            if (attempt < MAX_NAMESPACE_ATTEMPTS) {
                Thread.sleep(NAMESPACE_RETRY_DELAY_MILLIS);
            }
        }
        // Fail startup explicitly instead of starting workers against a namespace
        // that was never confirmed to exist.
        throw new IllegalStateException("Could not register Temporal namespace '"
                + config.getNamespace() + "' after " + MAX_NAMESPACE_ATTEMPTS + " attempts");
    }

    /**
     * Performs a single attempt to register the namespace; if the server reports that it
     * already exists, updates its retention instead.
     *
     * <p>A failure of the update call itself is not retried and propagates to the caller.
     *
     * @param retention the workflow execution retention to apply
     * @param attempt the 1-based attempt number, used for logging only
     * @return {@code true} if the namespace was registered or updated, {@code false} if
     *         the registration call failed with a status other than ALREADY_EXISTS and
     *         should be retried
     */
    private boolean registerOrUpdateNamespace(Duration retention, int attempt) {
        try {
            RegisterNamespaceRequest request =
                    RegisterNamespaceRequest.newBuilder()
                            .setNamespace(config.getNamespace())
                            .setWorkflowExecutionRetentionPeriod(retention)
                            .build();
            // A normal return (no exception) is treated as a successful registration.
            service.blockingStub().registerNamespace(request);
            return true;
        } catch (StatusRuntimeException ex) {
            // Status instances are not reliably comparable; compare the status codes.
            if (ex.getStatus().getCode() != Status.Code.ALREADY_EXISTS) {
                logger.warn("Namespace registration attempt {} of {} failed: {}",
                        attempt, MAX_NAMESPACE_ATTEMPTS, ex.getStatus());
                return false;
            }
        }

        // The namespace already exists: bring its retention in line with the desired value.
        UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                .setNamespace(config.getNamespace())
                .setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
                .build();
        service.blockingStub().updateNamespace(request);
        return true;
    }
}
4 Likes