I am working on a company project to migrate from an internal maintained old version of temporal (MrT) to the latest open sourced version.
I got some error like “Called from non workflow or workflow callback thread”, which I think is related to the restriction of calling Workflow.newActivityStub outside Workflow context. Below is an example, my project is using Dependency Injection extensively for ActivityStubs.
I understand one possible solution is to migrate away from this and do something like:
new MyWorkflowImpl(Workflow.newActivityStub(MyActivity.class, activityOptions())))
But the problem is my project has 100+ Workflows and 30+ Activities. Also some workflowImpls have several layers indirect dependency on ActivityStub, such as MyWorkflowImpl → ClassA → ClassB → SomeActivityStubA, SomeActivityStubB.
To make changes to remove DI of ActivityStubs in the project would require a lot of changes (to the signature of many class constructors). I wonder are there any suggested ways (maybe more elegant) to handle this case?
I got some error like “Called from non workflow or workflow callback thread”, which I think is related to the restriction of calling Workflow.newActivityStub outside Workflow context.
Also some workflowImpls have several layers indirect dependency on ActivityStub, such as MyWorkflowImpl → ClassA → ClassB → SomeActivityStubA, SomeActivityStubB.
This should be fine if ClassA is an object that you instantiate in your workflow, what is the issue here?
You could create your own implementation of the stubbed interface using dynamic proxy that would call Workflow.newActivityStub only when invoked from a workflow.
I feel I don’t fully understand it though. Could you plz provide some sample code to help me understand?
Below is something I can think of based on the answer. It throws exception when not init from Workflow context, but I don’t see how does it solve the problem, i.e. Dagger may call createActivityStub very early (before the creation of Workflow context).
public class WorkflowActivityProxyHandler implements InvocationHandler {
private final Class<?> activityClass;
public WorkflowActivityProxyHandler(Class<?> activityClass) {
this.activityClass = activityClass;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Workflow.isWorkflowThread()) {
Object activityStub = Workflow.newActivityStub(activityClass);
return method.invoke(activityStub, args);
} else {
throw new IllegalStateException("Called from non-workflow thread");
}
}
}
public class ActivityStubFactory {
@SuppressWarnings("unchecked")
public static <T> T createActivityStub(Class<T> activityClass) {
return (T) Proxy.newProxyInstance(
activityClass.getClassLoader(),
new Class<?>[]{activityClass},
new WorkflowActivityProxyHandler(activityClass)
);
}
}
@Module
public class ActivityModule {
@Provides
@Qualifiers.ActivityStub
static MyActivity myActivity() {
return ActivityStubFactory.createActivityStub(MyActivity.class);
}
}
@WorkflowInterface
public interface MyWorkflow {
@WorkflowMethod
void execute();
}
public class MyWorkflowImpl implements MyWorkflow {
private final MyActivity myActivity;
@Inject
public MyWorkflowImpl(@Qualifiers.ActivityStub MyActivity myActivity) {
this.myActivity = myActivity;
}
@Override
public void execute() {
myActivity.someActivityMethod();
}
}
I think your code should work. Instantiating your dynamic handler outside of the workflow context is OK. The actual calls on the implemented interface happen inside a workflow context.