Implementing DSL workflows

Hi,
I am trying to explore temporal w.r.t DSL from json/yaml code. I went through the sample example - samples-java/src/main/java/io/temporal/samples/dsl at main · temporalio/samples-java · GitHub

In my case, i have multiple workflows with number of activities and subworkflows.

I have a few queries and need an understanding on the same :

  1. Do i need to write different interpretors for each DSL/workflow separately? Or can i have the same interpretor for different DSL definitions.
  2. Is there some way to define the activity input and output parameters in DSL ? Reason behind asking this is, as per the sample code - JSONNode is defined as the activity output always, which might differ as per different operations.
  3. Also, how can we handle the workflow output to be a custom java object as response based on workflow type or definition instead of a generic one?
  4. Can we not have multiple activities as part of the dynamic workflow implementation? If yes, then how do we create dynamic activity stub? It would be helpful if there is some sample that can be referred.
  5. Is there any sample code having multiple json definitions and an application logic to run all the workflows?

Let me know if there is a better way to solve all these dynamic workflow usecases.

Thanks.

  1. Do i need to write different interpretors for each DSL/workflow separately? Or can i have the same interpretor for different DSL definitions.

By interpreter you mean a workflow that executes the instructions from json/yaml dsl def? Then yes, you can. The Java SDK DynamicWorkflow interface is perfectly suited imo for that. It would allow you to use the same definition to execute multiple dsl-based workflows.

  1. Is there some way to define the activity input and output parameters in DSL ? Reason behind asking this is, as per the sample code - JSONNode is defined as the activity output always, which might differ as per different operations.

Depends on the DSL. BPMN for example has data inputs and data output mappings for each “Task”. You could read that mapping and figure out what types are needed for activity inputs/results.
Markup-based DSLs like the one used in example in many cases define their data as JSON. The Serverless Workflow DSL in this example does that (entire workflow data is json), and inputs to their “actions” (Temporal activities) is JSON as well, so that’s why its used in the sample.

You don’t have to use JsonNode tho, you can use concrete types as well. You could for example define in this example a “WorkflowData” class, it can look like:

public class WorkflowData {
  private ObjectNode value;

  public WorkflowData() {
    value = new ObjectMapper().createObjectNode();
  }

  public WorkflowData(String data) {
    try {
      if (data == null || data.trim().length() < 1) {
        value = new ObjectMapper().createObjectNode();
      } else {
        value = (ObjectNode) new ObjectMapper().readTree(data);
      }
    } catch (JsonProcessingException e) {
      throw new IllegalArgumentException("Invalid workflow data input: " + e.getMessage());
    }
  }
.....
}

and pass that around as inputs/results instead of JsonNode. There are other options as well.

  1. Also, how can we handle the workflow output to be a custom java object as response based on workflow type or definition instead of a generic one?

In your workflow code you can get the workflow type via:

String type = Workflow.getInfo().getWorkflowType();

based on that type and response for question 2. you can return any custom type you want.

  1. Can we not have multiple activities as part of the dynamic workflow implementation? If yes, then how do we create dynamic activity stub? It would be helpful if there is some sample that can be referred.

In the sample we use a single activity impl, but you can have as many as you want (register them with your worker). Temporal also provides DynamicActivity interface which is shown in this sample.

  1. Is there any sample code having multiple json definitions and an application logic to run all the workflows?

You can do it in the current dsl example, just register all your definitions here.
Are you planning to use the Serverless Workflow DSL as in the example or some other DSL?
If it’s the same one as used in sample, I can definitely help you with some code samples if you wish.

@tihomir Thanks for the explanation and quick response. Basically, I am trying to explore which type of DSL would be an appropriate one as per our business use case. Yes, definitely it would be helpful if you share a few more examples based on Serverless Workflow DSL. Also, if you have any samples of other DSLs as well, it would be great and will be helpful for our comparison.

Further questions :

  1. If my activity is a DynamicActivity which needs to override the execute(), then how can we route the logic to specific activity methods when implementing in a DSL? Because for each activity type, call is triggered to execute(). How to differentiate and execute a logic specific to an activity type. Same query i have for DynamicWorkflow implementation as well.

  2. How to make a call to subworkflow while writing the DSL interpretor code?

definitely it would be helpful if you share a few more examples based on Serverless Workflow DSL

you can see examples here

if you have any samples of other DSLs as well, it would be great and will be helpful for our comparison.

I don’t know which ones you are looking at currently in order to provide info. Step functions and Google cloud workflow have some examples on their websites. For things like bpmn, I am sure there are also many examples out there. Let us know what you are looking for specifically.

  1. If my activity is a DynamicActivity which needs to override the execute(), then how can we route the logic to specific activity methods when implementing in a DSL? Because for each activity type, call is triggered to execute(). How to differentiate and execute a logic specific to an activity type. Same query i have for DynamicWorkflow implementation as well.

If you use DynamicActivity, you can get the activity type via
Activity.getExecutionContext().getInfo().getActivityType();
Given that type, you can just call some method that knows how to handle that type. There is I think no need to “route” to a different activity as activities are just your code, so you can use programming language constructs to call a method that knows what how to invoke a service or whatever the instructions are from the dsl.
DynamicWorkflow is very well suited for dsl implementations. It allows you to define the workflow type inside your dsl itself (as an id for example) and all invocations for any of these types is going to be routed to the workflow impl that implements DynamicWorkflow interface. You can do then multiple things, you can do it as we have in our sample where you cache the parsed workflow dsl definitions up-front, or you can decided to pass the whole workflow definition as input to your workflow when you start it via client api and parse it inside the workflow itself and then execute according to dsl instructions.

  1. How to make a call to subworkflow while writing the DSL interpretor code?

Subworkflows would translate into Temporal child workflows imo.

@tihomir Thanks for the reply. I still do not understand how to invoke child workflows while translating DSL. Since invoking child workflow needs the WorkflowClient object to be initialized. I am facing issue if i try to initialize the WorkflowClient from the interpretor logic.

Since invoking child workflow needs the WorkflowClient

Don’t think this is correct as you can invoke a child workflow from your workflow code as well. This sample shows how you can do that.

There is a number of ways you can invoke a subflow/child workflow given the parameters of the serverless workflow dsl:

  1. Sync invoke and wait for results:

json sample:

"actions": [{
  "subFlowRef": "mySubflow"
}]

temporal:

ChildWorkflowStub cws = Workflow.newUntypedChildWorkflowStub("mySubflow", childWorkflowOptions);
cws.execute(MyReturnObj.class, workflowDataInput);
  1. async child invoke and wait for results:

json sample:

"actions":[
    {
     "subFlowRef": {
      "invoke": "async",
      "workflowId": "mySubflow"
     }
    }
   ],

temporal:

ChildWorkflowStub cws = Workflow.newUntypedChildWorkflowStub("mySubflow", childWorkflowOptions);
Promise<MyReturnObj> promise = cws.executeAsync(MyReturnObj.class, workflowDataInput);
// ...
MyReturnObj result = promise.get();
  1. async with parent close policy:

json sample:

"actions":[
    {
     "subFlowRef": {
      "invoke": "async",
      "onParentComplete": "continue",
      "workflowId": "mySubflow"
     }
    }
   ],

temporal code:

same as in 2. but you would need to set in your ChildWorkflowOptions:

.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)

Hope this helps.

Yes, this really helps. Will try this solution. Thank you @tihomir !!

Can we refer normal activities from DSL ?
suppose I have an activity in UserActivityImpl -

  User createUser(User request , XyzObject xyz ){...}

I need to use it from DSL . Is it possible ?
My requirement is to use DSL for defining the flows, so that different clients can provide their own workflow . Activities are already existing and may not be changed.

@Nitz Yes, you can invoke activities from DSL. Refer “ApproveApplication” operation with functionRef “ApproveApplication” specified in samples-java/customerapplication.json at main · temporalio/samples-java · GitHub
Same is read while DSL execution and activity for that functionRef is invoked from samples-java/DynamicDslWorkflow.java at main · temporalio/samples-java · GitHub

1 Like

Thanks @poojabhutada ,
my question is similar to yours where we don’t want to change the input and output [ jsonObject] or even the method body of activities.

As the same activities would b used by both non dsl workflows and dsl workflows.

Also , when I tried calling normal activity , it never returned and called the next activity and workflow didn’t complete

As the same activities would b used by both non dsl workflows and dsl workflows.

It’s typical for markup dsl’s to represent workflow data as JSON. Most if not all also make use of expression languages (rather than forcing a specific programming lang to be used) to manipulate data.

Inputs and results of your activities already have to be serializable, you could in case of dsl workflow create simple wrappers that serialize/deserialize the parameters defined as inputs to your activities and the activity result to/from JSON.

Also , when I tried calling normal activity , it never returned and called the next activity and workflow didn’t complete

Can you show what you are trying to do? The sample maybe can be improved to fit your use case.

sure @tihomir , here is the activities I am calling

@Override
    public String call1() {
        System.out.println("Call 1");
        return "call 1 successful ";
    }

    @Override
    public String call2() {
        System.out.println("Call 2");
        return "call 2 successful !" ;
    }

I have used the same json dsl that is provided here [ changed method names only as call1 , call 2 ]

@tihomir ,
our basic requirement is to have dsl based workflow implementation where the activities are written normally :

  • no jsonNode
  • Request and response of the activities are POJO.
  • same activities can be used by any workflow [not just dsl based]
  • we can pass the type of Request and response , if needed

Hi pooja ,
needs some help i have multiple workflows running in parallel already , and i need to perform an activity while we signal any workflow ,and i am totally new to temporal so been stuck in this use case for a day . is there any way u can help me out in this scenario

@srinivasan_S You can define signal method in workflow interface using @SignalMethod annotation and then signal the workflows by that signal name where you can pass the inputs as required and set the unblocking condition. Based on the unblocking condition, you can define await() in your main workflow method and invoke activity once that blocking condition is satisfied. Refer : samples-java/HelloSignal.java at main · temporalio/samples-java · GitHub

our basic requirement is to have dsl based workflow implementation where the activities are written normally…

I think you might be looking at something similar to bpmn where you have to define data types (process variables and their types) and then define strict mappings (data inputs / data outputs) for each of your activities (tasks/actions). This comes at a price as it would force you to use a specific programming language, and puts you in a lock regarding expressions inside the dsl to manipulate data.
Thats really reason why most DSLs use JSON as its core data structure representation, its neutral and you can use powerful expressions langs like jq for example.

In case of Serverless Workflow DSL thats used in the sample, I think it would be possible to achieve this, via either as mentioned a wrapper layer ontop of your activities, for marshalling/unmarshalling, or I think activity interceptors would work too. Will look into it and provide more info if I find a good impl for this.

Thanks @tihomir , we do have activities proxy that we can leverage for this marshaling and unmarshalling.
Is there any code I can refer or sample ?

Also is there any particular object/key that needs to be returned from the activity response ? or plain java is sufficient
like User[ id : String , pwd String]

@Nitz updated the dsl sample and made some small changes to allow activities to have custom type return and method parameters.
See specifically here.
There are more improvements that can be done, but hopefully this helps you get the idea.

1 Like

Thanks @tihomir.
it really helped me a lot.
One question , the Activity code has below line

new ActResult(Activity.getExecutionContext().getInfo().getActivityType(), "invoked");

is it always required to set result as “invoked” or it is just for example ?

Also , what changes are needed if we need to pass output of one activity as input to another activity.

is it always required to set result as “invoked” or it is just for example ?

Thats just for the sample so that the workflow results show activities that were invoked.

Also , what changes are needed if we need to pass output of one activity as input to another activity.

Serverless workflow spec defines that (and is similar to what you do with Temporal and code as well). Results of activity is merged with workflow data first (similar to how you would set activity results to a wf variable), then you can define what parts of wf data to pass to next activity. There are some more steps in the spec such as definition of action filters that filter that wf data before an action (activity) is invoked, but that’s atm not in the sample. Can add.

1 Like