Listening to event streams in a workflow


I am working on a solution for enterprise system integration, and Temporal seems to fit this use case pretty well. I have some requirements around consuming streams (e.g. Kafka) and listening for events within a workflow.

From a developer-experience perspective, I have something like this in mind (TypeScript example):

export const exampleStream = wf.defineStream('stream-example');

async function exampleWorkflow() {
  // similar behavior to signals: invoke a callback on each matching event
  wf.subscribe(exampleStream, 'match-condition', () => {
    // do something
  });

  // do something

  // block until a matching event arrives
  await wf.waitFor(exampleStream, 'match-condition');

  // do something
}


Additional requirements:

  • The workflow needs to subscribe to the stream (e.g. with an external service)
  • The workflow needs to automatically unsubscribe on failure or completion

Example scenarios

  • Change capturing of databases (e.g. Postgres => Kafka => Temporal (any workflow))
  • Clickstream tracking (executing workflows on certain user clicks, including waiting for a specific activity)
  • IoT device event listening

How would you solve something like this with Temporal?


There’s no built-in way to do this in Temporal.

The typical approach is to run a Kafka consumer that dispatches signals to workflows using signalWithStart (an API that atomically creates a workflow if it doesn't already exist and sends it a signal).
The consumer does the filtering and associates each event with the proper "entity" workflow.
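As a rough sketch of that consumer-side dispatch, here's how events could be routed by message key to entity workflows. Note that the workflow ID scheme, the `entity-events` task queue, and the `mySignal`/`myWorkflow` names are assumptions for illustration; the `SignalWithStartClient` interface just mirrors the shape of `WorkflowClient.signalWithStart` from `@temporalio/client` so the routing logic can be tested without a running cluster.

```typescript
// Minimal shape of an incoming Kafka record (e.g. kafkajs delivers key/value as Buffers).
interface KafkaRecord {
  key: Buffer | null;
  value: Buffer | null;
}

// Derive a deterministic workflow ID from the message key, so every event for the
// same entity is delivered to the same workflow execution. (Hypothetical scheme.)
export function workflowIdFor(key: string): string {
  return `entity-${key}`;
}

// The subset of Temporal's client API we rely on, mirroring
// WorkflowClient.signalWithStart(workflowType, options).
interface SignalWithStartClient {
  signalWithStart(
    workflowType: string,
    options: {
      workflowId: string;
      taskQueue: string;
      signal: string;
      signalArgs: unknown[];
      args: unknown[];
    },
  ): Promise<unknown>;
}

export async function dispatch(client: SignalWithStartClient, record: KafkaRecord): Promise<void> {
  if (!record.key || !record.value) return; // skip records we can't route
  const request = JSON.parse(record.value.toString());
  // Atomically: start myWorkflow if it isn't already running, then deliver the signal.
  await client.signalWithStart('myWorkflow', {
    workflowId: workflowIdFor(record.key.toString()),
    taskQueue: 'entity-events',
    signal: 'mySignal',
    signalArgs: [request],
    args: [[]], // initial `requests` argument for a fresh execution
  });
}
```

In practice you'd call `dispatch` from a kafkajs `consumer.run({ eachMessage })` handler, passing a real `WorkflowClient`; since signalWithStart is idempotent with respect to workflow creation, redelivered messages only risk duplicate signals, not duplicate workflows.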

In your workflow code you’ll want to consume these signals in a loop and periodically call continue-as-new.

We typically recommend this pattern for this type of workflow:

import { condition, continueAsNew, defineSignal, setHandler, workflowInfo } from '@temporalio/workflow';
import ms from 'ms';

// Make sure Request is serializable to JSON (ie. no functions, no promises, etc.)
type Request = { ... }

export const mySignal = defineSignal<[Request]>('mySignal');

// Entity workflow pattern with serialization of requests
// (ie. only one request is processed at a time)
export async function myWorkflow(requests: Request[] = []): Promise<void> {
  setHandler(mySignal, (input: Request) => {
    // Don't await here. Otherwise, the Workflow may complete before the promise completes.
    requests.push(input);
  });

  while (!workflowInfo().continueAsNewSuggested) {
    const timeSinceStart = Date.now() - workflowInfo().runStartTime.getTime();
    // Wait for a request, but no longer than 24h total for this execution
    const shouldProcessMore = await condition(() => requests.length > 0, ms('24h') - timeSinceStart);
    if (!shouldProcessMore) break;

    const request = requests.shift()!;

    // Process request as appropriate
    await handleSingleRequest(request);
  }

  // This condition is optional, depending on your use case: if there are no
  // outstanding requests, you may simply complete instead.
  if (requests.length === 0) return;

  // Huge histories are bad for performance, so we switch to a new workflow execution
  // whenever the server suggests it (based on history size). When that happens, we
  // forward any outstanding requests to the next execution.
  await continueAsNew<typeof myWorkflow>(requests);
}

async function handleSingleRequest(request: Request): Promise<void> {
  // It's ok to await here
}
Note that signal inputs should be fairly small (a generally recommended practice); otherwise, this technique of transferring outstanding requests to the next execution may fail because the aggregated input exceeds the payload size limit.