How to support both Polling and Callback in a workflow?

Hi,

For my use case, I have an external service that my workflow needs to talk to. The external service supports two ways to know when it’s done processing the data:

  1. An API which I can query for status. The status will tell me if it’s done processing
  2. When it’s done processing, the service can trigger a callback if a URL is provided

Goal: I was wondering if it’s possible to create a workflow that could support both the polling and callback push events.

Polling
I would create an activity to support talking to the external service. Within the activity, I can implement the polling of the external service for status.

Callback
To support callback, I can trigger the activity to start the process and then have the workflow wait for a callback signal which will be an indicator that the process is complete.

 Workflow.await(() -> this.isCurrentActivityComplete)

Both
It’s not clear to me how you would support both. Workflows have the ability to support callbacks, but it wasn’t clear whether you can trigger something like that in the activity or whether it has to be at the workflow layer. If it’s at the workflow layer, what would the code look like if you wanted to do an OR on the two cases, so that whichever occurs first ends the activity?

Thanks,
Derek

You can execute multiple parallel activities in a workflow. So in your case I would start a polling activity and wait for its completion or the signal. See HelloAsync sample.
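
The "whichever happens first" idea can be sketched outside Temporal with plain java.util.concurrent types. This is only an analogy under stated assumptions: workflow code would use the SDK's Promise and Async rather than CompletableFuture, and the class and method names below (FirstOfPollOrCallback, firstOf) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class FirstOfPollOrCallback {
    /** Returns a future that completes with whichever source reports first. */
    static CompletableFuture<String> firstOf(
            CompletableFuture<String> polling, CompletableFuture<String> callback) {
        CompletableFuture<String> winner = polling.applyToEither(callback, result -> result);
        // Once one source has answered, the other one is no longer needed.
        // Cancelling an already completed future is a no-op.
        winner.thenRun(() -> {
            polling.cancel(false);
            callback.cancel(false);
        });
        return winner;
    }

    public static void main(String[] args) {
        CompletableFuture<String> polling = new CompletableFuture<>();
        CompletableFuture<String> callback = new CompletableFuture<>();
        CompletableFuture<String> winner = firstOf(polling, callback);

        callback.complete("result from callback"); // the callback arrives first
        System.out.println(winner.join());
        System.out.println("polling cancelled: " + polling.isCancelled());
    }
}
```

In the workflow itself the two sources would be the polling activity's Promise and the signal handler, but the shape of the decision is the same: take the first result and stop waiting for the other.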

I get the part about the polling, but not about how to wait for completion.

    public String getGreeting(String name) {
      // Async.function takes method reference and activity parameters and returns Promise.
      Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);
      Promise<String> bye = Async.function(activities::composeGreeting, "Bye", name);

      // Promise is similar to the Java Future. Promise#get blocks until result is ready.
      return hello.get() + "\n" + bye.get();
    }

The wait logic is at the workflow layer. How would this be implemented at the activity layer? Do you have an example of how callbacks can be used to end activities via workflow signal?

Thanks,
Derek

So you are looking for a way to cancel an activity, right?

An activity must heartbeat to be cancellable. The cancellation from the workflow code is done using CancellationScope. Here is a relevant HelloCancellationScope sample.
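
Why heartbeating matters for cancellation can be illustrated with a small plain-Java loop. This is a sketch of the cooperative pattern only, not SDK code: the names HeartbeatingPoller and pollUntilDone are hypothetical, and the cancelRequested supplier stands in for the heartbeat call, which is the point where an activity learns that cancellation was requested.

```java
import java.util.concurrent.CancellationException;
import java.util.function.BooleanSupplier;

final class HeartbeatingPoller {
    /**
     * Polls until isDone reports true and returns the number of polls performed.
     * cancelRequested is consulted once per iteration, standing in for the
     * heartbeat; a loop that never checked it would keep running after a
     * cancellation request.
     */
    static int pollUntilDone(BooleanSupplier isDone, BooleanSupplier cancelRequested) {
        int polls = 0;
        while (true) {
            polls++;
            if (isDone.getAsBoolean()) {
                return polls;
            }
            // "Heartbeat": the only point where cancellation becomes visible.
            if (cancelRequested.getAsBoolean()) {
                throw new CancellationException("cancelled after " + polls + " polls");
            }
        }
    }

    public static void main(String[] args) {
        int[] heartbeats = {0};
        try {
            // The job never finishes; cancellation is requested at the third heartbeat.
            pollUntilDone(() -> false, () -> ++heartbeats[0] >= 3);
        } catch (CancellationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A real polling activity would also sleep between polls; that is left out here so the example stays deterministic.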

That is one way to do it, but it would leave the activity recorded as cancelled in the history. I’m looking for a way to complete an activity from a callback while the activity is also polling for status. That way, the history would show that the activity completed successfully, and the callback results could be viewed as the output of the activity.

I’m not sure I understand the reason for such a requirement. Signal already contains the operation result, so no need to record it again into the history for any reason.

I think I’m getting it conceptually, but I’m trying to wrap my head around the implementation, specifically how you would connect the callback with the cancellation trigger.

Workflow

public class SampleWorkflow {
    private CancellationScope scope;
    private Result callbackResult;
    // Kept as a field because a lambda cannot assign a local variable.
    private Promise<Result> promise;
    private final SampleActivities activities =
        Workflow.newActivityStub(
            SampleActivities.class,
            ActivityOptions.newBuilder()
                .setScheduleToCloseTimeout(Duration.ofSeconds(100))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    public Result start() {
        scope =
            Workflow.newCancellationScope(
                () -> {
                    promise = Async.function(activities::start);
                });
        // As code inside the scope is non blocking the run doesn't block.
        scope.run();
        // Wait for one of the activities to complete.
        Promise.anyOf(promise).get();
        Result result = promise.get();

        if (scope.isCancelRequested()) {
            return callbackResult;
        } else {
            return result;
        }
    }

    @SignalMethod
    public void callback(Result result) {
        // Store the callback payload, then cancel the polling activity.
        callbackResult = result;
        scope.cancel();
    }
}

Activity

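public class SampleActivity {
    public Result start() {
        while (true) {
            try {
                Thread.sleep(JOB_POLL_RATE_IN_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Activity.wrap(e);
            }
            Status status = client.getStatus(pipelineId);

            if (status.isTerminal()) {
                // Assumption: the terminal status exposes the job output.
                return status.getResult();
            }

            // Heartbeating is what lets the activity observe a cancellation request.
            Activity.getExecutionContext().heartbeat(status);
        }
    }
}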

Here is the example of what I’m trying to do:

  • Activity that just polls
  • Workflow starts, creates a CancellationScope and wraps the execution in a Promise.
  • Workflow waits for the activity with promise.get(), which effectively blocks
  • While the activity is still polling, the workflow gets a signal
  • The signal handler stores the callback result and then cancels the activity.

In the implementation I’m relying on the promise to block. Not sure how you would use

Workflow.await(() -> ...);

to block as shown in some of the examples with handling signal calls.

Does this look correct?

The Promise.anyOf call is redundant as promise.get blocks until the promise is ready.

Otherwise your code looks OK.

Do you see any issue with running this in a cancellation scope? I’m getting an exception when it tries to execute stub.execute. It throws an invocation exception with the detail message “Operation allowed only while eventLoop is running”. If I replace the stub.execute with a print statement it works, which makes me wonder why it’s happening.

Note: I’m running this in a unit test, so I’m not sure if that has any effect.

    final ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setDoNotRetry().build())
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .setTaskQueue(activity.getName())
                .build());
    AtomicReference<Promise<ActivityPayload>> activityExecutePromiseAtomic = new AtomicReference<>();

    activityCancelableScope =
            Workflow.newCancellationScope(
                    () -> {
                      Promise<ActivityPayload> activityPromise = stub.executeAsync(methodNameDescriptor, ActivityPayload.class, new Object[]{pipelineId, args});
                      activityExecutePromiseAtomic.set(activityPromise);
                    });

    // As code inside the scope is non blocking the run doesn't block.
    activityCancelableScope.run();

@maxim is going to need to answer this!

Could you post the whole exception stack trace? The code you posted works fine for me.

While I was trying to recreate the issue with the Java sample, I think I discovered what the issue is. I have a workflow that tries to generically create activities, and the code I wrote tried to associate one CancellationScope with all the activities I created. This caused the “Operation allowed only while eventLoop is running” error. The DSL example that I submitted in the PR has execute methods within each of Parallel, Statement, and ActivityInvocation. In my code, I took all the methods out and flattened them into the workflow, which made it more difficult to see the problem.

I did come across a different issue based on the logic discussed earlier. When I try to cancel the scope from the signal, I get a CanceledFailure exception.

Here is the code base to reproduce the problem: https://github.com/darewreck54/temporal-java-samples/tree/callback

Flow:

  • The unit test registers the workflow and activity, and starts the workflow
  • The workflow creates the activity in a cancellation scope as an async execution. The activity method that gets triggered runs indefinitely
  • The workflow then blocks on the promise for the async execution
  • The unit test then triggers the scope to cancel. This throws a CanceledFailure exception. The details of the exception are:
Optional[message: "Activity canceled"
cause {
  source: "JavaSDK"
  canceled_failure_info {
  }
}
activity_failure_info {
  scheduled_event_id: 5
  identity: "workflow"
  activity_type {
    name: "SampleActivities1GetInfo"
  }
  activity_id: "94a134c3-1500-35f9-95a4-20a00bee9814"
}
]

Note:

  • In my use case, I have a workflow that generically creates predefined activities. I want to support a signal that can cancel specific activities. I’m achieving this with a mapping from the activity name to its cancellation scope, which is probably not the best way to do it. Is there a way to get a unique identifier for the executed activity instance? I noticed that inside the activity you can call the activity context to get the activityId, but is it possible to get it after you create the ActivityStub, or to set your own activityId?
 ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .setTaskQueue(this.name)
                .build());
  • I noticed that the cancellation example you pointed out configures a cancellationType, but it’s not clear what the type should be in this case, since the activity is actually polling and a signal could come in to complete the activity.
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
  • I also noticed that if I didn’t use a heartbeat in the activity, I would get an ActivityShutdownException, which propagated downstream and became a NullPointerException with no stack trace, so I couldn’t determine what the null reference was.

I’m also open if there is a better way to achieve this.
Thanks

I believe cancellation works fine, but the workflow code that handles exceptions has bugs. Here is my attempt to fix it.

Make sure to use the latest version of the SDK. You are on a pre-release version.

Thanks Maxim. For some reason I thought the CanceledFailure meant that it failed to cancel, not that the cancel was successful.
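
The point that a cancellation failure signals a successful cancel, and so can be handled as an expected outcome, can be shown with a plain-Java analogy. This is a sketch and not Temporal SDK code: the names CancelAsExpectedOutcome and resolve are hypothetical, and CompletableFuture with CancellationException stands in for the workflow Promise and the failure it reports when the polling activity is cancelled.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class CancelAsExpectedOutcome {
    /**
     * Waits for the polling result. A cancellation is not treated as an error:
     * it means the callback won, so the callback's value is returned instead.
     */
    static String resolve(CompletableFuture<String> polling, String callbackResult) {
        try {
            return polling.join();
        } catch (CancellationException e) {
            return callbackResult;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> polling = new CompletableFuture<>();
        polling.cancel(false); // the callback arrived and polling was cancelled
        System.out.println(resolve(polling, "result from callback"));
    }
}
```

Any other exception still propagates, so genuine polling failures are not hidden by this handling.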

I took your advice and addressed the issues you pointed out. When I run it through a full workflow, the history looks like it’s processing everything, but a WorkflowExecutionCompleted event is never recorded. Is there something specific that I need to call to end the workflow? I thought it would just end naturally, since that is what I saw before I started wrapping all the activity executions in cancellation scopes. Here is the workflow logic, which gets executed with this main file to start the workflow.

Here is the execution json.

[
   {
      "eventId":"1",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":5909000
      },
      "eventType":"EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
      "version":"0",
      "taskId":"2097244",
      "workflowExecutionStartedEventAttributes":{
         "workflowType":{
            "name":"SimpleDSLWorkflow"
         },
         "parentWorkflowNamespace":"",
         "parentInitiatedEventId":"0",
         "taskQueue":{
            "name":"dsl",
            "kind":"TASK_QUEUE_KIND_NORMAL"
         },
         "input":{
            "payloads":[
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"eyJ2YXJpYWJsZXMiOnsiYXJnMSI6InZhbHVlMSIsImFyZzIiOiJ2YWx1ZTIifSwicm9vdCI6eyJzZXF1ZW5jZSI6eyJlbGVtZW50cyI6W3sic2VxdWVuY2UiOm51bGwsImFjdGl2aXR5Ijp7Im5hbWUiOiJTYW1wbGVBY3Rpdml0aWVzMSIsIm1ldGhvZCI6ImdldEluZm8iLCJhcmd1bWVudHMiOlsiYXJnMSJdLCJyZXN1bHQiOiJyZXN1bHQxIiwic3VwcG9ydENhbGxiYWNrIjp0cnVlfSwicGFyYWxsZWwiOm51bGx9LHsic2VxdWVuY2UiOm51bGwsImFjdGl2aXR5Ijp7Im5hbWUiOiJTYW1wbGVBY3Rpdml0aWVzMiIsIm1ldGhvZCI6ImdldEluZm8iLCJhcmd1bWVudHMiOlsicmVzdWx0MSJdLCJyZXN1bHQiOiJyZXN1bHQyIiwic3VwcG9ydENhbGxiYWNrIjpmYWxzZX0sInBhcmFsbGVsIjpudWxsfV19LCJhY3Rpdml0eSI6bnVsbCwicGFyYWxsZWwiOm51bGx9fQ=="
               }
            ]
         },
         "workflowExecutionTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "workflowRunTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "workflowTaskTimeout":{
            "seconds":"10",
            "nanos":0
         },
         "continuedExecutionRunId":"",
         "initiator":"CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED",
         "originalExecutionRunId":"82211630-9398-4f7e-9ec3-18bb567e58cd",
         "identity":"47260@C02C71F6MD6V",
         "firstExecutionRunId":"82211630-9398-4f7e-9ec3-18bb567e58cd",
         "retryPolicy":{
            "nonRetryableErrorTypes":[
               
            ],
            "initialInterval":{
               "seconds":"1",
               "nanos":0
            },
            "backoffCoefficient":2,
            "maximumInterval":{
               "seconds":"100",
               "nanos":0
            },
            "maximumAttempts":1
         },
         "attempt":1,
         "workflowExecutionExpirationTime":{
            "seconds":"1921230154",
            "nanos":5000000
         },
         "cronSchedule":"",
         "firstWorkflowTaskBackoff":{
            "seconds":"0",
            "nanos":0
         }
      },
      "attributes":"workflowExecutionStartedEventAttributes"
   },
   {
      "eventId":"2",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":6019700
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "version":"0",
      "taskId":"2097245",
      "workflowTaskScheduledEventAttributes":{
         "taskQueue":{
            "name":"dsl",
            "kind":"TASK_QUEUE_KIND_NORMAL"
         },
         "startToCloseTimeout":{
            "seconds":"10",
            "nanos":0
         },
         "attempt":1
      },
      "attributes":"workflowTaskScheduledEventAttributes"
   },
   {
      "eventId":"3",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":25772600
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "version":"0",
      "taskId":"2097250",
      "workflowTaskStartedEventAttributes":{
         "scheduledEventId":"2",
         "identity":"47260@C02C71F6MD6V",
         "requestId":"cf5062d4-7e04-47a0-b0ea-521193cb282d"
      },
      "attributes":"workflowTaskStartedEventAttributes"
   },
   {
      "eventId":"4",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":403710200
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "version":"0",
      "taskId":"2097253",
      "workflowTaskCompletedEventAttributes":{
         "scheduledEventId":"2",
         "startedEventId":"3",
         "identity":"47260@C02C71F6MD6V",
         "binaryChecksum":""
      },
      "attributes":"workflowTaskCompletedEventAttributes"
   },
   {
      "eventId":"5",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":403746200
      },
      "eventType":"EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
      "version":"0",
      "taskId":"2097254",
      "activityTaskScheduledEventAttributes":{
         "activityId":"b01e3d4f-5af3-3868-b0e4-64a6da86bba2",
         "activityType":{
            "name":"SampleActivities1GetInfo"
         },
         "namespace":"",
         "taskQueue":{
            "name":"SampleActivities1",
            "kind":"TASK_QUEUE_KIND_NORMAL"
         },
         "input":{
            "payloads":[
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"WyJ2YWx1ZTEiXQ=="
               }
            ]
         },
         "scheduleToCloseTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "scheduleToStartTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "startToCloseTimeout":{
            "seconds":"300",
            "nanos":0
         },
         "heartbeatTimeout":{
            "seconds":"0",
            "nanos":0
         },
         "workflowTaskCompletedEventId":"4",
         "retryPolicy":{
            "nonRetryableErrorTypes":[
               
            ],
            "initialInterval":{
               "seconds":"1",
               "nanos":0
            },
            "backoffCoefficient":2,
            "maximumInterval":{
               "seconds":"100",
               "nanos":0
            },
            "maximumAttempts":1
         }
      },
      "attributes":"activityTaskScheduledEventAttributes"
   },
   {
      "eventId":"6",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":44722500
      },
      "eventType":"EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "version":"0",
      "taskId":"2097255",
      "workflowExecutionSignaledEventAttributes":{
         "signalName":"callback",
         "input":{
            "payloads":[
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"IlNhbXBsZUFjdGl2aXRpZXMxIg=="
               },
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"Im5ldyB2YWx1ZSI="
               }
            ]
         },
         "identity":"47260@C02C71F6MD6V"
      },
      "attributes":"workflowExecutionSignaledEventAttributes"
   },
   {
      "eventId":"7",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":403817700
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "version":"0",
      "taskId":"2097259",
      "workflowTaskScheduledEventAttributes":{
         "taskQueue":{
            "name":"47260@C02C71F6MD6V:cb09d401-477c-4860-bb7d-95779f3e27a3",
            "kind":"TASK_QUEUE_KIND_STICKY"
         },
         "startToCloseTimeout":{
            "seconds":"10",
            "nanos":0
         },
         "attempt":1
      },
      "attributes":"workflowTaskScheduledEventAttributes"
   },
   {
      "eventId":"8",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":413596600
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "version":"0",
      "taskId":"2097265",
      "workflowTaskStartedEventAttributes":{
         "scheduledEventId":"7",
         "identity":"cb09d401-477c-4860-bb7d-95779f3e27a3",
         "requestId":"87e54c1f-6090-4b0f-b8cc-6232d5bfa183"
      },
      "attributes":"workflowTaskStartedEventAttributes"
   },
   {
      "eventId":"9",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":445071500
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "version":"0",
      "taskId":"2097268",
      "workflowTaskCompletedEventAttributes":{
         "scheduledEventId":"7",
         "startedEventId":"8",
         "identity":"47260@C02C71F6MD6V",
         "binaryChecksum":""
      },
      "attributes":"workflowTaskCompletedEventAttributes"
   },
   {
      "eventId":"10",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":445093000
      },
      "eventType":"EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED",
      "version":"0",
      "taskId":"2097269",
      "activityTaskCancelRequestedEventAttributes":{
         "scheduledEventId":"5",
         "workflowTaskCompletedEventId":"9"
      },
      "attributes":"activityTaskCancelRequestedEventAttributes"
   },
   {
      "eventId":"11",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":445133200
      },
      "eventType":"EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
      "version":"0",
      "taskId":"2097270",
      "activityTaskScheduledEventAttributes":{
         "activityId":"1931e323-0b89-39fc-a99e-a543249bf6be",
         "activityType":{
            "name":"SampleActivities2GetInfo"
         },
         "namespace":"",
         "taskQueue":{
            "name":"SampleActivities2",
            "kind":"TASK_QUEUE_KIND_NORMAL"
         },
         "input":{
            "payloads":[
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"WyJuZXcgdmFsdWUiXQ=="
               }
            ]
         },
         "scheduleToCloseTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "scheduleToStartTimeout":{
            "seconds":"315360000",
            "nanos":0
         },
         "startToCloseTimeout":{
            "seconds":"300",
            "nanos":0
         },
         "heartbeatTimeout":{
            "seconds":"0",
            "nanos":0
         },
         "workflowTaskCompletedEventId":"9",
         "retryPolicy":{
            "nonRetryableErrorTypes":[
               
            ],
            "initialInterval":{
               "seconds":"1",
               "nanos":0
            },
            "backoffCoefficient":2,
            "maximumInterval":{
               "seconds":"100",
               "nanos":0
            },
            "maximumAttempts":1
         }
      },
      "attributes":"activityTaskScheduledEventAttributes"
   },
   {
      "eventId":"12",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":445110800
      },
      "eventType":"EVENT_TYPE_ACTIVITY_TASK_CANCELED",
      "version":"0",
      "taskId":"2097271",
      "activityTaskCanceledEventAttributes":{
         "details":{
            "payloads":[
               {
                  "metadata":{
                     "encoding":"anNvbi9wbGFpbg=="
                  },
                  "data":"IkFDVElWSVRZX0lEX05PVF9TVEFSVEVEIg=="
               }
            ]
         },
         "latestCancelRequestedEventId":"10",
         "scheduledEventId":"5",
         "startedEventId":"0",
         "identity":"47260@C02C71F6MD6V"
      },
      "attributes":"activityTaskCanceledEventAttributes"
   },
   {
      "eventId":"13",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":445204200
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "version":"0",
      "taskId":"2097276",
      "workflowTaskScheduledEventAttributes":{
         "taskQueue":{
            "name":"47260@C02C71F6MD6V:cb09d401-477c-4860-bb7d-95779f3e27a3",
            "kind":"TASK_QUEUE_KIND_STICKY"
         },
         "startToCloseTimeout":{
            "seconds":"10",
            "nanos":0
         },
         "attempt":1
      },
      "attributes":"workflowTaskScheduledEventAttributes"
   },
   {
      "eventId":"14",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":456212400
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "version":"0",
      "taskId":"2097282",
      "workflowTaskStartedEventAttributes":{
         "scheduledEventId":"13",
         "identity":"cb09d401-477c-4860-bb7d-95779f3e27a3",
         "requestId":"dc7bd019-2e89-40d6-875b-c1bb9f8795c9"
      },
      "attributes":"workflowTaskStartedEventAttributes"
   },
   {
      "eventId":"15",
      "eventTime":{
         "seconds":"1605870154",
         "nanos":471067300
      },
      "eventType":"EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "version":"0",
      "taskId":"2097285",
      "workflowTaskCompletedEventAttributes":{
         "scheduledEventId":"13",
         "startedEventId":"14",
         "identity":"47260@C02C71F6MD6V",
         "binaryChecksum":""
      },
      "attributes":"workflowTaskCompletedEventAttributes"
   }
]
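
The "encoding" and "data" fields in the JSON history above are base64 strings, which makes the dump hard to read. A minimal sketch for decoding them with the JDK's Base64 class follows; the class name PayloadDecoder is made up for illustration, and the sample values are copied from the history.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class PayloadDecoder {
    /** Decodes one base64 "data" or "encoding" field into its UTF-8 text. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Values copied from the history above.
        System.out.println(decode("anNvbi9wbGFpbg=="));                     // metadata encoding
        System.out.println(decode("Im5ldyB2YWx1ZSI="));                     // second signal argument (event 6)
        System.out.println(decode("IkFDVElWSVRZX0lEX05PVF9TVEFSVEVEIg==")); // cancel details (event 12)
    }
}
```

Decoded, the encoding is json/plain, the second signal argument is "new value", and the details of the ActivityTaskCanceled event are "ACTIVITY_ID_NOT_STARTED", which matches the startedEventId of 0 on that event.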

However, when I run the unit test I do see an EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED event, which is confusing since it’s executing the same workflow and DSL configuration.

Stored Workflows:
ExecutionId{namespace='default', execution=workflow_id: "645f9505-7a4a-4712-b534-3cfc5d2f1a4b"
run_id: "11a6e990-5bbc-4ee8-a893-0b042dd64613"
}
event_id: 1
event_time {
  seconds: 1605870573
  nanos: 502000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
workflow_execution_started_event_attributes {
  workflow_type {
    name: "SimpleDSLWorkflow"
  }
  task_queue {
    name: "dsl"
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "{\"variables\":{\"arg1\":\"value1\",\"arg2\":\"value2\"},\"root\":{\"sequence\":{\"elements\":[{\"sequence\":null,\"activity\":{\"name\":\"SampleActivities1\",\"method\":\"getInfo\",\"arguments\":[\"arg1\"],\"result\":\"result1\",\"supportCallback\":true},\"parallel\":null},{\"sequence\":null,\"activity\":{\"name\":\"SampleActivities2\",\"method\":\"getInfo\",\"arguments\":[\"result1\"],\"result\":\"result2\",\"supportCallback\":false},\"parallel\":null}]},\"activity\":null,\"parallel\":null}}"
    }
  }
  workflow_execution_timeout {
    seconds: 315360000
  }
  workflow_run_timeout {
    seconds: 315360000
  }
  workflow_task_timeout {
    seconds: 10
  }
  original_execution_run_id: "11a6e990-5bbc-4ee8-a893-0b042dd64613"
  identity: "47440@C02C71F6MD6V"
  attempt: 1
}
event_id: 2
event_time {
  seconds: 1605870573
  nanos: 502000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "dsl"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 3
event_time {
  seconds: 1605870573
  nanos: 510000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 2
  identity: "47440@C02C71F6MD6V"
}
event_id: 4
event_time {
  seconds: 1605870573
  nanos: 647000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 2
  identity: "47440@C02C71F6MD6V"
}
event_id: 5
event_time {
  seconds: 1605870573
  nanos: 647000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "7acbc1c6-19d9-300d-9967-f761932e8b21"
  activity_type {
    name: "SampleActivities1GetInfo"
  }
  namespace: "default"
  task_queue {
    name: "SampleActivities1"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[\"value1\"]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 3
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
    maximum_attempts: 1
  }
}
event_id: 6
event_time {
  seconds: 1605870603
  nanos: 516000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
workflow_execution_signaled_event_attributes {
  signal_name: "callback"
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"SampleActivities1\""
    }
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"hello\""
    }
  }
  identity: "47440@C02C71F6MD6V"
}
event_id: 7
event_time {
  seconds: 1605870603
  nanos: 516000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "dsl"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 8
event_time {
  seconds: 1605870603
  nanos: 517000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 7
  identity: "47440@C02C71F6MD6V"
}
event_id: 9
event_time {
  seconds: 1605870603
  nanos: 548000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 7
  identity: "47440@C02C71F6MD6V"
}
event_id: 10
event_time {
  seconds: 1605870603
  nanos: 548000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED
activity_task_cancel_requested_event_attributes {
  scheduled_event_id: 5
  workflow_task_completed_event_id: 8
}
event_id: 11
event_time {
  seconds: 1605870603
  nanos: 548000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
activity_task_scheduled_event_attributes {
  activity_id: "8ac23950-9846-36c5-b8be-db49ce6c9660"
  activity_type {
    name: "SampleActivities2GetInfo"
  }
  namespace: "default"
  task_queue {
    name: "SampleActivities2"
  }
  header {
  }
  input {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "[\"hello\"]"
    }
  }
  schedule_to_close_timeout {
    seconds: 315360000
  }
  schedule_to_start_timeout {
    seconds: 315360000
  }
  start_to_close_timeout {
    seconds: 300
  }
  heartbeat_timeout {
  }
  workflow_task_completed_event_id: 8
  retry_policy {
    initial_interval {
      seconds: 1
    }
    backoff_coefficient: 2.0
    maximum_interval {
      seconds: 100
    }
    maximum_attempts: 1
  }
}
event_id: 12
event_time {
  seconds: 1605870603
  nanos: 548000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_STARTED
activity_task_started_event_attributes {
  scheduled_event_id: 11
  identity: "47440@C02C71F6MD6V"
  attempt: 1
}
event_id: 13
event_time {
  seconds: 1605870603
  nanos: 552000000
}
event_type: EVENT_TYPE_ACTIVITY_TASK_COMPLETED
activity_task_completed_event_attributes {
  result {
    payloads {
      metadata {
        key: "encoding"
        value: "json/plain"
      }
      data: "\"Result_SampleActivities2GetInfo\""
    }
  }
  scheduled_event_id: 11
  started_event_id: 12
  identity: "47440@C02C71F6MD6V"
}
event_id: 14
event_time {
  seconds: 1605870603
  nanos: 552000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
workflow_task_scheduled_event_attributes {
  task_queue {
    name: "dsl"
  }
  start_to_close_timeout {
    seconds: 10
  }
  attempt: 1
}
event_id: 15
event_time {
  seconds: 1605870603
  nanos: 553000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_STARTED
workflow_task_started_event_attributes {
  scheduled_event_id: 14
  identity: "20981835-d020-4bf3-b89a-2714285136ac"
}
event_id: 16
event_time {
  seconds: 1605870603
  nanos: 559000000
}
event_type: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
workflow_task_completed_event_attributes {
  scheduled_event_id: 14
  identity: "47440@C02C71F6MD6V"
}
event_id: 17
event_time {
  seconds: 1605870603
  nanos: 559000000
}
event_type: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
workflow_execution_completed_event_attributes {
  result {
  }
  workflow_task_completed_event_id: 15
}


I created a branch that produces the error: Branch.

I also noticed that if I use the debugger, in the unit test or while running against a Temporal service, I see flakiness in the behavior. I’m not sure if it’s a result of the different threads that get spun up, but I find that I need to use print statements to know what’s going on. For example, I randomly get an event loop error with this stack trace:

java.lang.IllegalStateException: Operation allowed only while eventLoop is running
	at io.temporal.internal.statemachines.WorkflowStateMachines.checkEventLoopExecuting(WorkflowStateMachines.java:904)
	at io.temporal.internal.statemachines.WorkflowStateMachines.randomUUID(WorkflowStateMachines.java:600)
	at io.temporal.internal.replay.ReplayWorkflowContextImpl.scheduleActivityTask(ReplayWorkflowContextImpl.java:214)
	at io.temporal.internal.sync.SyncWorkflowContext.executeActivityOnce(SyncWorkflowContext.java:183)
	at io.temporal.internal.sync.SyncWorkflowContext.executeActivity(SyncWorkflowContext.java:171)
	at io.temporal.internal.sync.ActivityStubImpl.executeAsync(ActivityStubImpl.java:47)
	at io.temporal.internal.sync.ActivityStubBase.executeAsync(ActivityStubBase.java:56)
	at io.temporal.samples.dsl.models.ActivityInvocation.lambda$execute$0(ActivityInvocation.java:65)
	at io.temporal.samples.dsl.models.ActivityInvocation$$Lambda$179/0000000000000000.run(Unknown Source)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
	at io.temporal.samples.dsl.models.ActivityInvocation.execute(ActivityInvocation.java:70)
	at io.temporal.samples.dsl.models.Statement.execute(Statement.java:36)
	at io.temporal.samples.dsl.models.Sequence.execute(Sequence.java:21)
	at io.temporal.samples.dsl.models.Statement.execute(Statement.java:32)
	at io.temporal.samples.dsl.SimpleDSLWorkflowImpl.execute(SimpleDSLWorkflowImpl.java:22)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:289)
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:253)
	at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:52)
	at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:121)
	at io.temporal.internal.sync.SyncWorkflow$$Lambda$177/0000000000000000.run(Unknown Source)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:104)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:847)

Also, any suggestions on the parts called out in the notes?

  • In my use case, I have a workflow that generically creates predefined activities. I want to support a signal that can cancel specific activities. The way I’m achieving this is to keep a mapping from the activity name to the cancellation scope. This is probably not the best way to do it. Is there a way to get a unique identifier for the executed activity instance? I noticed that inside the activity you can call the activity context to get the activityId, but is it possible to get it after you create the ActivityStub, or to set your own activityId?
 ActivityStub stub =
        Workflow.newUntypedActivityStub(
            ActivityOptions.newBuilder()
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
                .setStartToCloseTimeout(Duration.ofMinutes(5))
                .setTaskQueue(this.name)
                .build());
  • What does setting the cancellationType do for the activity?

Hey, I am thinking about a similar problem, but can’t find a proper activity/workflow design. The problem is similar: reliably get the result from an external service by callback and polling. If the callback fails, polling will kick in after an initial timeout to check the result of the external service.
Unfortunately, the code examples on GitHub are not available anymore, so I am looking for advice.

I see the flow like this:

ServiceWorkflow
→ ServiceActivity - send data to the external service along with a callback for the workflow execution
→ PollingActivity (or workflow?) - scheduled to fire after 10 minutes
→ ResultActivity - async? activity that waits for a ?signal? or ?command? from the callback or PollingActivity and then processes the result.

I don’t know if this will work or not, but:

  1. Is it better to use an Activity or a Workflow for polling, so as not to block the worker?
  2. How do I properly organise polling in an activity without blocking the worker or throwing failure errors?
  3. Am I right to use an async activity as a blocker?

Or should I do this differently?

You don’t need ResultActivity. The callback handler should signal the workflow.

Thanks for the answer. How can I tell the workflow / activity to wait for the signal?

You don’t wait for signals in activities.

Inside a workflow, a standard approach is to call Workflow.await on a field updated from a signal handler.

Here is an example.

Will it block the thread and wait? Or will it wait for a “command” from the server to continue, without eating into concurrency?

It will not use worker resources if blocked for a long time.
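
For readers following the design discussed above (wait for the callback first, fall back to polling after an initial timeout), the control flow can be modelled in plain Java. This is only an analogy and not Temporal workflow code: the CompletableFuture stands in for the field that a signal handler would update, the pollStatus supplier stands in for a polling activity, and the class and parameter names are made up for illustration. Inside a real workflow the waiting would be done with Workflow.await and activities rather than with threads and futures.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Plain-Java model of "callback first, polling as fallback". NOT workflow code. */
final class CallbackOrPolling {

    /**
     * Waits up to callbackTimeout for the callback result. If none arrives,
     * polls pollStatus every pollInterval, at most maxPolls times.
     * All names here are hypothetical and chosen for this sketch only.
     */
    static String awaitResult(
            CompletableFuture<String> callback,        // stand-in for the signal
            Duration callbackTimeout,
            Supplier<Optional<String>> pollStatus,     // stand-in for a polling activity
            Duration pollInterval,
            int maxPolls) {
        try {
            try {
                // Phase 1: give the callback a chance to arrive on its own.
                return callback.get(callbackTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // No callback within the initial timeout: fall through to polling.
            }
            // Phase 2: poll, but let a late callback win if it shows up.
            for (int i = 0; i < maxPolls; i++) {
                if (callback.isDone()) {
                    return callback.get();
                }
                Optional<String> status = pollStatus.get();
                if (status.isPresent()) {
                    return status.get();
                }
                Thread.sleep(pollInterval.toMillis());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("callback failed", e.getCause());
        }
        throw new IllegalStateException("no result after " + maxPolls + " polls");
    }
}
```

Whichever source reports completion first ends the wait, which is the "OR" asked about at the start of the thread. The polling phase only starts after the initial timeout, so a healthy callback path never triggers any polling.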