For anyone looking for a generic Spring way to locate the `@WorkflowInterface` and `@ActivityInterface` implementation classes and register them with a Temporal worker: I use the Spring bean below. It works pretty well.
import com.google.protobuf.Duration;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.activity.ActivityInterface;
import io.temporal.api.namespace.v1.NamespaceConfig;
import io.temporal.api.workflowservice.v1.RegisterNamespaceRequest;
import io.temporal.api.workflowservice.v1.RegisterNamespaceResponse;
import io.temporal.api.workflowservice.v1.UpdateNamespaceRequest;
import io.temporal.api.workflowservice.v1.UpdateNamespaceResponse;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Spring bean that prepares the Temporal namespace and starts a worker once the
 * application context has been wired.
 *
 * <p>On {@link #init()} it first registers (or, if it already exists, updates) the
 * configured namespace, then creates a worker on the configured task queue, registers
 * every workflow implementation found by classpath scanning and every Spring bean
 * annotated with {@link ActivityInterface}, and finally starts the worker factory.
 */
@Component
public class TemporalConnection {

    private static final Logger logger = LoggerFactory.getLogger(TemporalConnection.class);

    /** Maximum number of namespace registration attempts (600 x 2 s is roughly 20 minutes). */
    private static final int MAX_NAMESPACE_ATTEMPTS = 600;

    /** Pause between two failed namespace registration attempts, in milliseconds. */
    private static final long NAMESPACE_RETRY_DELAY_MS = 2000L;

    /** Workflow execution retention requested for the namespace: seven days, in seconds. */
    private static final long RETENTION_SECONDS = 60L * 60 * 24 * 7;

    @Autowired
    private WorkflowServiceStubs service;

    @Autowired
    private WorkerFactory factory;

    @Autowired
    private WorkflowConfig config;

    @Autowired
    private ApplicationContext context;

    /**
     * Initializes the namespace and then starts the workers.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between
     *         namespace registration attempts
     */
    @PostConstruct
    public void init() throws InterruptedException {
        initNamespace();
        startWorkers();
    }

    /**
     * Creates a worker on the configured workflow queue, registers all workflow and
     * activity implementations with it and starts the worker factory.
     */
    private void startWorkers() {
        Worker workflowWorker = factory.newWorker(config.getWorkflowQueue());

        // Scan the classpath for all classes that are workflow implementations.
        // We can't use context.getBeansWithAnnotation here because workflow
        // implementation classes can't be Spring bean instances, i.e. they can't
        // carry @Component annotations.
        for (Class<?> workflowClass : findWorkflowImplementations()) {
            logger.info("Registering workflow implementation: {}", workflowClass);
            workflowWorker.registerWorkflowImplementationTypes(workflowClass);
        }

        // Activities, in contrast, are looked up as beans annotated with @ActivityInterface.
        Map<String, Object> activities = context.getBeansWithAnnotation(ActivityInterface.class);
        for (Object activity : activities.values()) {
            logger.info("Registering activity implementation: {}", activity.getClass());
            workflowWorker.registerActivitiesImplementations(activity);
        }

        factory.start();
    }

    /**
     * Scans every location returned by {@code config.getContextConfigLocations()} for
     * candidate classes matching the {@link WorkflowInterface} annotation filter.
     *
     * <p>Candidates whose class name is missing or whose class cannot be loaded are
     * logged and skipped.
     *
     * @return the workflow implementation classes that could be loaded; never {@code null}
     */
    public Class<?>[] findWorkflowImplementations() {
        final List<Class<?>> result = new ArrayList<>();
        final ClassPathScanningCandidateComponentProvider provider =
                new ClassPathScanningCandidateComponentProvider(false, context.getEnvironment());
        // Filter arguments: considerMetaAnnotations = false, considerInterfaces = true,
        // so a class matches when an interface it implements carries the annotation.
        provider.addIncludeFilter(new AnnotationTypeFilter(WorkflowInterface.class, false, true));

        for (String location : config.getContextConfigLocations()) {
            for (BeanDefinition beanDefinition : provider.findCandidateComponents(location)) {
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName == null) {
                    // getBeanClassName() may return null; Class.forName(null) would throw an NPE.
                    logger.warn("Skipping bean definition without a class name: {}", beanDefinition);
                    continue;
                }
                try {
                    logger.info("Found workflow implementation: {}", beanClassName);
                    result.add(Class.forName(beanClassName));
                } catch (ClassNotFoundException e) {
                    logger.warn("Could not resolve class object for bean definition", e);
                }
            }
        }
        return result.toArray(new Class<?>[0]);
    }

    /**
     * Registers the configured namespace, retrying up to {@link #MAX_NAMESPACE_ATTEMPTS}
     * times with a pause of {@link #NAMESPACE_RETRY_DELAY_MS} between failed attempts.
     *
     * <p>Returns as soon as one attempt succeeds. If every attempt fails, an error is
     * logged and the method returns normally so that start-up continues.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    private void initNamespace() throws InterruptedException {
        Duration retention = Duration.newBuilder().setSeconds(RETENTION_SECONDS).build();
        for (int attempt = 1; attempt <= MAX_NAMESPACE_ATTEMPTS; attempt++) {
            if (registerOrUpdateNamespace(retention)) {
                return;
            }
            Thread.sleep(NAMESPACE_RETRY_DELAY_MS);
        }
        logger.error("Could not register namespace {} after {} attempts",
                config.getNamespace(), MAX_NAMESPACE_ATTEMPTS);
    }

    /**
     * Performs a single attempt to register the namespace; if the service reports that
     * it already exists, updates its retention setting instead.
     *
     * <p>A {@link StatusRuntimeException} thrown by the update call is not caught here
     * and propagates to the caller.
     *
     * @param retention the workflow execution retention to apply to the namespace
     * @return {@code true} if the register or update response is initialized,
     *         {@code false} if registration failed with a status other than ALREADY_EXISTS
     */
    private boolean registerOrUpdateNamespace(Duration retention) {
        try {
            RegisterNamespaceRequest request = RegisterNamespaceRequest.newBuilder()
                    .setNamespace(config.getNamespace())
                    .setWorkflowExecutionRetentionPeriod(retention)
                    .build();
            RegisterNamespaceResponse response = service.blockingStub().registerNamespace(request);
            return response.isInitialized();
        } catch (StatusRuntimeException ex) {
            // Status instances do not compare equal by identity, so compare the codes.
            if (ex.getStatus().getCode() != Status.Code.ALREADY_EXISTS) {
                logger.warn("Namespace registration attempt failed with status {}; will retry",
                        ex.getStatus());
                return false;
            }
        }

        // The namespace already exists: bring its retention in line with the configured value.
        UpdateNamespaceRequest request = UpdateNamespaceRequest.newBuilder()
                .setNamespace(config.getNamespace())
                .setConfig(NamespaceConfig.newBuilder()
                        .setWorkflowExecutionRetentionTtl(retention)
                        .build())
                .build();
        UpdateNamespaceResponse response = service.blockingStub().updateNamespace(request);
        return response.isInitialized();
    }
}