I am working on a solution for enterprise system integration, and Temporal seems to fit this use case pretty well. I have some requirements around consuming streams (e.g. Kafka) and listening to events within a workflow.
I have something similar to this in mind from a developer's perspective (TypeScript example):
The typical approach is to have a Kafka consumer that dispatches signals to workflows using signalWithStart (an API that atomically creates a workflow if it doesn’t exist and sends it a signal).
The consumer does the filtering and associates each event with the proper “entity” workflow.
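A minimal sketch of that dispatch step might look like the following. It assumes kafkajs for the Kafka consumer, a hypothetical events topic, a hypothetical entityId field on each message used as the workflow ID, a my-task-queue task queue, and the myWorkflow / mySignal definitions from the workflow code further below; adapt names and routing to your setup.

import { Client } from '@temporalio/client';
import { Kafka } from 'kafkajs';
import { myWorkflow, mySignal } from './workflows'; // hypothetical module path

async function runDispatcher(): Promise<void> {
  const client = new Client(); // configure connection/namespace as needed
  const kafka = new Kafka({ brokers: ['localhost:9092'] });
  const consumer = kafka.consumer({ groupId: 'temporal-dispatcher' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'events' }); // hypothetical topic name

  await consumer.run({
    eachMessage: async ({ message }) => {
      const request = JSON.parse(message.value!.toString());
      // Atomically start the entity workflow if it doesn't exist and deliver the signal
      await client.workflow.signalWithStart(myWorkflow, {
        workflowId: `entity-${request.entityId}`, // hypothetical entity key
        taskQueue: 'my-task-queue',
        args: [[]], // initial (empty) requests list
        signal: mySignal,
        signalArgs: [request],
      });
    },
  });
}

runDispatcher().catch((err) => {
  console.error(err);
  process.exit(1);
});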
In your workflow code you’ll want to consume these signals in a loop and periodically call continue-as-new.
We typically recommend this pattern for this type of workflow:
import {
  condition,
  continueAsNew,
  defineSignal,
  setHandler,
  workflowInfo,
} from '@temporalio/workflow';
import ms from 'ms';

// Make sure Request is serializable to JSON (i.e. no functions, no promises, etc.)
type Request = { /* ... */ };

export const mySignal = defineSignal<[Request]>('mySignal');

// Entity workflow pattern with serialized processing of requests
// (i.e. only one request is processed at a time)
export async function myWorkflow(requests: Request[] = []): Promise<void> {
  setHandler(mySignal, (input: Request) => {
    requests.push(input);
    // Don't await here. Otherwise, the Workflow may complete before the promise completes.
  });

  while (!workflowInfo().continueAsNewSuggested) {
    const timeSinceStart = Date.now() - workflowInfo().runStartTime.getTime();
    const shouldProcessMore = await condition(() => requests.length > 0, ms('24h') - timeSinceStart);
    if (!shouldProcessMore) break;

    const request = requests.shift()!;
    // Process request as appropriate
    await handleSingleRequest(request);
  }

  // This condition is optional, depending on your use case.
  if (requests.length === 0) {
    return;
  }

  // Huge histories are bad for performance, so we switch to a new workflow execution whenever
  // history grows over 2000 events. When that happens, we forward any outstanding requests to the
  // next execution.
  await continueAsNew<typeof myWorkflow>(requests);
}

async function handleSingleRequest(request: Request): Promise<void> {
  // It's ok to await here (e.g. awaiting Activities)
}
Note that signal inputs should be fairly small (a generally recommended practice); otherwise, this technique of transferring outstanding requests to the next execution will fail because the aggregated input becomes too large.
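To fill in handleSingleRequest, you would typically await one or more Activities. A minimal sketch, assuming a hypothetical processRequest Activity defined in an activities module:

import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities'; // hypothetical Activities module

// processRequest is a hypothetical Activity that performs the actual integration work
const { processRequest } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

async function handleSingleRequest(request: Request): Promise<void> {
  // Awaiting an Activity here is fine; incoming signals keep being buffered
  // into `requests` by the signal handler while this runs
  await processRequest(request);
}

Because the signal handler only appends to the requests array, signals received while a request is being processed are not lost; they are picked up on the next loop iteration or carried over via continueAsNew.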