Hi Temporal Team,
When I try to run an activity to notify a block of an activity within a workflow, I receive the following exception:
“Called from non workflow or workflow callback thread”
What should be the correct way to execute an activity within an interceptor? I appreciate the help.
Interceptor implementation
public class ActivityInboundCallsInterceptor extends ActivityInboundCallsInterceptorBase {
private final Logger log;
private final Integer maxRetryBeforeNotify;
private static final Set<String> processedKeys = Collections.synchronizedSet(new HashSet<>());
private ActivityExecutionContext activityExecutionContext;
Class<?>[] workflowImplementationForNotify;
public ActivityInboundCallsInterceptor(io.temporal.common.interceptors.ActivityInboundCallsInterceptor next, Integer maxRetryBeforeNotify, Class<?>... workflowImplementationForNotify) {
super(next);
this.log = Workflow.getLogger(ActivityInboundCallsInterceptor.class);
this.maxRetryBeforeNotify = maxRetryBeforeNotify;
this.workflowImplementationForNotify = workflowImplementationForNotify;
}
@Override
public void init(ActivityExecutionContext context) {
this.activityExecutionContext = context;
super.init(context);
}
@Override
public ActivityOutput execute(ActivityInput input) {
ActivityOutput output = null;
try {
output = super.execute(input);
processedKeys.remove(this.activityExecutionContext.getInfo().getWorkflowId());
} catch (Exception ex) {
if (this.activityExecutionContext.getInfo().getAttempt() == maxRetryBeforeNotify
&& !processedKeys.contains(this.activityExecutionContext.getInfo().getWorkflowId())) {
log.info("[{}][{}] Sending activity error notification. {}", getClass().getCanonicalName(), Workflow.getInfo().getRunId(), activityExecutionContext.getInfo().getWorkflowId());
processedKeys.add(this.activityExecutionContext.getInfo().getWorkflowId());
SendMessageToSlackActivityImpl.builder()
.slackRepository(Objects.requireNonNull(ServiceContext.getBean(SlackRepository.class)))
.build()
.execute(SendMessageToSlackActivity.Arguments.builder()
.key("LOGISTICS_GENERAL_ERROR")
.data(slackMessageData)
.build());
}
throw new RuntimeException(ex);
}
return output;
}
"message": "Called from non workflow or workflow callback thread",
"source": "JavaSDK",
"stackTrace": "io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:97)\nio.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:405)\nio.temporal.internal.sync .WorkflowInternal.getWorkflowInfo(WorkflowInternal.java:478)\nio.temporal.workflow.Workflow.getInfo(Workflow.java:340)\ncom.becleverman.salesandsubscriptionsservice.temporal.interceptors.ActivityInboundCallsInterceptor.execute(ActivityInboundCallsInterceptor.java:44)\ nio.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(...