How to Enable Tool Output Streaming#

Download Python Script

Python script/notebook for this guide: Tool Output Streaming how-to script.

Prerequisites

  • Familiarity with WayFlow Agents and Tools

  • Basic understanding of async functions in Python

In this guide you will:

  • Enable tool output streaming for Server Tools using async generators

  • Observe streamed chunks with an event listener

  • Enable tool output streaming for MCP tools

Tool output streaming for Server Tools#

WayFlow supports several LLM API providers. Select an LLM from the options below:

from wayflowcore.models import OCIGenAIModel, OCIClientConfigWithApiKey

llm = OCIGenAIModel(
    model_id="provider.model-id",
    compartment_id="compartment-id",
    client_config=OCIClientConfigWithApiKey(
        service_endpoint="https://url-to-service-endpoint.com",
    ),
)

Note

What is tool output streaming: Tool output streaming lets a tool produce intermediate outputs while it is still running, instead of waiting until the execution completes to return a single final result. When streaming is enabled, WayFlow emits chunk events as the tool makes progress (this is emitted as ToolExecutionStreamingChunkReceivedEvent). This enables UIs and listeners to display partial results in near real time. The tool’s final output is the last value produced (i.e., the completed tool result); earlier values are treated as streamed chunks emitted during execution.

You can enable tool output streaming by creating an async generator (i.e., an async callable yielding items with yield instead of return).

When running the async tool callable, yielded items are streamed via the event ToolExecutionStreamingChunkReceivedEvent.

The last yielded item is treated as the final tool result and is not streamed.
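The chunk/final-result split can be illustrated with plain Python, independently of WayFlow: collect every item an async generator yields and treat the last one as the final result. The function names below are hypothetical and only serve the illustration.

```python
import asyncio
from typing import AsyncGenerator

async def streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    # Yield intermediate chunks first, then the final result last.
    parts = [f"{topic} part {i}" for i in range(2)]
    for part in parts:
        yield part          # streamed as a chunk
    yield ". ".join(parts)  # last item = final tool result

async def collect() -> list[str]:
    return [item async for item in streaming_tool("demo")]

items = asyncio.run(collect())
# WayFlow treats everything but the last item as streamed chunks.
chunks, final = items[:-1], items[-1]
print(chunks)  # ['demo part 0', 'demo part 1']
print(final)   # demo part 0. demo part 1
```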

Here is an example using the @tool decorator:

import anyio
from typing import AsyncGenerator

from wayflowcore.tools import tool

@tool(description_mode="only_docstring")
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    """Stream intermediate outputs, then yield the final result."""
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)

You can then define an EventListener to observe the streamed chunks:

from wayflowcore.events.event import Event, ToolExecutionStreamingChunkReceivedEvent
from wayflowcore.events.eventlistener import EventListener

class ToolStreamingListener(EventListener):
    def __call__(self, event: Event) -> None:
        if isinstance(event, ToolExecutionStreamingChunkReceivedEvent):
            print("[tool-chunk]", event.content)

Finally, register a listener before running your Agent/Flow:

from wayflowcore.events.eventlistener import register_event_listeners

with register_event_listeners([ToolStreamingListener()]):
    conv = assistant.start_conversation()
    conv.append_user_message("tell a story")
    status = conv.execute()

Tool output streaming for MCP Tools#

You can also enable tool output streaming when using MCP tools by wrapping your server-side async callable with the @mcp_streaming_tool decorator.

Note

Wrapping the callable lets WayFlow handle the streaming automatically via MCP's progress notification feature. Progress notifications only work in async code, so you need to write an async callable to use output streaming with MCP tools.
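Conceptually, the wrapper drains the async generator, reports every intermediate item through a progress callback, and returns the last item as the tool result. A simplified plain-Python sketch of that idea (hypothetical helper names, not the actual WayFlow implementation):

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable

async def drain_with_progress(
    gen: AsyncGenerator[str, None],
    report_progress: Callable[[str], Awaitable[None]],
) -> str:
    """Report every item but the last as progress; return the last item."""
    previous = None
    async for item in gen:
        if previous is not None:
            await report_progress(previous)  # intermediate chunk -> progress notification
        previous = item
    assert previous is not None, "the generator must yield at least one item"
    return previous  # last item = final tool result

async def demo() -> tuple[list[str], str]:
    async def gen() -> AsyncGenerator[str, None]:
        for i in range(3):
            yield f"chunk {i}"

    reported: list[str] = []

    async def report(chunk: str) -> None:
        reported.append(chunk)

    result = await drain_with_progress(gen(), report)
    return reported, result

reported, result = asyncio.run(demo())
print(reported)  # ['chunk 0', 'chunk 1']
print(result)    # chunk 2
```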

Here is an example using the official MCP SDK:

import anyio
from typing import AsyncGenerator
from mcp.server.fastmcp import FastMCP
from wayflowcore.mcp.mcphelpers import mcp_streaming_tool

server = FastMCP(
    name="Example MCP Server",
    instructions="A MCP Server.",
)

@server.tool(description="Stream intermediate outputs, then yield the final result.")
@mcp_streaming_tool
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)

server.run(transport="streamable-http")

When using other MCP libraries (e.g., https://gofastmcp.com/), you need to provide the context class to the mcp_streaming_tool wrapper.

import anyio
from typing import AsyncGenerator

from fastmcp import FastMCP, Context
from wayflowcore.mcp.mcphelpers import mcp_streaming_tool

server = FastMCP(
    name="Example MCP Server",
    instructions="A MCP Server.",
)

async def my_tool() -> AsyncGenerator[str, None]:
    contents = [f"This is the sentence N°{i}" for i in range(5)]
    for chunk in contents:
        yield chunk  # streamed chunks
        await anyio.sleep(0.2)

    yield ". ".join(contents)  # final result

streaming_tool = mcp_streaming_tool(my_tool, context_cls=Context)
server.tool(description="...")(streaming_tool)

From the client-side, you can consume the MCP tool and observe the streamed chunks using an event listener as shown above with server tools.

from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPTool, SSETransport, enable_mcp_without_auth
from wayflowcore.flow import Flow
from wayflowcore.steps import ToolExecutionStep

enable_mcp_without_auth()
mcp_client = SSETransport(url="http://localhost:8080/sse")
mcp_tool = MCPTool(
    name="my_streaming_tool",
    client_transport=mcp_client,
)

# Option A: Use the tool in an Agent
assistant = Agent(llm=llm, tools=[mcp_tool])

# Option B: Use the tool in a Flow
assistant = Flow.from_steps([
    ToolExecutionStep(name="mcp_tool_step", tool=mcp_tool)
])

# Then use the same ToolExecutionStreamingChunkReceivedEvent listener as above

See also

For more information, read the Guide on using MCP Tools.

Agent Spec Exporting/Loading#

You can export the assistant to its Agent Spec configuration using the AgentSpecExporter.

from wayflowcore.agentspec import AgentSpecExporter, AgentSpecLoader

serialized_assistant = AgentSpecExporter().to_json(assistant)

tool_registry = {t.name: t for t in assistant.tools}
loader = AgentSpecLoader(tool_registry=tool_registry)
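Server tools are code, so only their names and schemas travel in the serialized configuration; on load, each tool name must be resolved back to a callable. The registry idea can be sketched in plain Python (hypothetical names and JSON shape, not the WayFlow implementation):

```python
import json
from typing import Callable

def my_streaming_tool(topic: str) -> str:
    # Stand-in for the real server tool implementation.
    return f"story about {topic}"

# Only the tool's name (and schema) is serialized in the config.
spec = json.dumps({"tools": [{"name": "my_streaming_tool"}]})

# On load, the registry maps each serialized name back to its callable.
registry: dict[str, Callable[..., str]] = {"my_streaming_tool": my_streaming_tool}
tools = [registry[t["name"]] for t in json.loads(spec)["tools"]]
print(tools[0]("dragons"))  # story about dragons
```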

Here is what the Agent Spec representation will look like:

{
    "component_type": "Agent",
    "id": "ce59e1be-aa7a-4a38-ba89-2c3cd546d51f",
    "name": "streaming-agent",
    "description": "Agent that streams tool outputs",
    "metadata": {
        "__metadata_info__": {}
    },
    "inputs": [],
    "outputs": [],
    "llm_config": {
        "component_type": "VllmConfig",
        "id": "424b4cf5-9eed-48bc-afa8-909524d3b6a0",
        "name": "llm_f95a3161__auto",
        "description": null,
        "metadata": {
            "__metadata_info__": {}
        },
        "default_generation_parameters": null,
        "url": "LLAMA_API_URL",
        "model_id": "LLAMA_MODEL_ID"
    },
    "system_prompt": "You can stream outputs from tools",
    "tools": [
        {
            "component_type": "ServerTool",
            "id": "ddd75c20-0053-46b2-80e9-139277b3f4f4",
            "name": "my_streaming_tool",
            "description": "Stream intermediate outputs, then yield the final result.",
            "metadata": {
                "__metadata_info__": {}
            },
            "inputs": [
                {
                    "type": "string",
                    "title": "topic"
                }
            ],
            "outputs": [
                {
                    "type": "string",
                    "title": "tool_output"
                }
            ]
        }
    ],
    "agentspec_version": "25.4.1"
}

You can then load the configuration back to an assistant using the AgentSpecLoader.

reloaded_assistant = loader.load_json(serialized_assistant)

Recap#

In this guide, you learned how to:

  • Implement streaming Server Tools using async generators

  • Listen to tool chunk events with an EventListener

  • Enable streaming for MCP tools with the @mcp_streaming_tool decorator

Next steps#

Having learned how to stream tool outputs and consume chunk events, you may now proceed to the Guide on using MCP Tools.

Full code#

Click on the card at the top of this page to download the full code for this guide or copy the code below.

# Copyright © 2025, 2026 Oracle and/or its affiliates.
#
# This software is under the Apache License 2.0
# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

# %%[markdown]
# Wayflow Code Example - How to Enable Tool Output Streaming
# ----------------------------------------------------------

# How to use:
# Create a new Python virtual environment and install the latest WayFlow version.
# ```bash
# python -m venv venv-wayflowcore
# source venv-wayflowcore/bin/activate
# pip install --upgrade pip
# pip install "wayflowcore==26.1.1"
# ```

# You can now run the script
# 1. As a Python file:
# ```bash
# python howto_tooloutputstreaming.py
# ```
# 2. As a Notebook (in VSCode):
# When viewing the file,
#  - press the keys Ctrl + Enter to run the selected cell
#  - or Shift + Enter to run the selected cell and move to the cell below


# %%[markdown]
## Define the llm

# %%
from wayflowcore.models import VllmModel

llm = VllmModel(
    model_id="LLAMA_MODEL_ID",
    host_port="LLAMA_API_URL",
)


# %%[markdown]
## Imports for this guide

# %%
import anyio
from typing import AsyncGenerator

from wayflowcore.agent import Agent
from wayflowcore.events.event import Event, ToolExecutionStreamingChunkReceivedEvent
from wayflowcore.events.eventlistener import EventListener, register_event_listeners
from wayflowcore.tools import tool

# %%[markdown]
## Define streaming tool

# %%
@tool(description_mode="only_docstring")
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    """Stream intermediate outputs, then yield the final result."""
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)

# %%[markdown]
## Define event listener

# %%
class ToolStreamingListener(EventListener):
    def __call__(self, event: Event) -> None:
        if isinstance(event, ToolExecutionStreamingChunkReceivedEvent):
            print("[tool-chunk]", event.content)

# %%[markdown]
## Build the agent

# %%
assistant = Agent(
    llm=llm,
    name="streaming-agent",
    description="Agent that streams tool outputs",
    tools=[my_streaming_tool],
)

# %%[markdown]
## Export config to Agent Spec

# %%
from wayflowcore.agentspec import AgentSpecExporter

serialized_assistant = AgentSpecExporter().to_json(assistant)
from wayflowcore.agentspec import AgentSpecLoader

tool_registry = {t.name: t for t in assistant.tools}
loader = AgentSpecLoader(tool_registry=tool_registry)

# %%[markdown]
## Load Agent Spec config

# %%
reloaded_assistant = loader.load_json(serialized_assistant)

# %%[markdown]
## Run agent with stream listener

# %%
from wayflowcore.events.eventlistener import register_event_listeners

with register_event_listeners([ToolStreamingListener()]):
    conv = assistant.start_conversation()
    conv.append_user_message("tell a story")
    status = conv.execute()

# %%[markdown]
## Run mcp streaming tool

# %%
from wayflowcore.mcp import MCPTool, SSETransport, enable_mcp_without_auth
from wayflowcore.flow import Flow
from wayflowcore.steps import ToolExecutionStep

enable_mcp_without_auth()
mcp_client = SSETransport(url="http://localhost:8080/sse")
mcp_tool = MCPTool(
    name="my_streaming_tool",
    client_transport=mcp_client,
)

# Option A: Use the tool in an Agent
assistant = Agent(llm=llm, tools=[mcp_tool])

# Option B: Use the tool in a Flow
assistant = Flow.from_steps([
    ToolExecutionStep(name="mcp_tool_step", tool=mcp_tool)
])

# Then use the same ToolExecutionStreamingChunkReceivedEvent listener as above