Here’s the beginnings of my Spring configuration (below).
I take an @Autowired(required=false)
dependency on the TemporalConnection
class in my Spring AppRunner
class in order to initialize the temporal instance with the namespace, etc.
My controller takes a dependency on the WorkflowClient
, which is auto-wired with the bean declared in the configuration.
I’ve had the same question about how to better plug into the Spring framework for auto-discovery of workflows and activities. While I haven’t attempted it yet, my guess is the implementation will involve a mix of declaring @Bean
s for the activity and workflow stubs and using the Spring API for finding all beans that implement the relevant interfaces (or maybe that have a custom annotation like @TemporalActivity
or @TemporalWorkflowImplementation
).
So, while this doesn’t solve all of your wish list, it’s a start. If I do end up building something fancier, I’ll share it.
@Configuration
@Profile("temporal")
public class TemporalConfig {
public static final String ACTIVITY_QUEUE = "activities";
public static final String WORKFLOW_QUEUE = "workflows";
@Value("${temporal.serviceAddress}")
private String serviceAddress;
@Value("${temporal.namespace}")
private String namespace;
@Bean
public WorkerFactory workerFactory(WorkflowClient workflowClient) {
return WorkerFactory.newInstance(workflowClient);
}
@Bean
public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
return WorkflowClient.newInstance(
workflowServiceStubs,
WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
);
}
@Bean
public WorkflowServiceStubs workflowServiceStubs() {
return WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget(serviceAddress).build());
}
@Slf4j
@Component
@Profile("temporal")
public static class TemporalConnection {
@Value("${temporal.namespace}")
private String namespace;
@Value("${temporal.workflowRetentionDays}")
private int workflowRetentionDays;
private final WorkflowServiceStubs service;
private final WorkerFactory factory;
public TemporalConnection(
WorkflowServiceStubs workflowServiceStubs,
WorkerFactory factory
) {
this.factory = factory;
this.service = workflowServiceStubs;
}
public void init() {
initNamespace();
startWorkers();
log.info("Temporal connection initialized");
}
private void startWorkers() {
Worker workflowWorker = factory.newWorker(WORKFLOW_QUEUE);
workflowWorker.registerWorkflowImplementationTypes(HelloWorldImpl.class);
Worker activityWorker = factory.newWorker(ACTIVITY_QUEUE);
activityWorker.registerActivitiesImplementations(new HelloActivityImpl("Hello"));
factory.start();
}
@SneakyThrows
private void initNamespace() {
Duration retention = Duration.newBuilder().setSeconds(60 * 60 * 24 * 7L).build();
boolean connected = false;
int counter = 0;
while (!connected && counter < 600) { // ~20 minutes max
try {
log.info("Registering namespace \"{}\" with a retention period of {} days",
namespace,
workflowRetentionDays
);
RegisterNamespaceRequest request =
RegisterNamespaceRequest.newBuilder()
.setName(namespace)
.setWorkflowExecutionRetentionPeriod(retention)
.build();
RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);
connected = response.isInitialized();
if (!response.isInitialized()) {
log.warn(response.getInitializationErrorString());
}
} catch (StatusRuntimeException ex) {
if (ex.getStatus().getCode() == Status.ALREADY_EXISTS.getCode()) { // Need to compare codes for successful equality
log.info("Domain \"{}\" already exists", namespace);
UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
.setName(namespace)
.setConfig(NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(retention).build())
.build();
UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
connected = response.isInitialized();
} else {
log.error("Cannot connect to Temporal service. Waiting for 2 seconds...", ex);
}
} catch (Exception e) {
log.error("Cannot connect to Temporal service. Waiting for 2 seconds...", e);
}
Thread.sleep(2000);
counter++;
}
}
}
}
The controller:
@Profile("temporal")
@RestController
@RequestMapping("/message")
@Api("Message")
public class HelloController {
private final WorkflowClient workflowClient;
public HelloController(WorkflowClient workflowClient) {this.workflowClient = workflowClient;}
@ApiOperation(value = "Ask the server to say something to you", nickname = "hello")
@GetMapping()
public Map<String, String> message(@ApiParam @RequestParam String name) {
HelloWorld helloWorld = workflowClient.newWorkflowStub(
HelloWorld.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TemporalConfig.WORKFLOW_QUEUE)
.setWorkflowRunTimeout(Duration.ofMinutes(1))
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build()
);
String message = helloWorld.tssWorkflow(name);
Map<String, String> status = new HashMap<>();
status.put("message", message);
return status;
}
}