Is it possible to query the task queue

To address some timing issues with one of our services, I need to implement some form of queuing system when starting workflows.

Ideally, what I want is something like:

  1. Get list of workflows currently running on the task queue
  2. Call a @QueryMethod on each that would return an id of a User
if (runningWorkflowUserId == this.id)
{
    Wait until runningWorkflow execution is done
    execute new workflow with this id
}
else {
    execute new workflow with this id
}

Now I’ve gotten as far as querying a workflow using @QueryMethod but the issue with this is I can only query a workflow after I have started this.

To execute workflows, I am using vertx verticles.

  1. Get list of workflows currently running on the task queue

This feature is not yet supported out of the box (see issue here). You could use ListOpenWorkflowExecutions
to list all open workflows for a namespace, and iterate through the response (WorkflowExecutionInfo) via getExecutionsList()
and check the task queue via WorkflowExecutionInfo->getTaskQueue().

WorkflowExecutionInfo also has getExecution() method that will give you the open workflows WorkflowExecution handle. You could get the workflow id and run id from this and use a workflow stub method for a known execution to query it.

Looking at the pseudo code you provided, with Temporal you cannot have two running workflow executions (in running state) with the same workflow id. If you set your workflow id to the user id subsequent client requests to start same execution with the same workflow id (in same namespace) would just fail unless it has completed just fyi if it helps.

Ah apologies for the vague “id”, essentially what I meant was that each incoming record would have a “unique” id (i.e. a user id) and I would use @QueryMethods to fetch that ID. It has nothing to do with temporal workflow ids.

Edit: This is what I have so far:

WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();

ListOpenWorkflowExecutionsRequest openWorkflowsRequest =
  ListOpenWorkflowExecutionsRequest.newBuilder()
  .setNamespace("default")
  .setTypeFilter(WorkflowTypeFilter.newBuilder().setName("MyWorkflow"))
  .build();
ListOpenWorkflowExecutionsResponse openWorkflowsResponse =
  service.blockingStub().listOpenWorkflowExecutions(openWorkflowsRequest);

for (WorkflowExecutionInfo execution: openWorkflowsResponse.getExecutionsList()) {
  // here I'm supposed to get the workflow using its workflow id + run id, but I'm unsure how to actually get the "stub"
}
//start new workflow
MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);

@saifxhatem

Maybe something like this would work for you (code below). Note that ListOpenWorkflowExecutions is paginated, and you need to go through all pages to make sure you have all the results.

public static void startNewWorkflowForEachId(String workflowType, String queryName, ByteString token) {
        ListOpenWorkflowExecutionsRequest req;
        if(token == null) {
            req = ListOpenWorkflowExecutionsRequest.newBuilder()
                            .setNamespace(client.getOptions().getNamespace())
                            .setTypeFilter(WorkflowTypeFilter.newBuilder()
                                    .setName(workflowType)
                                    .build())
                            .build();
        } else {
            req = ListOpenWorkflowExecutionsRequest.newBuilder()
                    .setNamespace(client.getOptions().getNamespace())
                    .setNextPageToken(token)
                    .setTypeFilter(WorkflowTypeFilter.newBuilder()
                            .setName(workflowType)
                            .build())
                    .build();
        }
        ListOpenWorkflowExecutionsResponse res =
                service.blockingStub().listOpenWorkflowExecutions(req);
        for(WorkflowExecutionInfo info : res.getExecutionsList()) {
            WorkflowExecution execution = WorkflowExecution.newBuilder()
                    .setWorkflowId(info.getExecution().getWorkflowId())
                    .setRunId(info.getExecution().getRunId())
                    .build();
            // You can also use typed workflow stub here if you wanted
            // For that do like
            // <T> T newWorkflowStub(Class<T> workflowInterface, String workflowId, Optional<String> runId);
            // SomeWorkflowInterface workflow = client.newWorkflowStub(SomeWorkflowInterface.class,
            //  execution.getWorkflowId(), Optional.of(execution.getRunId()));
            //  String id = workflow.queryMethod(....);

            // We use untyped stub since we pass the query name in this method...
            WorkflowStub stub = client.newUntypedWorkflowStub(execution, Optional.empty());
            String id = stub.query(queryName, String.class);
            // you can start your new workflow here and provide the id as input
            // ...
        }

        if (res.getNextPageToken() != null && res.getNextPageToken().size() > 0) {
            startNewWorkflowForEachId(workflowType, queryName, res.getNextPageToken());
        }
    }

When you call this method the first time you can pass null for the “token” input.

Thank you so much for your help.

This is what I ended up with later:

WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();

ListOpenWorkflowExecutionsRequest openWorkflowsRequest =
  ListOpenWorkflowExecutionsRequest.newBuilder()
  .setNamespace("default")
  .setTypeFilter(WorkflowTypeFilter.newBuilder().setName("MyWorkflow"))
  .build();
ListOpenWorkflowExecutionsResponse openWorkflowsResponse =
  service.blockingStub().listOpenWorkflowExecutions(openWorkflowsRequest);

for (WorkflowExecutionInfo executionInfo: openWorkflowsResponse.getExecutionsList()) {
  String workflowId = executionInfo.getExecution().getWorkflowId();
  MyWorkflow runningWorkflow = client.newWorkflowStub(MyWorkflow.class, workflowId);
  int userId = runningWorkflow.getUserId();
  if (incomingUserId == userId) {
    //workflow already running for this user, do stuff
  }
}

I didn’t know about the pagination stuff and will try to account for it. Thanks again for your help!

@saifxhatem What you are doing is an anti-pattern. You are executing the List operation per workflow start, which will not scale as list operation is not as efficient as the workflow start. Also, the workflow index is eventually consistent. So your code has a race condition trying to start a workflow that is already running anyway.
Temporal out of the box ensures the uniqueness of workflows by their IDs. So I would recommend sending a signal (using SignalWithStart if needed) to perform some operations on the given workflow.