Implement a Traceable ReAct Agent Using Temporal and LangChain

@tihomir
While there are existing examples of workflows invoking AI agents, most of those workflows merely call the agent as a whole—tool invocation happens outside the workflow. As a result, it’s impossible to observe when the LLM chooses to call a tool, and any issues—like errors or delays in the tool—aren’t visible in the execution trace. In contrast, our approach embeds the tool calls inside the Temporal Workflow itself. This way, each tool invocation becomes part of the workflow history, fully traceable and observable—including tool failures or latency.

This integration not only surfaces the full reasoning chain of the agent but also leverages Temporal’s built-in observability for debugging and reliability.

How to Build a Temporal + LangChain ReAct AI Agent

This article will walk you through building an observable, extensible AI Agent system based on Temporal and LangChain. We’ll use a ReAct Agent for automatic reasoning and tool invocation, and every step can be traced in the Temporal workflow history. Hope you enjoy the hands-on process—feel free to reach out and share your ideas!


1. Environment Setup

  1. Start Temporal Server (docker-compose is recommended, just a few minutes)
  2. Install dependencies (managed by poetry, just run poetry install)
  3. Set up your API key if you want to use an external LLM like Gemini (a quick sanity check follows this list)
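
As a quick sanity check for step 3 (a minimal sketch; it assumes you call Gemini through langchain_google_genai, which reads the GOOGLE_API_KEY environment variable; adjust the name for other providers):

import os

# Fail fast if the key the LLM provider expects is missing.
if not os.environ.get("GOOGLE_API_KEY"):
    raise RuntimeError("GOOGLE_API_KEY is not set; export it before starting the worker.")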

2. Design the Agent Workflow

The core logic is in workflows.py, where we use a Temporal workflow to orchestrate LLM reasoning and tool calls:

from datetime import timedelta

from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from temporalio import workflow

@workflow.defn
class AiAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        messages = [HumanMessage(query)]
        MAX_STEPS = 8
        for _ in range(MAX_STEPS):
            # Each LLM call is an Activity, so it is recorded in the workflow history.
            ai_msg = await workflow.execute_activity(
                "llm_chat",
                messages,
                schedule_to_close_timeout=timedelta(seconds=60),
            )
            messages.append(AIMessage(**ai_msg))
            if ai_msg["tool_calls"]:
                for tool_call in ai_msg["tool_calls"]:
                    # Each tool call is also an Activity: traceable, retryable, timed.
                    tool_call_result = await workflow.execute_activity(
                        tool_call["name"],
                        tool_call["args"]["params"],
                        schedule_to_close_timeout=timedelta(seconds=10),
                    )
                    messages.append(
                        ToolMessage(
                            content=str(tool_call_result),
                            tool_call_id=tool_call["id"],
                        )
                    )
            else:
                # No tool calls requested: the agent has produced its final answer.
                return messages[-1].content
        
        last_ai_message = next(
            (msg.content for msg in reversed(messages) if isinstance(msg, AIMessage)),
            None,
        )

        return (
            f"Exceeded maximum steps ({MAX_STEPS}), last AI reply: {last_ai_message}"
            if last_ai_message
            else f"Exceeded maximum steps ({MAX_STEPS}), and no valid AI reply found."
        )
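
For reference, LangChain exposes tool calls on an AIMessage as a list of small dicts, so the ai_msg["tool_calls"] the loop iterates over looks roughly like this (illustrative values; the id and args come from the model):

# Illustrative shape only.
[
    {
        "name": "division",
        "args": {"params": {"a": 1, "b": 2}},
        "id": "call_abc123",
        "type": "tool_call",
    }
]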

3. Implement Tools and LLM Calls

Define your LLM and tool logic in activities.py:

from langchain.chat_models import init_chat_model
from pydantic import BaseModel, Field
from temporalio import activity

@activity.defn
async def llm_chat(messages: list) -> dict:
    llm = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
    llm_with_tools = llm.bind_tools([add, division])
    response = await llm_with_tools.ainvoke(messages)
    # Return a plain dict so the result serializes cleanly and the workflow
    # can index into it (ai_msg["tool_calls"], etc.).
    return response.model_dump()

@activity.defn
async def add(params: list[int]) -> int:
    return sum(params)

class divisionParams(BaseModel):
    a: int = Field(..., description="Numerator")
    b: int = Field(..., description="Denominator, must not be zero", gt=0)

@activity.defn
async def division(params: divisionParams) -> int:
    # Integer (floor) division; the gt=0 constraint rejects zero and negative denominators.
    return params.a // params.b

You can add more tools as you need—just implement an activity, register it with the worker, and bind it to the LLM in llm_chat; a sketch follows.
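
For example, a hypothetical multiply tool could look like this (not part of the repo; it would also need to be added to bind_tools in llm_chat and to the worker’s activities list):

@activity.defn
async def multiply(params: list[int]) -> int:
    # Hypothetical example tool: multiply a list of integers.
    result = 1
    for n in params:
        result *= n
    return result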


4. Start the Worker

worker.py registers and runs all workflows and activities. You usually don’t need to change much here:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from activities import add, division, llm_chat
from workflows import AiAgentWorkflow

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[AiAgentWorkflow],
        activities=[llm_chat, add, division],
    )
    print("Worker started.")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

5. Start a Workflow (Submit a Task)

Use starter.py to submit an Agent reasoning task and see the full process in action:

import asyncio
import uuid
from temporalio.client import Client

async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        "AiAgentWorkflow",
        "what is 1 divided by 2 then add 1?",
        id=f"AI-Agent-workflow-{uuid.uuid4()}",
        task_queue="my-task-queue",
    )
    print("Workflow result:", result)

if __name__ == "__main__":
    asyncio.run(main())

6. Observability & Debugging

With the Temporal UI (default http://localhost:8233), you can see the input, output, duration, and status of every LLM reasoning step and tool call in real time, which makes debugging and tracing straightforward.


7. Extensions & Advanced Usage

  • Add any tool you want—just implement a new activity and register it with the worker.
  • Integrate more LLMs or external APIs.
  • Use this project as a template to quickly build your own observable AI Agent system.

8. Reference & Community

Full code and more details: https://github.com/Frederic-Zhou/temporal_agent_workflow

If you have any questions, ideas, or just want to chat, feel free to reach out! You can also open an issue on GitHub or join the Temporal community.


Fantastic write-up, thank you so much for sharing!


I’ve just updated my project to support human-in-the-loop workflows. 🎉

The design uses signals to send back the tool call ID along with the human decision.

When defining tools, I distinguish which ones require human approval by checking whether the tool name starts with an uppercase letter (well, yes, I’m a Go programmer). This is obviously not the ideal approach—the more robust method would be to define an approval policy inside the workflow (e.g., a mapping of which tool calls require approval and what the possible decisions are).

However, for simplicity in this example, I’m using the uppercase convention to determine whether human approval is required. (Since the LLM only returns the function name and arguments, it’s not straightforward to attach extra metadata to the function definition through decorators.) A sketch of the policy-mapping alternative follows.
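
A policy map could be as simple as this (hypothetical; not in the repo), replacing the name[0].isupper() check in the workflow below:

# Hypothetical approval policy: tool name -> needs human sign-off?
APPROVAL_POLICY = {
    "add": False,
    "division": False,
    "transfer_funds": True,  # example of a sensitive tool
}

def requires_approval(tool_name: str) -> bool:
    # Unknown tools default to requiring approval (fail safe).
    return APPROVAL_POLICY.get(tool_name, True)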

Code has been updated in the repo: workflows_with_approvals.py.

import asyncio
from datetime import timedelta
from typing import Dict, Optional

from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from temporalio import workflow
from temporalio.exceptions import ApplicationError


@workflow.defn
class AiAgentWorkflow_WithApprovals:
    def __init__(self):
        # Store approval results: call_id -> "yes"/"no"
        self._approvals: Dict[str, Optional[str]] = {}

    # Approval results are written back by external systems or user interfaces via Signal

    @workflow.signal
    async def approve_tool(self, params: dict):
        # Expected payload: {"call_id": "<tool call id>", "decision": "yes" | "no"}
        self._approvals[params["call_id"]] = params["decision"]

    @workflow.run
    async def run(self, query: str, max_steps: int = 8) -> str:
        messages = [HumanMessage(query)]

        for _ in range(max_steps):
            ai_msg = await workflow.execute_activity(
                "llm_chat",
                messages,
                schedule_to_close_timeout=timedelta(seconds=60),
            )
            messages.append(AIMessage(**ai_msg))

            # End immediately if there are no tool calls
            if not ai_msg.get("tool_calls"):
                return messages[-1].content

            for tc in ai_msg["tool_calls"]:
                name = tc["name"]
                args = tc["args"].get("params", tc["args"])
                call_id = tc.get("id", f"{name}-{workflow.now()}")
                workflow.logger.info(f"Workflow got tool call_id: {call_id}")

                # Name starts with uppercase -> requires approval

                if name and name[0].isupper():
                    try:
                        await workflow.wait_condition(
                            lambda: self._approvals.get(call_id) is not None,
                            timeout=timedelta(minutes=30),
                        )
                    except asyncio.TimeoutError:
                        raise ApplicationError(
                            f"Approval timed out for tool '{name}' (call_id={call_id})",
                            type="ApprovalTimeout",
                            non_retryable=True,
                        )

                    if self._approvals.get(call_id) != "yes":
                        raise ApplicationError(
                            f"Approval denied for tool '{name}' (call_id={call_id})",
                            type="ApprovalDenied",
                            non_retryable=True,
                        )

                # Execute the tool
                result = await workflow.execute_activity(
                    name, args, schedule_to_close_timeout=timedelta(seconds=60)
                )
                messages.append(ToolMessage(content=str(result), tool_call_id=call_id))

        # Exceeded maximum steps without completion
        last_ai = next(
            (m.content for m in reversed(messages) if isinstance(m, AIMessage)), None
        )
        return last_ai or f"Exceeded max steps ({max_steps})."
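
To approve or deny from outside the workflow, any Temporal client can send the signal. An illustrative helper (the workflow ID and call_id come from your own run):

# Illustrative approval client; not part of the repo.
from temporalio.client import Client

async def approve(workflow_id: str, call_id: str, decision: str = "yes"):
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    await handle.signal("approve_tool", {"call_id": call_id, "decision": decision})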