Translating DSL example from go to Java

https://github.com/temporalio/samples-go/blob/master/dsl/starter/main.go#L43

For this line, how would you use the client to create the newWorkflow from the dsl created object. When i look at the workflowClient, it’s not clear how you pass in an argument?

This is where i’m at with the code.

    public static void main(String[] args) throws IOException {
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
        WorkflowClient client = WorkflowClient.newInstance(service);

        Path resourceDirectory = Paths.get("resources","dsl","workflow1.yaml");
        String absolutePath = resourceDirectory.toFile().getAbsolutePath();

        ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
        objectMapper.findAndRegisterModules();

        Workflow workflow = objectMapper.readValue(new File(absolutePath), Workflow.class);
    }

I wouldn’t call your DSL workflow Workflow as this name is part of the core SDK API. I don’t understand your question about passing arguments. In java your workflow interpreter is defined through an interface and you define which arguments the method annotated with @WorkflowMethod takes.

Sorry, i’m trying to port the go DSL sample to Java. It looks like the GO code translate the yaml file into a Workflow Object. This is the DSL workflow object (can rename it) And it uses the workflow client to create the SimpleDSLWorkflow which will take the workflow object and create the activities based on the DSL workflow Object.

I’m trying to figure how to translate that part into the java sdk from how it’s done in GO. With the DSL workflow object, how do you use that to info to pass to the SimpleDSLworkflow to process the DSL workflow object.

1 Like

Something like:

  DslWorkflow dslWorkflow = objectMapper.readValue(new File(absolutePath), DslWorkflow.class);
  DslInterpreter interpreter = client.newWorkflowStub(DslInterpreter.class, WorkflowOptions.newBuilder().setTaskQueue("taskqueuename").build());
  interpreter.execute(dslWorklfow);

Where DslInterpreter is defined as

@WorkflowInterface
 public interface DslInterpreter {
        void execute(DslWorkflow dslWorkflow);
 }

@maxim I created a pull request with the java sdk version. I tried to port it but experiencing issue with the activity not being picked up after its executed. I commented on the PR draft of where the issue is.

https://github.com/temporalio/java-samples/pull/46/files

Any advice appreciated,
Thanks,
Derek

1 Like

It looks like it should be the activity method name.

 ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build());
    stub.execute("activityMethod", ResultClass.class, args);

If there are activities with the same method name registered for the workflow, how would you this work…would the execute handle the prefix activity name properly?

In Java the SDK capitalizes the first letter of the activity name. So activityMethod is registered as ActivityMethod activity type.

If there are activities with the same method name registered for the workflow, how would you this work…would the execute handle the prefix activity name properly?

It is not allowed to register more than one activity implementation with the same type on the same worker. Otherwise, the worker wouldn’t know which method to invoke based on the activity type name. It is OK to use the same names for different workers listening to different task queues.

We recommend using different method names for different activities as it is the simplest approach. But if you want to use the same method names the options are:

  1. Use @ActivityInterface.namePrefix annotation property to prepend a prefix to the activity type.
  2. Use @ActivityMethod.name annotation property to explicitly specify the activity type the method implements.

It looks like the registration doesn’t recognize the nameprefix for the activity + the activity method. Is there something i’m missing? I figured it out take into account the name prefix… or does all the names need to be uniq as well even if it’s part of different activities?



When i look at the code base


looks like it’s not taking into account the activitie’s prefix name when it validates the condition.

If you remove @ActivityMethod annotations it is going to work fine. The reason is that the prefix is ignored when the ActivityMethod.name is specified.

As far as example I recommend having a single interface that contains multiple activity methods instead of many interfaces:

@ActivityInterface
public interface SampleActivities {
    String getInfo1();
    String getInfo2();
    ...
    String getInfo10();
}

The reason for the separation in activity is in the case you want separation of concerns. With the DSL, it’s nice to have the flexibility to make the separation.

It would be nice if by default, the SDK used the activityInterface name + the activity name with a “.” in the middle when it does the association. If users doesn’t want the granularity in name spacing, they can override it. This will also make it more intuitive when user use the attribute @ActivtyInterface(nameprefix) & @Activity Method(name) since it’s not obvious that one override the other. Is this something we can request?

Also whats the advantage of having an interfaces if you can’t register multiple activities that extend it for a single waveform vs. just having the class by itself.

How activities are defined has nothing to do with the DSL workflow as it uses their string names only. Think about an activity interface as a service that exposes multiple operations. The framework supports multiple interfaces, the same way multiple services are usually supported. At the same time creating an interface (or service) per single operation doesn’t serve any purpose besides adding more boilerplate code.

We had such ActivityClassName::methodName. default. I found that most developers hated the long activity names and used manual name overrides. You can always add “Whatever” as prefix if you prefer this style. BTW “.” using dot as separator is not possible as we report activity names as tag for the metric systems like Prometheus and they just drop metrics with the dot symbol (and a bunch of other special character) in their value.

Also what’s the advantage of having an interfaces if you can’t register multiple activities that extend it for a single waveform vs. just having the class by itself.

This is supported. The trick is to specify different activity type names for the inherited method. See HelloPolimorphicActivity sample which uses prefix for this.

That make sense. Maybe call it out that you can’t use both the ActivityInterface.name_prefix and the ActivityMethod.name and that the name will override the name_prefix in the documentation

In my case, since activities might be managed by different teams having the separation in files might be worth it at the cost of adding more boiler plate code.

In the case of activities owned by different teams, they will be using different task queues and the name collision is not going to be a problem.

I believe an example should try to demonstrate a single feature at a time. We plan to have an example that shows how to use multiple activity interfaces running in different worker processes. But the DSL sample is not about this. And having one method per interface is guaranteed to confuse new users.

Do you have an example of a workflow using a different task queue for activities with different task queues? I tried to look at the samples but couldn’t find one.

Particularly, how do you register the activities to different task queue then the one used by the workflow.

Example

 final Worker worker = factory.newWorker(TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(SimpleDSLWorkflowImpl.class);
    worker.registerActivitiesImplementations(
        new SampleActivities.SampleActivitiesImpl1(),
        new SampleActivities.SampleActivitiesImpl2(),
        new SampleActivities.SampleActivitiesImpl3(),
        new SampleActivities.SampleActivitiesImpl4(),
        new SampleActivities.SampleActivitiesImpl5());
    factory.start();

When you retrieve the activity, doesn’t the task queue need to be the same as the workflow registration?

    ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder()
                .setTaskQueue(<TASK QUEUE>)
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .setTaskQueue("dsl")
                .build());

Each worker instance can serve a single task queue. In the majority of the situations, each such worker is residing in its own service. Something like:

main1:

   Worker worker = factory.newWorker(TASK_QUEUE1);
    worker.registerActivitiesImplementations(
        new SampleActivities.SampleActivitiesImpl1());
    factory.start();

main2:

   Worker worker = factory.newWorker(TASK_QUEUE2);
    worker.registerActivitiesImplementations(
        new SampleActivities.SampleActivitiesImpl2());
    factory.start();

Then make sure that ActivityOptions.taskQueue is set to the correct queue name when calling activities from the workflow.

Sure, that make sense with a queue for each workflow.

My original question was for the case where you would have a workflow that have registered multiple activities and each activities would have their own task queues? How would that work?

Thanks,
Derek

I don’t think there is a concept of workflow registering activities. Workflow calls activities through correspondent activity stubs. Use ActivityOptions to specify the task queue per stub.

SampleActivities.SampleActivitiesImpl1 a1 = 
      Workflow.newActivityStub(SampleActivities.SampleActivitiesImpl1.class
            ActivityOptions.newBuilder()
                .setTaskQueue(<TASK QUEUE>)
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .build());

@maxim I had a question relating to the DSL translation.

Below is code that represents running statement in parallel when the code will create a promise so it can run all the task in parallel. It wraps it in a cancelation scope which will cancel the rest if it fails.

Can CancellationScopes be nested within other CancellationScopes? For example, in this case:

Async.function(this::execute, pipelineId, statement, bindings)

could be processing more nested parallel statements which would create new cancellation scopes. I guess in this case, if there was an issue the exception will be rethrow and the caller will fail as well resulting in the other cancelation scopes to eventually be called as well?

public void execute(UUID pipelineId, Parallel parallel, Map<String, ActivityPayload> bindings) {
    if(parallel == null || parallel.getBranches().length == 0) {
      return;
    }

    // In the parallel block, we want to execute all of them in parallel and wait for all of them.
    // if one activity fails then we want to cancel all the rest of them as well.
    List<Promise<Void>> results = new ArrayList<>(bindings.size());
    CancellationScope scope =
        Workflow.newCancellationScope(
            () -> {
              for (Statement statement : parallel.getBranches()) {
                results.add(Async.function(this::execute, pipelineId, statement, bindings));
              }
            });

    // As code inside the scope is non blocking the run doesn't block.
    scope.run();

    try {
      // If one activity fails then all the rest will fail
      Promise.allOf(results).get();
    } catch (RuntimeException ex) {
      // Cancel uncompleted activities
      scope.cancel();
      log.error("One of the Activities failed.  Canceling the rest.", ex);
      throw ex;
    }
  }

Thanks,
Derek

Yes, cancellation scopes are hierarchical. Each scope is automatically attached to its parent scope. If parent is cancelled all the child scopes are cancelled. The main workflow method is invoked in the context of a root scope which gets cancelled when the whole workflow is cancelled.

When you need to run some cleanup code in a cancelled scope create a detached one using Workflow.newDetachedCancellationScope.

ic thanks.

If possible can you take a look at Testing Framework for Java SDK?. The question relates to the exception being thrown within a Promise.all, but not seeing it being caught in the catch statement. Not sure if Promise.all just swallows the exception?