Exception while executing dynamic activity: Operation allowed only while eventLoop is running

Hello,

I’m developing a DSL modeled on the AWS States Language.
When I run a flow that executes an activity, I get the following exception:
“Operation allowed only while eventLoop is running”

The exception is thrown when the dynamic activity is executed:

ActivityStub activityStub = Workflow.newUntypedActivityStub(createActivityOptions(state));
Promise<String> promise = activityStub.executeAsync("execute", String.class, state.getResource(), effectiveInput.getRawJson());
String result = promise.get();

The stack trace took me to ReplayWorkflowContextImpl.class:

@Override
  public Functions.Proc1<Exception> scheduleActivityTask(
      ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback) {
    ScheduleActivityTaskCommandAttributes.Builder attributes = parameters.getAttributes();
    if (attributes.getActivityId().isEmpty()) {
      attributes.setActivityId(workflowStateMachines.randomUUID().toString());
    }
    Functions.Proc cancellationHandler =
        workflowStateMachines.scheduleActivityTask(parameters, callback);
    return (exception) -> cancellationHandler.apply();
  }

Workflow Starter:

public class Starter {

    public static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    public static final WorkflowClient client = WorkflowClient.newInstance(service);
    public static final WorkerFactory factory = WorkerFactory.newInstance(client);

    public static void start(StateMachine machine, JsonNode input) {

        WorkflowOptions workflowOptions = createOptions(machine).build();

        WorkflowStub workflowStub =
                client.newUntypedWorkflowStub("test", workflowOptions);

        WorkflowExecution execution = workflowStub.start("test", machine.getVersion(), machine.toJson(), input);
        System.out.println(execution.getWorkflowId());
    }

    private static WorkflowOptions.Builder createOptions(StateMachine machine) {
        Integer timeoutSeconds = machine.getTimeoutSeconds();
        WorkflowOptions.Builder optionBuilder = WorkflowOptions.newBuilder();
        optionBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME);
        if (timeoutSeconds != null && timeoutSeconds > 0) {
            optionBuilder.setWorkflowExecutionTimeout(Duration.ofSeconds(timeoutSeconds));
        }
        return optionBuilder;
    }
}

Worker:

public class Worker {
    private static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
    private static final WorkflowClient client = WorkflowClient.newInstance(service);
    private static final WorkerFactory factory = WorkerFactory.newInstance(client);
    public static final String DEFAULT_TASK_QUEUE_NAME = "code2-flow";

    public static void main(String[] args) {
        io.temporal.worker.Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
        worker.registerWorkflowImplementationTypes(StateLanguageWorkflow.class);

        ActivityCompletionClient completionClient = client.newActivityCompletionClient();

        Map<String, Function<String, CompletableFuture<String>>> tasks = new HashMap<>();
        TaskRegistry taskRegistry = key -> tasks.get(key);

        tasks.put("be:http", new HttpTask());
        tasks.put("be:delay", new DelayedEchoTask());

        worker.registerActivitiesImplementations(new DslActivitiesImpl(completionClient, taskRegistry));

        factory.start();
    }
}

TaskExecutor:

public class TaskStateExecutor extends AbstractStateExecutor<TaskState> {
    @Override
    public Triple<String, JsonDataHolder, JsonDataHolder> process(TaskState state, JsonDataHolder input, JsonDataHolder ctx) {

        // TODO Error Handling

        JsonDataHolder effectiveInput = generateEffectiveInput(input, ctx, state.getInputPath(), state.getParameters());

        ActivityStub activityStub = Workflow.newUntypedActivityStub(createActivityOptions(state));
        Promise<String> promise = activityStub.executeAsync("execute", String.class, state.getResource(), effectiveInput.getRawJson());
        String result = promise.get();

        JsonDataHolder effectiveResult = generateEffectiveResult(result, state.getResultSelector(), ctx);
        JsonDataHolder output = generateEffectiveOutput(effectiveResult, input, state.getResultPath(), state.getOutputPath());

        return StateExecutorUtil.nextState(state, output, ctx);
    }

    private ActivityOptions createActivityOptions(TaskState state) {
        Integer timeoutSeconds = state.getTimeoutSeconds();

        ActivityOptions.Builder optionsBuilder = ActivityOptions.newBuilder();
        if (timeoutSeconds != null) {
            optionsBuilder.setStartToCloseTimeout(Duration.ofSeconds(timeoutSeconds));
        }

        optionsBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME);

        // TODO Only first retrier is used, look for all.
        StateExecutorUtil.retryOptions(optionsBuilder, state.getRetriers());

        return optionsBuilder.build();
    }
}

Which SDK version are you using? Can you reproduce using latest version?
Can you show the entire stacktrace and your workflow history?
Where is TaskStateExecutor->process being called?
Are you reusing activity stubs between workflows anywhere in your code?

I’ve fixed the issue by looking at the history. Thanks for your instructions.

When I looked at the history, I found:

cause
WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES
failure
"BadScheduleActivityAttributes: A valid StartToClose or ScheduleToCloseTimeout is not set on command."
identity
78811@MacBook-Pro-4.home

Normally I don’t set a timeout; to fix this, I gave the activities a fixed start-to-close timeout of 10 seconds.
Is it really a problem not to set a timeout?

Also, I’m not reusing the activity stub; I create a new one for each task.
Can I reuse it? I will have different retry logic for each task, though.



I’m using 1.7.1.
When I try to use 1.8.x, IntelliJ and Maven do not work.

I call TaskExecutor from the flow whenever I encounter a Task keyword in the DSL.
Each time, I create a new activity stub.

Stacktrace:

run:74, WorkflowExecuteRunnable (io.temporal.internal.sync)
lambda$start$0:137, SyncWorkflow (io.temporal.internal.sync)
run:-1, SyncWorkflow$$Lambda$225/0x0000000800fdd420 (io.temporal.internal.sync)
run:101, CancellationScopeImpl (io.temporal.internal.sync)
run:111, WorkflowThreadImpl$RunnableWrapper (io.temporal.internal.sync)
call:539, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:264, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
 - Async stack trace
<init>:151, FutureTask (java.util.concurrent)
newTaskFor:98, AbstractExecutorService (java.util.concurrent)
submit:122, AbstractExecutorService (java.util.concurrent)
start:250, WorkflowThreadImpl (io.temporal.internal.sync)
lambda$start$1:140, SyncWorkflow (io.temporal.internal.sync)
run:101, CancellationScopeImpl (io.temporal.internal.sync)
run:111, WorkflowThreadImpl$RunnableWrapper (io.temporal.internal.sync)
call:539, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:264, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
 - Async stack trace
<init>:151, FutureTask (java.util.concurrent)
newTaskFor:98, AbstractExecutorService (java.util.concurrent)
submit:122, AbstractExecutorService (java.util.concurrent)
start:250, WorkflowThreadImpl (io.temporal.internal.sync)
runUntilAllBlocked:160, DeterministicRunnerImpl (io.temporal.internal.sync)
eventLoop:156, SyncWorkflow (io.temporal.internal.sync)
eventLoop:73, ReplayWorkflowExecutor (io.temporal.internal.replay)
eventLoop:321, ReplayWorkflowRunTaskHandler$StatesMachinesCallbackImpl (io.temporal.internal.replay)
eventLoop:463, WorkflowStateMachines (io.temporal.internal.statemachines)
access$500:60, WorkflowStateMachines (io.temporal.internal.statemachines)
workflowTaskStarted:915, WorkflowStateMachines$WorkflowTaskCommandsListener (io.temporal.internal.statemachines)
handleCompleted:126, WorkflowTaskStateMachine (io.temporal.internal.statemachines)
handleStarted:116, WorkflowTaskStateMachine (io.temporal.internal.statemachines)
apply:45, FixedTransitionAction (io.temporal.internal.statemachines)
executeTransition:147, StateMachine (io.temporal.internal.statemachines)
handleHistoryEvent:101, StateMachine (io.temporal.internal.statemachines)
handleEvent:67, EntityStateMachineBase (io.temporal.internal.statemachines)
handleSingleEvent:235, WorkflowStateMachines (io.temporal.internal.statemachines)
handleEventsBatch:199, WorkflowStateMachines (io.temporal.internal.statemachines)
handleEvent:175, WorkflowStateMachines (io.temporal.internal.statemachines)
handleWorkflowTaskImpl:176, ReplayWorkflowRunTaskHandler (io.temporal.internal.replay)
handleWorkflowTask:145, ReplayWorkflowRunTaskHandler (io.temporal.internal.replay)
handleWorkflowTaskWithQuery:122, ReplayWorkflowTaskHandler (io.temporal.internal.replay)
handleWorkflowTask:97, ReplayWorkflowTaskHandler (io.temporal.internal.replay)
handle:230, WorkflowWorker$TaskHandlerImpl (io.temporal.internal.worker)
handle:188, WorkflowWorker$TaskHandlerImpl (io.temporal.internal.worker)
lambda$process$0:83, PollTaskExecutor (io.temporal.internal.worker)
runWorker:1136, ThreadPoolExecutor (java.util.concurrent)
run:635, ThreadPoolExecutor$Worker (java.util.concurrent)
run:833, Thread (java.lang)

Yes, at least one timeout (preferably StartToClose) is required. The reason is that there is no meaningful default timeout for activities.
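
Since the DSL in this thread lets a task omit its timeout, one way to satisfy the requirement is to fall back to a fixed value. The following is only a sketch using plain JDK types: the class ActivityTimeouts, the method effectiveStartToClose, and the 10-second fallback (the value mentioned earlier in the thread) are illustrative assumptions, not part of the Temporal SDK.

```java
import java.time.Duration;

/** Hypothetical helper, not part of the Temporal SDK. */
final class ActivityTimeouts {
    // Assumed fallback; choose a value that covers the longest expected activity run.
    static final Duration DEFAULT_START_TO_CLOSE = Duration.ofSeconds(10);

    private ActivityTimeouts() {}

    /** Returns the DSL-provided timeout when it is positive, otherwise the fallback. */
    static Duration effectiveStartToClose(Integer timeoutSeconds) {
        if (timeoutSeconds == null || timeoutSeconds <= 0) {
            return DEFAULT_START_TO_CLOSE;
        }
        return Duration.ofSeconds(timeoutSeconds);
    }
}
```

In createActivityOptions, the null check could then be replaced by an unconditional optionsBuilder.setStartToCloseTimeout(ActivityTimeouts.effectiveStartToClose(state.getTimeoutSeconds())), so every scheduled activity carries a valid timeout.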

Also, I’m not reusing activity stub (for each task, I am creating new one.)
Can I reuse it? But I will have different retry logic for each task.

It is OK to reuse an activity stub within a single workflow instance. It is a mistake to reuse stubs across workflows, which can easily happen if IoC injection is used for them.

You still need to create a stub per activity invocation if you want different retry options for each invocation.
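
If many tasks end up sharing identical options, the two rules above could be combined by keeping one stub per distinct options value inside a single workflow instance. This is a minimal sketch with plain JDK types: StubCache and stubFor are hypothetical names, and it assumes the options type has value-based equals and hashCode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical per-workflow cache: one stub per distinct options value.
 * Hold it in an instance field of the workflow implementation, never in a
 * static field or an injected singleton, so stubs are not shared across workflows.
 */
final class StubCache<O, S> {
    private final Map<O, S> stubs = new HashMap<>();
    private final Function<O, S> stubFactory;

    StubCache(Function<O, S> stubFactory) {
        this.stubFactory = stubFactory;
    }

    /** Returns the stub for these options, creating it on first use. */
    S stubFor(O options) {
        return stubs.computeIfAbsent(options, stubFactory);
    }
}
```

With the code from this thread, the factory would presumably be Workflow::newUntypedActivityStub, with ActivityOptions as the key and ActivityStub as the value. Creating a fresh stub for every invocation, as the original code does, remains equally valid.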