How to distinguish between parallel Promises that succeeded and failed?

Hi, I am running the following code. My questions are:

  1. In the get() operation, how can I split the results into completed and failed, so that I continue to the next activity with the completed ones only and send a notification about the failed ones?
  2. If maxRetries for the bulkEditActivities activity is 3, for example, what happens if one item fails and 10 others complete? Will the retries be applied to the failed one only? And will the workflow wait for its retries before proceeding to the next activity?

        if (bulkUpdate?.data != null) {
            // Start one activity per session; each call returns a Promise immediately.
            val promises = bulkUpdate.itemsToInclude.orEmpty().map { sessionId ->
                Async.function {
                    bulkEditActivities.saveSessionToDB(
                        SessionEditRequest(sessionId = sessionId, actionType = bulkUpdate.actionType, data = bulkUpdate.data)
                    )
                }
            }

            // Wait for the batch to complete.
            Promise.allOf(promises).get()
        }

Use Promise.getFailure. It returns null if a promise was completed without error.
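A minimal sketch of that split, assuming the promises are kept in a map keyed by session id. BatchOutcome and splitByOutcome are hypothetical names, not part of the SDK; the only SDK call used is Promise.getFailure, which, as far as I know, waits for the promise to become ready before returning. It has to run inside workflow code, and waiting on each promise separately avoids relying on how Promise.allOf reports a failure.

```kotlin
import io.temporal.workflow.Promise

// Hypothetical result type: ids whose promise completed, and ids whose promise failed.
data class BatchOutcome<K>(
    val completed: List<K>,
    val failed: Map<K, RuntimeException>
)

// Call this from workflow code only.
// Promise.getFailure waits for the promise to be ready, then returns null on
// success or the failure otherwise.
fun <K, V> splitByOutcome(promises: Map<K, Promise<V>>): BatchOutcome<K> {
    val completed = mutableListOf<K>()
    val failed = linkedMapOf<K, RuntimeException>()
    // Iterate in insertion order so the workflow stays deterministic.
    for ((key, promise) in promises) {
        val failure = promise.failure
        if (failure == null) completed += key else failed[key] = failure
    }
    return BatchOutcome(completed, failed)
}
```

With the code from the question, the map could be built with itemsToInclude.orEmpty().associateWith { sessionId -> Async.function { ... } }. The next activity would then receive outcome.completed, and outcome.failed could feed the notification.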

Hi, thanks for the quick reply.

  1. I found getFailure(). Thanks!
  2. What triggers a retry of an activity? If I wrap an activity execution in try/catch (from the outside, as in the example above), will a retry still be triggered?
  3. What triggers a retry of a workflow? I assume only an exception thrown from a catch block, or any uncaught exception in the workflow method, right?

  1. An activity that throws an exception is retried according to its retry options. An exception is not retried if its type is listed in RetryOptions.doNotRetry. An exception created through ApplicationFailure.newNonRetryableFailure is also not retried. Note that activities have default retry options. The try-catch in the workflow receives an exception only after the activity retries are exhausted.

  2. A workflow method throwing a subclass of TemporalFailure leads to a workflow failure. If the workflow was started with RetryOptions, it is then retried. By default, workflows don’t have retry options. We don’t recommend retrying workflows in the majority of cases. Instead, make your workflows robust to intermittent failures by retrying activities as long as needed. Throwing any other exception does not fail the workflow; it blocks its execution. The workflow will be stuck in the workflow task retry loop, waiting for a fix. Note that the retry options don’t apply to this scenario. You can also specify which exceptions fail the workflow by listing their types in WorkflowImplementationOptions.FailWorkflowExceptionTypes.
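To make the two points above concrete, here is a hedged sketch of the options involved. The exception types, the 30-second timeout, and the rejectSession helper are assumptions chosen for illustration. As far as I understand, each Async.function call schedules its own activity execution, so the retry policy applies to each session independently.

```kotlin
import io.temporal.activity.ActivityOptions
import io.temporal.common.RetryOptions
import io.temporal.failure.ApplicationFailure
import io.temporal.worker.WorkflowImplementationOptions
import java.time.Duration

// Activity retries: at most 3 attempts in total, and never retry IllegalArgumentException.
val activityRetry: RetryOptions = RetryOptions.newBuilder()
    .setMaximumAttempts(3)
    .setDoNotRetry(IllegalArgumentException::class.java.name)
    .build()

// Options for the activity stub; the timeout is an arbitrary example value.
val activityOptions: ActivityOptions = ActivityOptions.newBuilder()
    .setStartToCloseTimeout(Duration.ofSeconds(30))
    .setRetryOptions(activityRetry)
    .build()

// Hypothetical helper for use inside an activity implementation:
// a failure created this way is not retried, whatever the retry options say.
fun rejectSession(sessionId: String): Nothing =
    throw ApplicationFailure.newNonRetryableFailure(
        "Session $sessionId cannot be edited", "SessionRejected"
    )

// Workflow side: fail the workflow (instead of blocking it) when this type escapes.
val workflowOptions: WorkflowImplementationOptions = WorkflowImplementationOptions.newBuilder()
    .setFailWorkflowExceptionTypes(IllegalStateException::class.java)
    .build()
```

The activity options would be passed to Workflow.newActivityStub when creating the stub, and the workflow options to worker.registerWorkflowImplementationTypes together with the workflow implementation class.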

Thanks much for your reply!!

About what you wrote:

  1. Activity. I assume what you wrote about exceptions holds for failures too, correct? Namely, in my example I will get a failure only after the retries are exhausted.

  2. Workflow. My use case is a parent “ongoing” workflow that lives forever and receives actions by signal, which are saved in a WorkflowQueue. It then executes them (as child workflows) one by one (in the future, maybe in parallel). So if I neither throw a TemporalFailure (leaving it uncaught) nor catch and swallow every exception, the queue will halt and my app will be blocked. I think that does not fit my use case, and I should skip the failed action and move on to the next one. WDYT?

  1. I’m not sure what “exception hold” means. The workflow code gets an exception only after the activity retries are exhausted or when the exception is non-retryable.

  2. Make sure that the child throws an ApplicationFailure if you want it to fail.

  1. I mean that I will get promise.failure after retries are exhausted.

  2. ApplicationFailure = TemporalFailure?

  1. Yes, the exception is delivered through a Promise if you call an activity asynchronously.

  2. ApplicationFailure extends TemporalFailure.

Thanks! very helpful

Hi Maxim, some other questions came up during implementation:

  1. The documentation says to use WorkflowQueue instead of BlockingQueue. My question is: if the workflow is executed under a global lock, one thread at a time, why not just use a regular Queue? Why do we need WorkflowQueue at all?

  2. Following up on question 1, what if I’d like to store some data within a running-forever workflow (a list of objects that represents the state of the workflow and is used to answer queries on it)? Can I just use regular structures (list/queue/hashmap) stored in the workflow scope, without any synchronization or threading considerations?

Thanks,
Shai

  1. WorkflowQueue supports blocking take and put operations. All regular Queue operations are non-blocking.

  2. Yes, you can use these structures in the workflow code. The moneybatch sample uses a Set, for example.
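To illustrate point 1, here is a small sketch of the difference using plain JDK classes outside of any workflow. ArrayBlockingQueue stands in for WorkflowQueue here only to show the blocking semantics; in workflow code you would use WorkflowQueue itself. The timed offer and poll calls are used so the example finishes instead of waiting forever.

```kotlin
import java.util.LinkedList
import java.util.Queue
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

fun demo(): List<Any?> {
    val results = mutableListOf<Any?>()

    // Regular queue: reading from an empty queue returns immediately.
    val plain: Queue<String> = LinkedList()
    results += plain.poll()                      // null, no waiting

    // Bounded blocking queue with room for a single element.
    val blocking = ArrayBlockingQueue<String>(1)
    blocking.put("Jack")
    // put("Adam") would now wait until someone takes "Jack".
    // The timed offer waits 100 ms for free space and then gives up.
    results += blocking.offer("Adam", 100, TimeUnit.MILLISECONDS)   // false

    results += blocking.take()                   // "Jack"
    // take() on the now-empty queue would wait for a new element.
    // The timed poll waits 100 ms and then returns null.
    results += blocking.poll(100, TimeUnit.MILLISECONDS)            // null

    return results
}

fun main() {
    println(demo()) // prints [null, false, Jack, null]
}
```

So "blocking" refers to the caller waiting for an element (or for free space), not to calls being executed one after another.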

Thanks for your reply.

  1. A queue example is below. Aren’t add and remove blocking? I mean, “Adam” is added only after “Jack” is added, isn’t it? What am I missing?
val names: Queue<String> = LinkedList<String>()
  
names.add("Jack")
names.add("Adam")
names.add("Katherin")

  1. Thanks for the example.

  2. Is there a way to retrieve, in a query, all child executions of a given parent?
    My query to find the parent looks like the following. I’d like to find all running children under this parent.

     val wfType = EVENT_WORKFLOW_TYPE
     val wfId = eventId.toString()
     val wfStatus = WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING_VALUE
     val query = "WorkflowType = '$wfType' and WorkflowId = '$wfId' and ExecutionStatus = $wfStatus"
    
     val listWorkflowExecutionRequest = ListWorkflowExecutionsRequest.newBuilder()
             .setNamespace(client.options.namespace)
             .setQuery(query)
             .build()
    

Thanks,
Shai

  1. BlockingQueue (and its Temporal counterpart, WorkflowQueue) blocks on take for as long as the queue is empty and on put for as long as the queue is full.
  2. I would store the child workflows in a Map inside the parent workflow, then use a query method to return that information.
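A rough sketch of point 2, under several assumptions: EventWorkflow, ActionWorkflow, the method names, the String action payload, and the child id scheme are all hypothetical, and a plain ArrayDeque with Workflow.await stands in for the WorkflowQueue mentioned earlier. The parent records each child in a map before starting it and removes it when it finishes; the query method returns a copy of that map.

```kotlin
import io.temporal.failure.ChildWorkflowFailure
import io.temporal.workflow.ChildWorkflowOptions
import io.temporal.workflow.QueryMethod
import io.temporal.workflow.SignalMethod
import io.temporal.workflow.Workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod

@WorkflowInterface
interface ActionWorkflow {               // hypothetical child workflow
    @WorkflowMethod fun execute(action: String)
}

@WorkflowInterface
interface EventWorkflow {                // hypothetical long-running parent
    @WorkflowMethod fun run()
    @SignalMethod fun submit(action: String)
    @QueryMethod fun runningChildren(): Map<String, String>
}

class EventWorkflowImpl : EventWorkflow {
    private val pending = ArrayDeque<String>()             // actions received by signal
    private val running = LinkedHashMap<String, String>()  // child workflow id -> action

    override fun submit(action: String) { pending.addLast(action) }

    // Queries must not change state, so return a copy.
    override fun runningChildren(): Map<String, String> = running.toMap()

    override fun run() {
        var counter = 0
        while (true) {
            Workflow.await { pending.isNotEmpty() }        // wait for the next signal
            val action = pending.removeFirst()
            val childId = "${Workflow.getInfo().workflowId}-action-${counter++}"
            val child = Workflow.newChildWorkflowStub(
                ActionWorkflow::class.java,
                ChildWorkflowOptions.newBuilder().setWorkflowId(childId).build()
            )
            running[childId] = action
            try {
                child.execute(action)                      // runs children one by one
            } catch (e: ChildWorkflowFailure) {
                // The child failed: report it if needed, then go on to the next action.
            } finally {
                running.remove(childId)
            }
        }
    }
}
```

With sequential execution the map holds at most one entry at a time; the same bookkeeping would carry over if the children were later started in parallel.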

great!! thx

Another question:
Is the workflow’s RetryPolicy applied to the signal and query methods, or only to the WorkflowMethod?

Thanks,
Shai

Workflow retries are modeled as new workflow executions (same WorkflowId, different RunId) that receive the same workflow input data that was recorded in the failed execution.
In the new execution’s first event (WorkflowExecutionStarted) you can also get the previous execution’s failure via the continuedFailure property.

Not sure about the signals/query question, but maybe you are thinking of the workflow reset feature, where you pick a certain event in the workflow history to reset to? In that case, yes, all signals would be replayed up to the reset point.
Queries are not recorded in workflow history.

Thanks, Tihomir.

Another important question for Maxim/Tihomir.
I would like to make one of my activities transactional, but in order to call the following internal method, the calling function must be a “suspend” function:

transactionalOperator.executeAndAwait {
        val dbSession = getSession(eventId, sessionId, true).toSession()
        val sessionToUpdate: SessionRequest = patchSessionRequest.toSessionRequest(dbSession)
        val event: Event = eventService.getEvent(eventId)

The problem is that, going up the call stack, the activity has to be “suspend” as well, and calling the activity from the workflow through “Async.function” is then invalid:

                    Async.function {
                        bulkEditActivitiesDB.updateSessionToDB(
                            bulkId = bulkId,
                            SessionUpdateRequest(

with the error: “Suspend function should be called only from a coroutine”.

The question is: how do I correctly combine suspend and transactional code with a workflow that calls Async.function?

Thanks,
Shai

I’m not aware of any “suspend” feature in the Temporal SDK. What is it?

It’s not in Temporal, it’s in Kotlin.

https://kotlinlang.org/docs/composing-suspending-functions.html#concurrent-using-async

Thanks,
Shai

I don’t think you can use Kotlin coroutines in the workflow code at this point. I’d love to provide a Kotlin SDK that uses them, but it is not a trivial project.