Get Activity Failure or Success status from Activity Interceptors

Hello,

I need to send the activity status to another system before and after each activity runs.
output.getResult() only returns the activity response. How can I get the activity status (Success or Failure, with a message) and publish it to another system?

I have used an Activity Interceptor as below:

@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private Someservice service;
    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next,Someservice service) {
        super(next);
        this.service=service;
    }
    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }
    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish(); // before the activity runs
        ActivityOutput output = super.execute(input);
        log.info("Activity result: {}", output.getResult());
        service.publish(); // after the activity returns
        return output;
    }
}

ActivityOutput.getResult() returns a Promise that contains either a result or failure.

Thanks.

I see that ActivityInboundCallsInterceptor.ActivityOutput returns the activity response.

Only WorkflowOutboundCallsInterceptor.ActivityOutput returns a Promise, but when an activity fails I don't see that class being hit.

I'd appreciate your help on this. I have implemented it as below, but I'm not sure this is an efficient way of getting the status. Can you please provide some sample code showing which interceptors I can pull the activity status from?

@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private Someservice service;
    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next,Someservice service) {
        super(next);
        this.service=service;
    }
    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }
    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish();
        try {
            ActivityOutput output = super.execute(input);
            log.info("Activity result: {}", output.getResult());
            if (!ObjectUtils.isEmpty(output.getResult())) {
                service.publish();
            }
            return output;
        } catch (ApplicationFailure e) {
            // set the status to failed
            service.publish();
            throw e; // rethrow so the activity is still reported as failed
        }
    }
}

The ActivityInboundCallsInterceptorBase.execute method should be asynchronous. You are violating this requirement by calling output.getResult, which can block for a long time. Use the Promise.handle method (or a similar one) to access the result asynchronously.

Calling super.execute(input) asynchronously throws an error:

@Slf4j
public class TestActivityInboundInterceptor extends ActivityInboundCallsInterceptorBase {
    private ActivityExecutionContext activityExecutionContext;
    private Someservice service;
    public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next,Someservice service) {
        super(next);
        this.service=service;
    }
    @Override
    public void init(ActivityExecutionContext context) {
        this.activityExecutionContext = context;
        super.init(context);
    }
    @Override
    public ActivityOutput execute(ActivityInput input) {
        log.info("Entering TestActivityInboundInterceptor");
        service.publish();
        Promise<ActivityOutput> outputPromise = Async.function(() -> super.execute(input));
    }
}

attempt=1
java.lang.Error: Called from non workflow or workflow callback thread
    at io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
    at io.temporal.internal.sync.CompletablePromiseImpl.<init>(CompletablePromiseImpl.java:65)
    at io.temporal.internal.sync.WorkflowInternal.newCompletablePromise(WorkflowInternal.java:113)
    at io.temporal.workflow.Workflow.newPromise(Workflow.java:499)
    at io.temporal.internal.sync.AsyncInternal.execute(AsyncInternal.java:299)
    at io.temporal.internal.sync.AsyncInternal.function(AsyncInternal.java:56)
    at io.temporal.workflow.Async.function(Async.java:38)
    at net.jpmchase.test.interceptors.TestActivityInboundInterceptor.execute(TestActivityInboundInterceptor.java:48)
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107)
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243)
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216)
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:842)

This line doesn’t make sense to me. Is it trying to wrap a Promise in another Promise?

What are you trying to do here?

I'm not sure what you meant by “the ActivityInboundCallsInterceptorBase.execute method should be asynchronous”.

It should not block on synchronous APIs like Promise.get. Use Promise.handle to execute logic on Promise completion.

What is service.publish? Is it an activity? Note that the workflow interceptor executes in the context of a workflow, so it cannot do any direct IO; it can only invoke activities.
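
To illustrate the "handle instead of get" advice for a workflow-side interceptor, here is a minimal sketch of the callback shape. It deliberately uses the JDK's CompletableFuture.handle as a stand-in so it runs without the Temporal SDK; this is an assumption for illustration only. In real workflow code you would call Promise.handle on the Promise returned by the workflow outbound interceptor, and you must not use CompletableFuture inside a workflow. The class and method names (HandleSketch, withStatus) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class HandleSketch {

    // Attaches a completion callback instead of blocking on get().
    // The callback receives either a value or a failure, never both.
    // Stand-in only: workflow code would use Temporal's Promise.handle here.
    static CompletableFuture<String> withStatus(CompletableFuture<String> result) {
        return result.handle((value, failure) ->
                failure == null
                        ? "COMPLETED: " + value
                        : "FAILED: " + failure.getMessage());
    }

    public static void main(String[] args) {
        // Successful completion.
        System.out.println(withStatus(CompletableFuture.completedFuture("ok")).join());

        // Failed completion.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(withStatus(failed).join());
    }
}
```

The point is only that the status logic runs when the result becomes available, so the interceptor method itself returns immediately.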

@maxim, regarding your comment “It should not block on synchronous APIs like Promise.get. Use Promise.handle to execute logic on Promise completion.”:

The TestActivityInboundInterceptor execute method returns only the activity response. It is not clear to me how I can use Promise.handle here to fetch the results.

public TestActivityInboundInterceptor(ActivityInboundCallsInterceptor next, Someservice service) {
    super(next);
    this.service = service;
}

@Override
public void init(ActivityExecutionContext context) {
    this.activityExecutionContext = context;
    super.init(context);
}

@Override
public ActivityOutput execute(ActivityInput input) {
    // workflowName and activityName come from activityExecutionContext
    String serviceRequest = "{\"workflowName\": \"Workflow1\", \"activityName\": \"Activity1\", \"status\": \"STARTED\"}";
    service.publish(serviceRequest);
    // only returns the activity response
    return super.execute(input);
}

Sorry, I thought you were asking about a workflow interceptor. The activity interceptor is fully synchronous, so there is no need to use promises.
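
Since the activity interceptor is synchronous, one way to capture the status is a plain try/catch around the call to the next interceptor. The following is a minimal sketch of that pattern, not the SDK's own API: ActivityCall and StatusPublisher are hypothetical stand-ins (for super.execute(input) and for Someservice.publish) so the example runs without the Temporal SDK. It also assumes that a failing activity surfaces as a RuntimeException thrown from super.execute(input).

```java
import java.util.ArrayList;
import java.util.List;

public class ActivityStatusSketch {

    // Hypothetical stand-in for the next interceptor in the chain (super.execute(input)).
    interface ActivityCall {
        Object execute(Object input);
    }

    // Hypothetical stand-in for the thread's Someservice.publish(...).
    interface StatusPublisher {
        void publish(String activityName, String status, String message);
    }

    // Publishes STARTED, runs the activity synchronously, then publishes
    // COMPLETED or FAILED. The failure is rethrown so it is still reported
    // to the caller as an activity failure.
    static Object executeWithStatus(String activityName, Object input,
                                    ActivityCall next, StatusPublisher publisher) {
        publisher.publish(activityName, "STARTED", "");
        Object output;
        try {
            output = next.execute(input); // in a real interceptor: super.execute(input)
        } catch (RuntimeException e) {
            publisher.publish(activityName, "FAILED", String.valueOf(e.getMessage()));
            throw e;
        }
        publisher.publish(activityName, "COMPLETED", "");
        return output;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        StatusPublisher recorder =
                (name, status, message) -> events.add(name + ":" + status + ":" + message);

        executeWithStatus("Activity1", "in", in -> "out", recorder);
        try {
            executeWithStatus("Activity2", "in",
                    in -> { throw new IllegalStateException("boom"); }, recorder);
        } catch (IllegalStateException expected) {
            // the failure is rethrown after FAILED has been published
        }
        events.forEach(System.out::println);
    }
}
```

In the real interceptor the same try/catch would sit inside execute(ActivityInput input), with the activity and workflow names taken from the ActivityExecutionContext saved in init. Note that publishing from the interceptor adds to the activity's execution time, so a slow or failing publish call can affect activity timeouts and retries.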