Integration with Spring and self registration of workflows and activities

Here are the beginnings of my Spring configuration (below).

I take an @Autowired(required=false) dependency on the TemporalConnection class in my Spring AppRunner class and call its init() method there, which registers the namespace with Temporal and then starts the workers.

My controller takes a dependency on the WorkflowClient, which is auto-wired with the bean declared in the configuration.

I’ve had the same question about how to better plug into the Spring framework for auto-discovery of workflows and activities. While I haven’t attempted it yet, my guess is the implementation will involve a mix of declaring @Beans for the activity and workflow stubs and using the Spring API for finding all beans that implement the relevant interfaces (or maybe that have a custom annotation like @TemporalActivity or @TemporalWorkflowImplementation).

So, while this doesn’t solve all of your wish list, it’s a start. If I do end up building something fancier, I’ll share it.

/**
 * Spring configuration for the Temporal client, active only under the {@code temporal} profile.
 *
 * <p>Declares the {@link WorkflowServiceStubs}, {@link WorkflowClient} and {@link WorkerFactory}
 * beans, and hosts {@link TemporalConnection}, which registers the namespace and starts the
 * workers when its {@code init()} method is called.
 */
@Configuration
@Profile("temporal")
public class TemporalConfig {
    /** Task queue polled by the activity worker. */
    public static final String ACTIVITY_QUEUE = "activities";
    /** Task queue polled by the workflow worker. */
    public static final String WORKFLOW_QUEUE = "workflows";

    @Value("${temporal.serviceAddress}")
    private String serviceAddress;

    @Value("${temporal.namespace}")
    private String namespace;

    /**
     * Creates the factory used to build and start workers.
     *
     * @param workflowClient client the workers are bound to
     * @return a new worker factory
     */
    @Bean
    public WorkerFactory workerFactory(WorkflowClient workflowClient) {
        return WorkerFactory.newInstance(workflowClient);
    }

    /**
     * Creates the workflow client scoped to the configured namespace.
     *
     * @param workflowServiceStubs connection to the Temporal service
     * @return a client bound to {@code temporal.namespace}
     */
    @Bean
    public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
        return WorkflowClient.newInstance(
                workflowServiceStubs,
                WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
        );
    }

    /**
     * Creates the service stubs targeting {@code temporal.serviceAddress}.
     *
     * @return service stubs for the configured target
     */
    @Bean
    public WorkflowServiceStubs workflowServiceStubs() {
        return WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(serviceAddress).build());
    }

    /**
     * Registers (or updates) the configured namespace and starts the workflow and activity
     * workers. Nothing happens until {@link #init()} is invoked by the application.
     */
    @Slf4j
    @Component
    @Profile("temporal")
    public static class TemporalConnection {
        /** Upper bound on registration attempts; with the retry delay this is ~20 minutes. */
        private static final int MAX_ATTEMPTS = 600;
        /** Pause between failed registration attempts. */
        private static final long RETRY_DELAY_MILLIS = 2000L;
        private static final long SECONDS_PER_DAY = 24L * 60L * 60L;

        @Value("${temporal.namespace}")
        private String namespace;

        @Value("${temporal.workflowRetentionDays}")
        private int workflowRetentionDays;

        private final WorkflowServiceStubs service;
        private final WorkerFactory factory;

        public TemporalConnection(
                WorkflowServiceStubs workflowServiceStubs,
                WorkerFactory factory
        ) {
            this.factory = factory;
            this.service = workflowServiceStubs;
        }

        /** Registers the namespace, then creates and starts the workers. */
        public void init() {
            initNamespace();
            startWorkers();
            log.info("Temporal connection initialized");
        }

        /** Creates one worker per task queue, registers the implementations and starts polling. */
        private void startWorkers() {
            Worker workflowWorker = factory.newWorker(WORKFLOW_QUEUE);
            workflowWorker.registerWorkflowImplementationTypes(HelloWorldImpl.class);

            Worker activityWorker = factory.newWorker(ACTIVITY_QUEUE);
            activityWorker.registerActivitiesImplementations(new HelloActivityImpl("Hello"));

            factory.start();
        }

        /**
         * Registers the namespace, retrying until it succeeds or {@link #MAX_ATTEMPTS} is reached.
         * If the namespace already exists, its retention period is updated instead.
         *
         * <p>An interruption while waiting between attempts propagates to the caller
         * (via {@code @SneakyThrows}).
         */
        @SneakyThrows
        private void initNamespace() {
            // Use the configured value so the applied retention matches what is logged below
            // (previously this was hard-coded to 7 days regardless of configuration).
            Duration retention = Duration.newBuilder()
                    .setSeconds(workflowRetentionDays * SECONDS_PER_DAY)
                    .build();
            boolean connected = false;
            int counter = 0;
            while (!connected && counter < MAX_ATTEMPTS) { // ~20 minutes max
                try {
                    log.info("Registering namespace \"{}\" with a retention period of {} days",
                            namespace,
                            workflowRetentionDays
                    );
                    RegisterNamespaceRequest request =
                            RegisterNamespaceRequest.newBuilder()
                                    .setName(namespace)
                                    .setWorkflowExecutionRetentionPeriod(retention)
                                    .build();
                    RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);

                    connected = response.isInitialized();
                    if (!response.isInitialized()) {
                        log.warn(response.getInitializationErrorString());
                    }

                } catch (StatusRuntimeException ex) {
                    if (ex.getStatus().getCode() == Status.ALREADY_EXISTS.getCode()) { // Need to compare codes for successful equality
                        log.info("Domain \"{}\" already exists", namespace);
                        connected = updateExistingNamespace(retention);
                    } else {
                        log.error("Cannot connect to Temporal service.  Waiting for 2 seconds...", ex);
                    }
                } catch (Exception e) {
                    log.error("Cannot connect to Temporal service.  Waiting for 2 seconds...", e);
                }
                counter++;
                // Only wait when another attempt is needed; do not delay startup after success.
                if (!connected) {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                }
            }
            if (!connected) {
                log.error("Namespace \"{}\" could not be initialized after {} attempts", namespace, counter);
            }
        }

        /**
         * Applies the retention period to an already-registered namespace.
         *
         * <p>Failures are logged and reported as {@code false} so the caller's retry loop
         * continues; an exception thrown from inside the caller's {@code catch} block would
         * otherwise bypass its remaining handlers and abort initialization.
         *
         * @param retention retention period to set on the namespace
         * @return {@code true} if the update call returned an initialized response
         */
        private boolean updateExistingNamespace(Duration retention) {
            try {
                UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                        .setName(namespace)
                        .setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
                        .build();
                UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
                return response.isInitialized();
            } catch (StatusRuntimeException ex) {
                log.error("Cannot connect to Temporal service.  Waiting for 2 seconds...", ex);
                return false;
            }
        }
    }
}

The controller:

/**
 * REST endpoint under {@code /message}, available only when the {@code temporal} profile is
 * active, that answers a request by running the {@code HelloWorld} workflow.
 */
@Profile("temporal")
@RestController
@RequestMapping("/message")
@Api("Message")
public class HelloController {

    private final WorkflowClient workflowClient;

    /**
     * @param workflowClient client used to create workflow stubs
     */
    public HelloController(WorkflowClient workflowClient) {
        this.workflowClient = workflowClient;
    }

    /**
     * Runs the {@code HelloWorld} workflow on the workflow task queue and returns its result.
     *
     * @param name request parameter handed to the workflow
     * @return a map holding the workflow's return value under the key {@code "message"}
     */
    @ApiOperation(value = "Ask the server to say something to you", nickname = "hello")
    @GetMapping()
    public Map<String, String> message(@ApiParam @RequestParam String name) {
        // Both the run timeout and the overall execution timeout are one minute.
        WorkflowOptions options = WorkflowOptions.newBuilder()
                .setTaskQueue(TemporalConfig.WORKFLOW_QUEUE)
                .setWorkflowRunTimeout(Duration.ofMinutes(1))
                .setWorkflowExecutionTimeout(Duration.ofMinutes(1))
                .build();
        HelloWorld workflow = workflowClient.newWorkflowStub(HelloWorld.class, options);

        String reply = workflow.tssWorkflow(name);

        Map<String, String> body = new HashMap<>();
        body.put("message", reply);
        return body;
    }
}
1 Like