Hello,
I’m developing AWS state language like DSL structure.
When I run a flow that executes an activity, I get following exception:
“Operation allowed only while eventLoop is running”
It’s getting that exception on executing dynamic activity:
ActivityStub activityStub = Workflow.newUntypedActivityStub(createActivityOptions(state));
Promise<String> promise = activityStub.executeAsync("execute", String.class, state.getResource(), effectiveInput.getRawJson());
String result = promise.get();
The stack trace took me to ReplayWorkflowContextImpl.class
@Override
public Functions.Proc1<Exception> scheduleActivityTask(
ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback) {
ScheduleActivityTaskCommandAttributes.Builder attributes = parameters.getAttributes();
if (attributes.getActivityId().isEmpty()) {
attributes.setActivityId(workflowStateMachines.randomUUID().toString());
}
Functions.Proc cancellationHandler =
workflowStateMachines.scheduleActivityTask(parameters, callback);
return (exception) -> cancellationHandler.apply();
}
Workflow Starter:
public class Starter {
public static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
public static final WorkflowClient client = WorkflowClient.newInstance(service);
public static final WorkerFactory factory = WorkerFactory.newInstance(client);
public static void start(StateMachine machine, JsonNode input) {
WorkflowOptions workflowOptions = createOptions(machine).build();
WorkflowStub workflowStub =
client.newUntypedWorkflowStub("test", workflowOptions);
WorkflowExecution execution = workflowStub.start("test", machine.getVersion(), machine.toJson(), input);
System.out.println(execution.getWorkflowId());
}
private static WorkflowOptions.Builder createOptions(StateMachine machine) {
Integer timeoutSeconds = machine.getTimeoutSeconds();
WorkflowOptions.Builder optionBuilder = WorkflowOptions.newBuilder();
optionBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME);
if (timeoutSeconds != null && timeoutSeconds > 0) {
optionBuilder.setWorkflowExecutionTimeout(Duration.ofSeconds(timeoutSeconds));
}
return optionBuilder;
}
}
Worker
public class Worker {
private static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
private static final WorkflowClient client = WorkflowClient.newInstance(service);
private static final WorkerFactory factory = WorkerFactory.newInstance(client);
public static final String DEFAULT_TASK_QUEUE_NAME = "code2-flow";
public static void main(String[] args) {
io.temporal.worker.Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
worker.registerWorkflowImplementationTypes(StateLanguageWorkflow.class);
ActivityCompletionClient completionClient = client.newActivityCompletionClient();
Map<String, Function<String, CompletableFuture<String>>> tasks = new HashMap<>();
TaskRegistry taskRegistry = key -> tasks.get(key);
tasks.put("be:http", new HttpTask());
tasks.put("be:delay", new DelayedEchoTask());
worker.registerActivitiesImplementations(new DslActivitiesImpl(completionClient, taskRegistry));
factory.start();
}
}
TaskExecutor
public class TaskStateExecutor extends AbstractStateExecutor<TaskState> {
@Override
public Triple<String, JsonDataHolder, JsonDataHolder> process(TaskState state, JsonDataHolder input, JsonDataHolder ctx) {
// TODO Error Handling
JsonDataHolder effectiveInput = generateEffectiveInput(input, ctx, state.getInputPath(), state.getParameters());
ActivityStub activityStub = Workflow.newUntypedActivityStub(createActivityOptions(state));
Promise<String> promise = activityStub.executeAsync("execute", String.class, state.getResource(), effectiveInput.getRawJson());
String result = promise.get();
JsonDataHolder effectiveResult = generateEffectiveResult(result, state.getResultSelector(), ctx);
JsonDataHolder output = generateEffectiveOutput(effectiveResult, input, state.getResultPath(), state.getOutputPath());
return StateExecutorUtil.nextState(state, output, ctx);
}
private ActivityOptions createActivityOptions(TaskState state) {
Integer timeoutSeconds = state.getTimeoutSeconds();
ActivityOptions.Builder optionsBuilder = ActivityOptions.newBuilder();
if (timeoutSeconds != null) {
optionsBuilder.setStartToCloseTimeout(Duration.ofSeconds(timeoutSeconds));
}
optionsBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME);
// TODO Only first retrier is used, look for all.
StateExecutorUtil.retryOptions(optionsBuilder, state.getRetriers());
return optionsBuilder.build();
}
}