Approach for "streaming" activities

Hi all,

I’m evaluating Temporal as a platform for orchestrating user/LLM interactions. In these interactions, an LLM “streams” text output to the user live as it is being generated, and the user can take action (e.g. press a stop button or interject) at any time.

I see two main options to frame this logic in Temporal constructs.

Option 1: short activities, each returning one “batch” of LLM output

This is the option used by the AI agent example.

My core activity would trigger one batch of LLM output (at the order of a sentence or so). This activity would be looped many times within my workflow, and a user would interject by sending a signal to the workflow.
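To make the shape concrete, here is a plain-Python sketch of that loop (no Temporal here: the stop flag stands in for a workflow signal handler and `generate_next_batch` is a hypothetical stand-in for the short LLM activity):

```python
# Plain-Python sketch of the Option 1 loop. In a real Temporal workflow the
# stop flag would be flipped by a signal handler and generate_next_batch
# would be an activity invocation; all names here are illustrative.
class ConversationLoop:
    def __init__(self, batches):
        self._batches = iter(batches)   # stands in for the LLM
        self.stopped = False            # set by a "stop" signal
        self.transcript = []

    def signal_stop(self):
        # Stand-in for a Temporal signal handler.
        self.stopped = True

    def generate_next_batch(self):
        # One short "activity": produce the next sentence-sized batch,
        # or None when the model is done.
        return next(self._batches, None)

    def run(self):
        # The workflow loop: keep invoking the short activity until the
        # model finishes or the user interjects.
        while not self.stopped:
            batch = self.generate_next_batch()
            if batch is None:
                break
            self.transcript.append(batch)
        return self.transcript
```

The point of the sketch is only the control flow: many small, individually retried activity calls, with user input arriving asynchronously between them.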

But I have the following concerns:

  • In many LLM libraries, “resuming” a conversation is a heavy-weight operation that might, for instance, resend a full conversation history. Since my small activity runs many times (potentially across different workers), repeated resumes would incur huge latency / cost overheads.

    IIUC, the typical mitigation would be to maintain ongoing connections to the LLM in worker memory, and use task routing to ensure the worker with the right ongoing connection executes relevant activities.

    But how idiomatic is this in practice? Live logic (data coming through the ongoing connection) would constantly be running in between activities, “unmanaged” by Temporal. I’d have to keep that logic in sync with activity retries, and I’d need to ensure workers are registered for the right custom queues in the event of a crash, etc.

  • My activities aren’t idempotent / pure: in truth, they accept “nothing” and produce “the next batch of text”. I suspect I can work around this with arbitrary UUIDs etc, but it also feels iffy.

  • I’d have to jump through hoops to stop my entire user input / LLM output from saturating the event log.

Option 2: long activities, each running an entire conversation

With this option, an activity runs an entire conversation, interjections included. Since communication between a live activity and its workflow is largely unsupported (IIUC), interjections would be ferried to the activity by some external message queue.
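For illustration, the ferrying could look roughly like this (an `asyncio.Queue` stands in for the external message queue, e.g. Redis or SQS; the function and message names are hypothetical):

```python
import asyncio

# Sketch of Option 2: one long-running "activity" owns the whole
# conversation and drains user interjections from an external queue
# between generated batches. All names are illustrative stand-ins.
async def run_conversation(batches, interjections: asyncio.Queue):
    transcript = []
    for batch in batches:
        # Drain any interjections that arrived while generating.
        while not interjections.empty():
            msg = interjections.get_nowait()
            if msg == "stop":
                return transcript       # user pressed the stop button
            transcript.append(f"user: {msg}")
        transcript.append(f"llm: {batch}")
    return transcript
```

Note that everything inside this function is invisible to Temporal: a crash mid-conversation loses the whole transcript unless the activity checkpoints it itself.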

But I have the following concerns:

  • Since my activities run for ages, Temporal’s retry magic becomes much less helpful. I’d have to manually handle state storage and resumption so that big chunks of work aren’t lost on a crash.

  • I’d also have to implement this external message system and eschew signals/updates. My messages won’t be first-class citizens (e.g. on the event log) and I’d have a parallel control flow (e.g. a message might trigger a child workflow) not managed by Temporal.

My question

What is the right way to implement this kind of logic in Temporal? My initial impression is that Temporal’s abstractions just aren’t a comfortable fit for it.

Thank you for your time!


Workflow Design

I would choose a middle ground, which is the approach we have generally taken in our integrations because it provides the most benefit from using Temporal. Option 2 really only makes sense if the LLM portion is a relatively small part of the overall workflow that you do want to retry wholesale. The costs of resumption in option 1 would likely be prohibitive without ongoing connections; I address the complications of those connections below.

The middle ground would be to isolate one step of an LLM conversation to a single activity. This usually looks like a single prompt, a single tool call, etc. These are generally quite repeatable, though the conversation history size can still be a concern. That still leaves you needing to answer the streaming problem, however. Here we would recommend something akin to option 2’s external message system being emitted from the activity, but with a key difference around:

“I’d have a parallel control flow (e.g. a message might trigger a child workflow) not managed by Temporal”

In my experience it is unusual for a partial LLM response to need direct control flow like this. Rather, when each activity bounds a single LLM prompt, the whole response comes back to the workflow, which can then take durable actions like starting a child workflow as normal. We’ve seen child workflows and activities used as tool calls, for example. Instead, the message system would deliver only the non-durable, user-facing progress updates, which is generally all that streaming needs.
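A minimal sketch of that split, with an `asyncio.Queue` standing in for the non-durable side channel (all names here are illustrative, not Temporal APIs):

```python
import asyncio

# Middle ground: one "activity" per prompt. Token-level progress goes out
# a non-durable side channel (asyncio.Queue as a stand-in for an external
# message system); only the complete response is returned, and that return
# value is what the workflow records durably and acts on.
async def prompt_activity(tokens, progress: asyncio.Queue) -> str:
    parts = []
    for tok in tokens:
        await progress.put(tok)   # non-durable: user-facing streaming only
        parts.append(tok)
    await progress.put(None)      # end-of-stream marker
    return "".join(parts)         # durable: goes into workflow history

async def demo():
    progress = asyncio.Queue()
    full = await prompt_activity(["Hel", "lo"], progress)
    streamed = []
    while (tok := await progress.get()) is not None:
        streamed.append(tok)
    return full, streamed
```

The design point is that losing the side channel loses nothing of record: a retry of the activity re-streams, and only the returned value drives durable control flow.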

We do have some plans for an easier way to implement this kind of messaging, potentially through Temporal itself to remove the need for a separate messenger, but we’re still fairly early in design for that. There’s also a plan for more full-fledged streaming from activities to workflows, but that has a number of considerations around handling actions taken as a result of an unfinished stream, so it is a longer-term project.

Event Log Saturation

I’m assuming you are talking about workflow history here when you say “event log.” This is definitely a known issue in Temporal which has been exacerbated by the size of LLM payloads. We’re working on a solution to seamlessly provide external storage of large payloads which can be used across activities, but we’re not quite there yet. In the meantime, you could use a payload codec to offload large payloads to an external location if needed. Here’s a work-in-progress sample of that: Add claim check recipe. by robholland · Pull Request #9 · temporalio/ai-cookbook · GitHub, but yes, definitely hoops for now.
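The claim-check idea behind that codec can be sketched in a few lines (a dict stands in for blob storage; the threshold and tag scheme are illustrative, not Temporal’s actual codec API):

```python
import uuid

# Toy claim-check codec: payloads over a size threshold are written to an
# external store and replaced in workflow history by a small reference key.
# The dict stands in for blob storage; names and threshold are illustrative.
EXTERNAL_STORE = {}
THRESHOLD = 64  # bytes; real deployments would use a much larger cutoff

def encode(payload: bytes) -> bytes:
    if len(payload) <= THRESHOLD:
        return b"raw:" + payload          # small: inline as-is
    key = str(uuid.uuid4())
    EXTERNAL_STORE[key] = payload         # large: park it externally
    return b"ref:" + key.encode()         # history only sees the key

def decode(data: bytes) -> bytes:
    tag, rest = data[:4], data[4:]
    if tag == b"raw:":
        return rest
    return EXTERNAL_STORE[rest.decode()]  # fetch the parked payload
```

A real payload codec would wrap the SDK’s data converter and use durable storage, but the shape is the same: history carries references, not transcripts.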

Ongoing Connections

In the LLM frameworks we’ve worked with so far, providing the full (or at least the relevant portion of) conversation history has been the way to go, but you are right that this can come at a cost. You seem to be right on track about how you might mitigate that and the trouble that arises. You could use something like a worker-specific task queue, but inherently someone, somewhere needs to understand what to do if the worker with the active state goes down. In this case that would probably look like handling a schedule-to-start failure of the activity in your workflow and performing some form of heavier resume with the full history.

This is definitely something that folks have done, especially in Go, where sessions provide a somewhat leaky but simpler way to do this. For instance, this is how I implemented a stateful MCP client session in the Python SDK. That one is slightly different from the sample because it actually generates the worker itself.
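The fallback logic described above can be sketched as follows (a pure-Python simulation: the cache stands in for per-worker connection state, and "resume" stands in for replaying the full conversation history; nothing here is a Temporal API):

```python
# Sketch of sticky routing with a heavy-resume fallback: prefer the worker
# that holds the live LLM connection (worker-specific task queue); if that
# state is gone (e.g. schedule-to-start timeout after a crash), rebuild it
# from the full conversation history. All names are hypothetical stand-ins.
class StickySession:
    def __init__(self):
        self.live_workers = {}  # worker_id -> cached connection state

    def run_step(self, worker_id, history, prompt) -> bool:
        """Run one conversation step; return True if a heavy resume
        (full-history replay) was needed, False if live state was reused."""
        state = self.live_workers.get(worker_id)
        resumed = state is None
        if resumed:
            # Fallback path: no live connection, replay full history.
            state = {"context": list(history)}
            self.live_workers[worker_id] = state
        state["context"].append(prompt)
        return resumed
```

The workflow-side equivalent would be catching the schedule-to-start timeout on the sticky queue and rescheduling the activity on the shared queue with the full history attached.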