How would you implement a pub/sub or observable pattern in temporal?

I would like my workflows to subscribe to some service and periodically be fed data by it. Is a signal used for that?
How would I implement this pattern in temporal?

2 examples:

  1. I want my workflow to send a Slack message through an activity and then receive the user’s response. E.g., send a Slack block containing two buttons, ‘yes’ and ‘no’, to a Slack channel and then receive the answer back in the workflow through a webhook service.
  2. I want my workflow to subscribe to some topic on another service through an activity and then receive data from that topic. Then another activity would unsubscribe from that topic when my workflow ends.

Yes. You set up your webhook handler outside of Temporal, for example as an AWS Lambda function, or whatever is convenient for you. Your webhook handler would then use the Temporal client library to send a signal to your workflow.
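A sketch of that flow for the Slack yes/no example, with plain-Python stand-ins for the Temporal client and the Slack payload (the names `signal_workflow`, `handle_slack_webhook`, and the payload fields are all hypothetical; in real code the handler would call the Temporal client's signal API with the workflow id):

```python
# Stand-in for the Temporal server: workflow_id -> delivered signals.
SIGNALS = {}

def signal_workflow(workflow_id, signal_name, payload):
    """Stand-in for signaling a workflow by id via the Temporal client."""
    SIGNALS.setdefault(workflow_id, []).append((signal_name, payload))

def handle_slack_webhook(request_body):
    """Runs outside Temporal (e.g. in a Lambda). The Slack block sent by
    the workflow's activity embeds the workflow id in its callback
    metadata, so the handler knows which workflow to signal."""
    workflow_id = request_body["callback_metadata"]["workflow_id"]
    answer = request_body["actions"][0]["value"]  # "yes" or "no"
    signal_workflow(workflow_id, "slack_response", {"answer": answer})

# Simulated incoming webhook after the user clicks "yes":
handle_slack_webhook({
    "callback_metadata": {"workflow_id": "approval-wf-123"},
    "actions": [{"value": "yes"}],
})
print(SIGNALS["approval-wf-123"])
```

The key design point is that the activity which sends the Slack message includes the workflow's own id in the message metadata, so the webhook handler can route the response back without any shared state.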

How do you receive data using that API? Does it also send you the data that you’ve subscribed to through a webhook?

It might send the data or I might send a “data id”.
Let's take IAM for example:

  • I want some workflows to “subscribe” to some user data and then receive data only for that user.
  • The IAM has support for webhooks.
  • Say I want to do something whenever a specific user that the workflow is working on changes password.
  • So IIUC I should fire an activity that subscribes to some topic for that user password change event.
  • Then the webhook should fire a signal to all subscribed workflows.
  • When the workflow ends I should unsubscribe from that topic.
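The steps above can be sketched end to end. All names here are hypothetical: the subscribe/unsubscribe functions stand in for activities that talk to the IAM service's webhook API, and the `incoming_signals` argument stands in for signals delivered to the workflow by the webhook handler:

```python
# Stand-in for the IAM service's webhook subscription registry.
SUBSCRIPTIONS = set()

def subscribe_activity(user_id):
    # Activity: register a webhook subscription for this user's
    # password-change events with the IAM service.
    SUBSCRIPTIONS.add(user_id)

def unsubscribe_activity(user_id):
    # Activity: remove the subscription when the workflow is done.
    SUBSCRIPTIONS.discard(user_id)

def user_workflow(user_id, incoming_signals):
    """Sketch of the workflow body. `incoming_signals` stands in for
    signals sent by the webhook handler via the Temporal client."""
    subscribe_activity(user_id)
    try:
        for event in incoming_signals:
            if event["type"] == "password_changed":
                pass  # react to the password change here
    finally:
        unsubscribe_activity(user_id)  # always clean up the subscription

user_workflow("user-42", [{"type": "password_changed"}])
print(SUBSCRIPTIONS)
```

Putting the unsubscribe step in cleanup logic that runs even on failure (in real Temporal code, on cancellation as well) avoids leaking subscriptions when the workflow ends abnormally.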

I can start a new workflow, but sometimes I want to handle an event as part of an already running workflow which has some state and context.

Using the Temporal client, you can send a signal to a running workflow by its workflow id.

What if I don’t know (because I want to stay decoupled) the Id of the workflow(s) that should be signaled?

The assumption is that every message is related to some business entity. So, it is possible to determine that entity ID by looking at the message fields.

What is the scenario that doesn’t include enough data in the message to find which workflow to signal?

The scenario is that there is a command related to some business entity, but executing that command should publish an event to other services that the business entity knows nothing about.
For example, in the domain of a digital wallet, the workflow that represents the transfer of money should trigger another workflow that grants bonuses to each party involved. Forcing the “Transfer Money” workflow to know about the “Grant Bonus” workflow introduces unnecessary coupling, which could be avoided by publishing a “MoneyTransferred” event that the “GrantBonus” workflow (among others) can subscribe to.
And, since the “GrantBonus” workflow is (and should be) naturally coupled to the “MoneyTransferred” event, there’s no need to introduce yet more coupling.

One possibility is that every “MoneyTransferred” event would result in a new “Grant Bonus” workflow being launched to handle that particular event. However, you say that “the” Grant Bonus workflow would subscribe to the “MoneyTransferred” event, which to my mind raises the question of how the Grant Bonus workflow would be identified: is there a single Grant Bonus workflow running which receives every “MoneyTransferred” event, or, if there are multiple Grant Bonus workflows running continuously, how would you identify which Grant Bonus workflow should receive a particular “MoneyTransferred” event?

(For a design where there was a single Grant Bonus workflow that would be signaled for every “MoneyTransferred” event, you’d run into the issue that while Temporal supports millions of concurrently running workflows, any single workflow instance can only process a limited number of workflow events per second.)

I suspect that it would be simpler to have a new “Grant Bonus” workflow launched for each “MoneyTransferred” event. The workflow instance would naturally represent and implement the process of granting bonuses for that particular event, and the workflow instance would be finished when the bonuses had been granted.
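One way to sketch the workflow-per-event approach (stand-ins throughout; `start_workflow` models the Temporal client's start call, and the id scheme `grant-bonus-<transfer id>` is a hypothetical choice): deriving the workflow id from the event makes launching idempotent, since starting a second workflow with an id that is already in use is rejected.

```python
# Stand-in for the Temporal server: workflow_id -> input payload.
STARTED = {}

def start_workflow(workflow_id, payload):
    """Stand-in for starting a workflow via the Temporal client.
    A duplicate id is rejected, which makes one-workflow-per-event
    safe against redelivery of the same event."""
    if workflow_id in STARTED:
        return False  # already handling this event
    STARTED[workflow_id] = payload
    return True

def on_money_transferred(event):
    # Derive the workflow id from the event so redelivery is harmless.
    wf_id = f"grant-bonus-{event['transfer_id']}"
    return start_workflow(wf_id, event)

print(on_money_transferred({"transfer_id": "t-1", "amount": 100}))  # True
print(on_money_transferred({"transfer_id": "t-1", "amount": 100}))  # False
```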

Another question is, would it be enough to decouple the code, or do you really need to decouple the workflow?

For the former, imagine that in your code base you had a “Transfer Money” module and a “Grant Bonus” module. The “Transfer Money” module, in the code, would generate a “Money Transferred” event; the “Grant Bonus” module would, in the code, register that it wanted to receive “Money Transferred” events, and, on receiving such an event, start the “Grant Bonus” workflow. The whole thing would run in the “Money Transfer” workflow; you’d have a separation of concerns in the code while in execution the “Money Transfer” workflow would end up launching the “Grant Bonus” workflow.
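The code-level decoupling described above is an ordinary in-process observer pattern; a minimal sketch (module names and the `launched` list are illustrative; the handler would really start the Grant Bonus workflow via the Temporal client):

```python
# In-process event registry: separation of concerns in the code, while
# at runtime the Money Transfer workflow still launches Grant Bonus.
HANDLERS = {}  # event name -> list of callbacks

def subscribe(event_name, handler):
    HANDLERS.setdefault(event_name, []).append(handler)

def publish(event_name, payload):
    for handler in HANDLERS.get(event_name, []):
        handler(payload)

launched = []

# "Grant Bonus" module: registers interest in the event and, on
# receiving it, would start the Grant Bonus workflow (stand-in here).
subscribe("MoneyTransferred",
          lambda e: launched.append(("GrantBonus", e["transfer_id"])))

# "Transfer Money" module: at the end of the transfer, publishes the event.
publish("MoneyTransferred", {"transfer_id": "t-7"})
print(launched)
```

The “Transfer Money” code never names “Grant Bonus”; it only names the event.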

Perhaps you might want to really decouple the workflow execution; then you might have the “Transfer Money” workflow launch a “Money Transferred” workflow, which in turn would launch registered workflows such as the “Grant Bonus” workflow.

And perhaps you might want to dynamically change how events are responded to; in that case the information on how to respond to an event (such as which subscriptions it has) would need to be stored somewhere, either in a workflow or in an external database. The problem with storing dynamic subscription information in a workflow instance is, again, that you’d want to avoid having a singleton workflow in the system that would become a bottleneck. So I expect you’d likely end up wanting to store dynamic subscription information in an external database. The implementation then becomes: a workflow generates an event by calling an activity; the activity reads the subscription information from the database and then launches whatever workflows are required using the Temporal client.
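A sketch of that dispatcher activity (a dict stands in for the external subscription database, and a list stands in for workflows launched via the Temporal client; the table contents are hypothetical):

```python
# Stand-in for the external subscription database:
# event name -> workflow types to launch.
SUBSCRIPTION_DB = {"MoneyTransferred": ["GrantBonus", "NotifyParties"]}

# Stand-in for workflows launched via the Temporal client.
launched_workflows = []

def publish_event_activity(event_name, payload):
    """Activity called by the publishing workflow: look up subscribers
    in the database, then launch each one with the Temporal client
    (modeled here by appending to launched_workflows)."""
    for workflow_type in SUBSCRIPTION_DB.get(event_name, []):
        launched_workflows.append((workflow_type, payload))

publish_event_activity("MoneyTransferred", {"transfer_id": "t-9"})
print(launched_workflows)
```

Because the fan-out happens in an activity rather than a singleton workflow, no single workflow instance becomes a throughput bottleneck.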


I don’t think it is problematic for the MoneyTransfer workflow to know about the Bonus Granting workflow. Establishing a clear API between them makes troubleshooting production issues easier.
An implicit API based on an event that the Money Transfer workflow publishes is potentially more problematic, as any change to this event might break unknown parts of your system. There is also no visibility into when and how this event was processed. And error handling is more complicated when async events and queues are used.

In some rare (in my opinion) cases when you really want to rely on event notification, the pattern is that the queue listener of a particular service composes the workflow id from the message content and starts, signals, or signal-with-starts workflows based on that id.
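That listener pattern can be sketched as follows (the `signal_with_start` function stands in for the Temporal client's signal-with-start call, and the `bonus-<account id>` id scheme is a hypothetical choice):

```python
# Stand-in for the Temporal server: workflow_id -> delivered signals.
WORKFLOWS = {}

def signal_with_start(workflow_id, signal_name, payload):
    """Stand-in for signal-with-start: starts the workflow if it isn't
    running, then delivers the signal either way."""
    WORKFLOWS.setdefault(workflow_id, []).append((signal_name, payload))

def on_queue_message(message):
    # Compose the workflow id from the message content, so the listener
    # never needs to know workflow ids up front.
    workflow_id = f"bonus-{message['account_id']}"
    signal_with_start(workflow_id, "money_transferred", message)

on_queue_message({"account_id": "a-1", "amount": 50})
on_queue_message({"account_id": "a-1", "amount": 25})
print(len(WORKFLOWS["bonus-a-1"]))  # 2
```

The first message starts the workflow; subsequent messages for the same account are delivered to the already-running instance, which keeps the sender fully decoupled from workflow lifecycle.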