Signalling system - Human driven workflows

Might be totally off on this but the current Signal* system in Temporal works in a fire-and-forget way.
Also there might be an overlap/misunderstanding on when to use signals/async complete activities.

I’ll layout a generic scenario below;

  1. user submits email address in a form → OTP is generated + Start Workflow execution
  2. user is sent an SMS with OTP
    this can fail - if it fails internally temporal will retry :white_check_mark:
    if it passes but user doesn’t receive SMS because operator felt like not sending it - we want the user to be able to request it again :x:
  3. first signal – with help of Maxim I was able to figure this out. this signal can be fire and forget.
  4. user receives OTP and enters OTP
  5. second signal – when we receive this signal the next activity is to verify the OTP. there’s no point in retrying if this fails because it’s user error - so lets say we detect this; we can’t get a hold of the response from the activity and send back to the user.
    we want to get the result of the activity, if it fails respond back to the user and say wrong OTP and then allow them to request a new OTP (first signal) and then enter the OTP again (second signal).

this usecase also won’t work with async complete activity

3 Likes

What you need is synchronous update, similar to the query, but with the ability to mutate the workflow state.

We have plans in adding such a feature, but in the meantime, there are a couple of workarounds to achieve what you want.

Signal then Query

The simplest approach is to signal workflow and then query it periodically until query will indicate that the request is successful or failed. Temporal guarantees read after write consistency of signal and then query pair.
The main drawback of this approach is that it introduces additional latency as query polling interval cannot be very low.

Use Local Activity to Notify

A little bit more complex approach is to use a local activity to unblock the caller.
Here is the sequence of steps to implement this:

  • A process that initiates the request needs to expose an endpoint to complete the request.
  • The synchronous request first sends a signal to the workflow. The signal arguments include a unique requestId as well as host and port of the client endpoint. Then it creates a channel (or CompletableFuture in case of JavaSDK) and inserts it into a map using requestId as a key. Then the requesting thread blocks on this channel receive (or Future.get).
  • Upon receiving the signal the workflow performs all the necessary activities and state transitions. Then to reply the workflow invokes the deliver result local activity. This activity invokes the complete request endpoint using the host and port and passing requestId as one of the arguments.
  • As the host and port which are specific to the process are used the request handler is executed in the same process that sent the original signal. Then it gets from the map the channel (or CompletableFuture) that the original requested thread is blocked on and completes it with the result of the request received as another activity argument.
  • The original thread is unblocked by receiving the request result and can continue its execution.
5 Likes

just curious, (using local activity to update) ,is this how the out of box synchronous signals (with reply) be implemented in the future version of temporal?

just curious, (using local activity to update) ,is this how the out of box synchronous signals (with reply) be implemented in the future version of temporal?

It is not 100% designed yet. I believe that the most useful API is a synchronous long poll request to the Temporal service. The open question is about reconnection behavior if the caller process crashed. Some of the options are:

  1. Caller has to make another request
  2. Caller can reconnect by request id. The problem with this approach is deciding when to remove the response as it is not clear if it was consumed or not. And keeping it around for the lifetime of a workflow is expensive.
  3. Updates cannot block the workflow code. This way response is delivered through a long poll (not implemented yet) query. This has benefit over 2 as response lifecycle is managed by the workflow implementor. But it complicates the life of the workflow developer.
  4. Updates are delivered asynchronously through some activity.

We’ll post the design proposal once we get to this.

1 Like

I think it is somehow related. I am trying to do similar thing: mutate workflow state with signals, however, I do not need those updates to be synchronous, but with guaranteed sequence. And it looks like Temporal is not sequencing signals by time they received. I have small test and if there are small delays then between signals then test fails because signals come to WF out of sequence, with 50 milliseconds or more, all is well. So, does Temporal have any guarantees that events delivered to workflow in the order of their arrival?

//with delay 0 or 5 test fails, with 50 or more it passes
     long delay = 50;
       WorkflowExecution wfInfo =  WorkflowClient.start( newWf::run,  fsmDefinition);
        DynamicFSMWorkflow workflow = client.newWorkflowStub( DynamicFSMWorkflow.class, wfInfo.getWorkflowId());
        assertEquals( "progress", workflow.currentState().name);
        workflow.receivedEvent( new MismoEvent( MismoEventType.Pause));
        Thread.sleep(delay);
        assertEquals( "paused", workflow.currentState().name);
        workflow.receivedEvent( new MismoEvent( MismoEventType.UpdateStatus));
        Thread.sleep(delay);
        assertEquals( "progress", workflow.currentState().name);

        workflow.receivedEvent( new MismoEvent( MismoEventType.Done));
        Thread.sleep(delay);
        assertEquals( "done", workflow.currentState().name);
        Thread.sleep(delay);
        verify( wfActivitiesMock,times(2)).cancelTimer(any(),any());
        verify( wfActivitiesMock,times(1)).setTimer(any(),any(),any());

Signals are guaranteed to be delivered in the same order. Do you see them in the event history in the same order?

I use unit test for now, and see that they come out of order if “delay” is less than 5, maybe it is something specific to the test environment. Will try against server, then reply back.

same behavior with real server, here are the histories, first one is correct, that is when there is 500ms pause between signals


and this is incorrect, without pauses between signals

junit log shows that FSM state is incorrect

with delays in between signals the happy path works as expected

I don’t see difference in signal ordering between the both attached histories:

Correct sequence:

  {
    "eventId": "5",
    "eventTime": {
      "seconds": "1606518090",
      "nanos": 116367543
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340053",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"Pause"}"
          }
        ]
      },
      "identity": "86270@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...
  {
    "eventId": "21",
    "eventTime": {
      "seconds": "1606518090",
      "nanos": 643777548
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340098",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"UpdateStatus"}"
          }
        ]
      },
      "identity": "86270@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...
  {
    "eventId": "31",
    "eventTime": {
      "seconds": "1606518091",
      "nanos": 167453124
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340126",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"Done"}"
          }
        ]
      },
      "identity": "86270@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...

So the sequence is

  1. Pause
  2. UpdateStatus
  3. Done

Broken Sequence

  {
    "eventId": "5",
    "eventTime": {
      "seconds": "1606518164",
      "nanos": 383600371
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340091",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"Pause"}"
          }
        ]
      },
      "identity": "86312@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...
  {
    "eventId": "13",
    "eventTime": {
      "seconds": "1606518164",
      "nanos": 454248646
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340114",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"UpdateStatus"}"
          }
        ]
      },
      "identity": "86312@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...
  {
    "eventId": "23",
    "eventTime": {
      "seconds": "1606518164",
      "nanos": 537351722
    },
    "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
    "version": "0",
    "taskId": "7340138",
    "workflowExecutionSignaledEventAttributes": {
      "signalName": "receivedEvent",
      "input": {
        "payloads": [
          {
            "metadata": {
              "encoding": "json/plain"
            },
            "data": "{"eventType":"Done"}"
          }
        ]
      },
      "identity": "86312@Konstantins-iMac.local"
    },
    "attributes": "workflowExecutionSignaledEventAttributes"
  },
...

So the sequence is the same as the previous one:

  1. Pause
  2. UpdateStatus
  3. Done

well, with real server unit test passes but I see in the log
11:56:43.640 [signal receivedEvent] ERROR c.x.w.d.DynamicFSMWorkflowImpl - Out of sequence:Pause at state done
check timer = progress duration:null
11:56:43.641 [signal receivedEvent] ERROR c.x.w.d.DynamicFSMWorkflowImpl - Out of sequence:UpdateStatus at state done
check timer = progress duration:null

which means that signals ‘Pause’ and ‘UpdateStatus’ were delivered to the instance of WF that is already in 'done state;
here is full code if you want to try it yourself https://www.dropbox.com/s/y02ii4rgib5yyrl/dynamic.fg.tgz?dl=0

1 Like

Oh, and if you comment out calls to real server and uncomment test env, then unit test fails when ‘delay = 0’

DynamicFSMWorkflowTest > testTimerNotification() FAILED
Wanted but not invoked:
wFActivities.setTimer(, , );
-> at com.xpanse.workflow.dynamic_fsm.DynamicFSMWorkflowTest.testTimerNotification(DynamicFSMWorkflowTest.java:113)

However, there were other interactions with this mock:
wFActivities.cancelTimer(
    "wf-1606594598567",
    "progress"
);
-> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

wFActivities.cancelTimer(
    "wf-1606594598567",
    "paused"
);
-> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at com.xpanse.workflow.dynamic_fsm.DynamicFSMWorkflowTest.testTimerNotification(DynamicFSMWorkflowTest.java:113)

1 test completed, 1 failed
Finished generating test XML results (0.011 secs) into: /Users/kgignatyev/dev/1companies/xpanse/poc/wf_with_temporal/dynamic-fsm-workflow/lib/build/test-results/test
Generating HTML test report…
Finished generating test html results (0.017 secs) into: /Users/kgignatyev/dev/1companies/xpanse/poc/wf_with_temporal/dynamic-fsm-workflow/lib/build/reports/tests/test
:lib:test (Thread[Execution worker for ‘:’,5,main]) completed. Took 1.8 secs.

I cannot compile the code you posted as it misses some dependencies:

Starting a Gradle Daemon, 1 incompatible Daemon could not be reused, use --status for details

> Configure project :lib
e: /Users/maxim/support/dynamic.fg/lib/build.gradle.kts:11:5: Unresolved reference: toolchain
e: /Users/maxim/support/dynamic.fg/lib/build.gradle.kts:12:9: Unresolved reference: languageVersion
e: /Users/maxim/support/dynamic.fg/lib/build.gradle.kts:12:29: Unresolved reference: JavaLanguageVersion

FAILURE: Build failed with an exception.

* Where:
Build file '/Users/maxim/support/dynamic.fg/lib/build.gradle.kts' line: 11

* What went wrong:
Script compilation errors:

  Line 11:     toolchain {
               ^ Unresolved reference: toolchain

  Line 12:         languageVersion.set(JavaLanguageVersion.of(11))
                   ^ Unresolved reference: languageVersion

  Line 12:         languageVersion.set(JavaLanguageVersion.of(11))
                                       ^ Unresolved reference: JavaLanguageVersion

3 errors

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

But looking at the workflow code I see that it processes state transitions from the signal method. My guess is that these state transitions evolve some blocking operations. When the signal method is blocked other signals will be delivered. See this issue for the proposed solution. Here is another one.

I agree that the current design is not ideal. I’m not sure if we can change it in a backward-compatible way.

here is project with wrapper - gradle 6.7, definitely compilable https://www.dropbox.com/s/290ss7aad8ufuca/dynamic-fsm-workflow.tgz?dl=0
executing by java -version
openjdk version “14.0.1” 2020-04-14
OpenJDK Runtime Environment (build 14.0.1+7)
OpenJDK 64-Bit Server VM (build 14.0.1+7, mixed mode, sharing)

and there is nothing blocking in the signal method, it is fast too.
My suspicion is that it is somehow related to the time resolution. Another IMO important thing is that test environment and real one behave quite differently

And thank you for the links, this one seems could be ‘the ticket’

1 Like

@maxim has the design been posted somewhere already?

I don’t think we have a sample for this design.

@maxim is the design/proposal for synchronous signals available somewhere?

@akash I believe you are looking for @SignalMethod threading configuration · Issue #214 · temporalio/sdk-java · GitHub

@maxim are there any samples w.r.t “Use Local Activity to Notify”?

As the host and port which are specific to the process are used the request handler is executed in the same process that sent the original signal.

Does this also take care of the semantic where I just deposit the payload to the workflow state in the @SignalMethod (and letting workflow execution take care of reacting to the signal)? Will it still be synchronous/in the same process?

Since it’s been 2 years almost since you posted, is there a better way that exists now over this? Other than the “synchronous update” feature that is still in development of course.

are there any samples w.r.t “Use Local Activity to Notify”?

Can you open an issue in java samples repo for this to be added?

Will it still be synchronous/in the same process?

from the description of the pattern, yes.

is there a better way that exists now over this

not yet, features is still in the works at this time

@maxim, @tihomir
Any plans to support synchronous updates in a workflow in near future?