I need to send the activity status to another system before and after the activity runs.
output.getResult() just returns the activity response. How can I get the activity status (Success or Failure, with a message) and publish it to another system?
I have used an activity interceptor, as below:
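import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private final Someservice service;

    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next, Someservice service) {
        super(next);
        this.service = service;
    }

    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }

    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish(); // status before the activity runs
        ActivityOutput output = super.execute(input);
        // This is only the activity's return value, not a status.
        log.info("Activity result: {}", output.getResult());
        service.publish(); // status after the activity returns
        return output;
    }
}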
I see that ActivityInboundCallsInterceptor.ActivityOutput returns the activity response.
Only WorkflowOutboundCallsInterceptor.ActivityOutput returns a promise, but when an activity fails I don't see execution reaching that class.
I'd appreciate your help with this. I have implemented it as below, but I'm not sure this is an efficient way of getting the status. Could you please provide some sample code showing which interceptors I can pull the activity status from?
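import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase;
import io.temporal.failure.ApplicationFailure;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private final Someservice service;

    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next, Someservice service) {
        super(next);
        this.service = service;
    }

    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }

    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish(); // status before the activity runs
        try {
            ActivityOutput output = super.execute(input);
            log.info("Activity result: {}", output.getResult());
            // Note: an activity that returns void or null has an empty result here,
            // so this check skips the publish for it.
            if (!ObjectUtils.isEmpty(output.getResult())) {
                service.publish();
            }
            return output;
        } catch (ApplicationFailure e) {
            // set the status to failed
            service.publish();
            throw e; // rethrow so the activity is still reported as failed
        }
    }
}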
The ActivityInboundCallsInterceptorBase.execute method should be asynchronous. You are violating this requirement by calling output.getResult, which can block for a long time. Use the Promise.handle method (or a similar one) to access the result asynchronously.
Calling super.execute(input) asynchronously throws an error:
@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private Someservice service;

    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next, Someservice service) {
        super(next);
        this.service = service;
    }

    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }

    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish();
        // This is the call that throws the java.lang.Error shown below.
        Promise<ActivityOutput> outputPromise = Async.function(() -> super.execute(input));
    }
}
attempt=1 java.lang.Error: Called from non workflow or workflow callback thread
    at io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
    at io.temporal.internal.sync.CompletablePromiseImpl.<init>(CompletablePromiseImpl.java:65)
    at io.temporal.internal.sync.WorkflowInternal.newCompletablePromise(WorkflowInternal.java:113)
    at io.temporal.workflow.Workflow.newPromise(Workflow.java:499)
    at io.temporal.internal.sync.AsyncInternal.execute(AsyncInternal.java:299)
    at io.temporal.internal.sync.AsyncInternal.function(AsyncInternal.java:56)
    at io.temporal.workflow.Async.function(Async.java:38)
    at net.jpmchase.test.interceptors.TestActivityInboundInterceptor.execute(TestActivityInboundInterceptor.java:48)
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107)
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216)
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:842)
It should not block on synchronous APIs like Promise.get. Use Promise.handle to execute logic on Promise completion.
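To illustrate the difference between blocking on a result and registering a completion callback, here is a minimal, self-contained sketch. It deliberately uses java.util.concurrent.CompletableFuture as a stand-in, because Temporal's Promise can only be created on a workflow thread; the class name HandleSketch and the describe helper are made up for this example. Promise.handle has a similar shape: it takes a function of (result, failure) that runs once the promise completes.

```java
import java.util.concurrent.CompletableFuture;

public class HandleSketch {

    /** Attaches a completion callback and returns immediately, without blocking. */
    public static CompletableFuture<String> describe(CompletableFuture<String> result) {
        // Exactly one of (value, failure) is meaningful when the callback runs.
        return result.handle((value, failure) ->
                failure == null ? "SUCCESS: " + value : "FAILURE: " + failure.getMessage());
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> status = describe(pending); // nothing has completed yet
        System.out.println(status.isDone());
        pending.complete("done");                             // the callback runs on completion
        System.out.println(status.join());
    }
}
```

The point is only that the caller never waits: the status is computed inside the callback when the result (or the failure) arrives.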
What is service.publish? Is it an activity? Note that the workflow interceptor executes in the context of a workflow, so it cannot perform any direct IO; it can only invoke activities.
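For the activity side, the stack trace above shows the interceptor being called from ActivityTaskExecutors on a regular worker pool thread, not a workflow thread. Assuming super.execute(input) returns once the activity method has returned and throws if it failed (that matches the try/catch attempt earlier in this thread, but please check it against your SDK version), one way to report the status is a plain try/catch that publishes and then rethrows. The sketch below uses hypothetical stand-in types (Next, StatusPublisher, StatusReportingInterceptor) in place of the Temporal interceptor classes and Someservice, so that it runs on its own:

```java
import java.util.ArrayList;
import java.util.List;

public class ActivityStatusSketch {

    /** Stand-in for the next interceptor in the chain (hypothetical, not a Temporal type). */
    interface Next {
        Object execute(Object[] args);
    }

    /** Stand-in for the external system that receives status updates (hypothetical). */
    interface StatusPublisher {
        void publish(String status, String message);
    }

    /** Wraps the delegate call and reports STARTED, then SUCCESS or FAILURE. */
    static final class StatusReportingInterceptor implements Next {
        private final Next next;
        private final StatusPublisher publisher;

        StatusReportingInterceptor(Next next, StatusPublisher publisher) {
            this.next = next;
            this.publisher = publisher;
        }

        @Override
        public Object execute(Object[] args) {
            publisher.publish("STARTED", "");
            try {
                Object result = next.execute(args);
                // A null result still counts as success (for example, a void method).
                publisher.publish("SUCCESS", "");
                return result;
            } catch (RuntimeException e) {
                publisher.publish("FAILURE", String.valueOf(e.getMessage()));
                throw e; // rethrow so the caller still sees the failure
            }
        }
    }

    /** Runs one succeeding and one failing call and returns the published events. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        StatusPublisher recorder = (status, message) -> events.add(status + ":" + message);

        new StatusReportingInterceptor(args -> "ok", recorder).execute(new Object[0]);
        try {
            new StatusReportingInterceptor(args -> {
                throw new IllegalStateException("boom");
            }, recorder).execute(new Object[0]);
        } catch (IllegalStateException expected) {
            // the failure was reported and then propagated unchanged
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the real interceptor the same try/catch would sit inside execute(ActivityInput input), with super.execute(input) in place of next.execute(args). Catching RuntimeException rather than only ApplicationFailure is a choice made in this sketch so that other exception types are reported as failures too.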