How to Enable Tool Output Streaming#
Prerequisites
Familiarity with WayFlow Agents and Tools
Basic understanding of async functions in Python
In this guide you will:
Create a Server Tool that streams output chunks;
Consume tool chunk events with an Event Listener.
Tool output streaming for Server Tools#
WayFlow supports several LLM API providers. Select an LLM from the options below:
from wayflowcore.models import OCIGenAIModel, OCIClientConfigWithApiKey

llm = OCIGenAIModel(
    model_id="provider.model-id",
    compartment_id="compartment-id",
    client_config=OCIClientConfigWithApiKey(
        service_endpoint="https://url-to-service-endpoint.com",
    ),
)
from wayflowcore.models import VllmModel

llm = VllmModel(
    model_id="model-id",
    host_port="VLLM_HOST_PORT",
)
from wayflowcore.models import OllamaModel

llm = OllamaModel(
    model_id="model-id",
)
Note
What is tool output streaming: Tool output streaming lets a tool produce intermediate outputs while it is still running, instead of waiting until execution completes to return a single final result. When streaming is enabled, WayFlow emits a chunk event (ToolExecutionStreamingChunkReceivedEvent) each time the tool makes progress, so UIs and listeners can display partial results in near real time. The tool's final output is the last value produced (i.e., the completed tool result); earlier values are treated as streamed chunks emitted during execution.
You can enable tool output streaming by creating an async generator
(i.e., an async callable yielding items with yield instead of return).
When running the async tool callable, yielded items are streamed via the event ToolExecutionStreamingChunkReceivedEvent.
The last yielded item is treated as the final tool result and is not streamed.
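These semantics can be illustrated with plain asyncio, independent of WayFlow. The run_with_streaming helper below is a hypothetical stand-in for what the framework does when it drives an async generator tool: every yielded item except the last is handed to a chunk callback, and the last item becomes the tool result.

```python
import asyncio
from typing import AsyncGenerator, Callable, Optional

async def streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    parts = [f"{topic} part {i}" for i in range(2)]
    for part in parts:
        yield part          # streamed chunks
    yield ". ".join(parts)  # last yielded item: the final tool result

async def run_with_streaming(
    gen: AsyncGenerator[str, None],
    on_chunk: Callable[[str], None],
) -> Optional[str]:
    # Buffer one item so the final yield can be told apart from the chunks.
    previous: Optional[str] = None
    async for item in gen:
        if previous is not None:
            on_chunk(previous)  # every item except the last is a chunk
        previous = item
    return previous  # the last yielded item is the final result

chunks: list[str] = []
result = asyncio.run(run_with_streaming(streaming_tool("story"), chunks.append))
print(chunks)  # ['story part 0', 'story part 1']
print(result)  # story part 0. story part 1
```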
Here is an example using the @tool decorator:
@tool(description_mode="only_docstring")
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    """Stream intermediate outputs, then yield the final result."""
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)
You can then define an EventListener to observe the streamed chunks:
class ToolStreamingListener(EventListener):
    def __call__(self, event: Event) -> None:
        if isinstance(event, ToolExecutionStreamingChunkReceivedEvent):
            print("[tool-chunk]", event.content)
Finally, register a listener before running your Agent/Flow:
from wayflowcore.events.eventlistener import register_event_listeners

with register_event_listeners([ToolStreamingListener()]):
    conv = assistant.start_conversation()
    conv.append_user_message("tell a story")
    status = conv.execute()
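Listeners are plain callables dispatched on the event type, so beyond printing you can also accumulate chunks for later use. The stand-in classes below (hypothetical names, not the WayFlow event types) sketch the same isinstance-based dispatch with a collecting listener:

```python
from dataclasses import dataclass

@dataclass
class Event:  # stand-in for wayflowcore's base Event
    pass

@dataclass
class ToolChunkEvent(Event):  # stand-in for ToolExecutionStreamingChunkReceivedEvent
    content: str

class ChunkCollector:
    """Listener that keeps every streamed chunk instead of printing it."""

    def __init__(self) -> None:
        self.chunks: list[str] = []

    def __call__(self, event: Event) -> None:
        if isinstance(event, ToolChunkEvent):
            self.chunks.append(event.content)

listener = ChunkCollector()
for event in [Event(), ToolChunkEvent("story part 0"), ToolChunkEvent("story part 1")]:
    listener(event)  # events of other types are ignored
print(listener.chunks)  # ['story part 0', 'story part 1']
```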
Tool output streaming for MCP Tools#
You can also enable tool output streaming when using MCP tools by wrapping your server-side async callable with the @mcp_streaming_tool decorator.
Note
Wrapping the callable lets WayFlow handle the streaming automatically via the progress notification feature of MCP. This feature only works in async code, so the callable must be async to use output streaming with MCP tools.
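Conceptually, such a wrapper turns an async generator into a regular async callable: each intermediate yield is forwarded as a progress notification, and the last yield becomes the return value. The sketch below is a simplified illustration of that idea, not the actual mcp_streaming_tool implementation; report stands in for MCP's progress-reporting callback.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable, Optional

def with_progress(
    tool: Callable[..., AsyncGenerator[str, None]],
    report: Callable[[int, str], Awaitable[None]],
) -> Callable[..., Awaitable[str]]:
    # Wrap an async generator: every yielded item except the last is sent
    # through `report` (like an MCP progress notification); the last item
    # becomes the ordinary return value of the wrapped callable.
    async def wrapper(*args, **kwargs) -> str:
        previous: Optional[str] = None
        step = 0
        async for item in tool(*args, **kwargs):
            if previous is not None:
                await report(step, previous)
                step += 1
            previous = item
        assert previous is not None, "the tool must yield at least once"
        return previous
    return wrapper

async def demo_tool(topic: str) -> AsyncGenerator[str, None]:
    yield f"{topic} part 0"
    yield f"{topic} part 1"
    yield f"{topic} part 0. {topic} part 1"

progress: list[tuple[int, str]] = []

async def record(step: int, chunk: str) -> None:
    progress.append((step, chunk))

result = asyncio.run(with_progress(demo_tool, record)("story"))
print(progress)  # [(0, 'story part 0'), (1, 'story part 1')]
print(result)    # story part 0. story part 1
```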
Here is an example using the official MCP SDK:
import anyio
from typing import AsyncGenerator

from mcp.server.fastmcp import FastMCP

from wayflowcore.mcp.mcphelpers import mcp_streaming_tool

server = FastMCP(
    name="Example MCP Server",
    instructions="A MCP Server.",
)

@server.tool(description="Stream intermediate outputs, then yield the final result.")
@mcp_streaming_tool
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)

server.run(transport="streamable-http")
When using other MCP libraries (e.g., https://gofastmcp.com/), you need to
pass the library's context class to the mcp_streaming_tool wrapper.
import anyio
from typing import AsyncGenerator

from fastmcp import FastMCP, Context

from wayflowcore.mcp.mcphelpers import mcp_streaming_tool

server = FastMCP(
    name="Example MCP Server",
    instructions="A MCP Server.",
)

async def my_tool() -> AsyncGenerator[str, None]:
    contents = [f"This is the sentence N°{i}" for i in range(5)]
    for chunk in contents:
        yield chunk  # streamed chunks
        await anyio.sleep(0.2)
    yield ". ".join(contents)  # final result

streaming_tool = mcp_streaming_tool(my_tool, context_cls=Context)
server.tool(description="...")(streaming_tool)
From the client-side, you can consume the MCP tool and observe the streamed chunks using an event listener as shown above with server tools.
from wayflowcore.mcp import MCPTool, SSETransport, enable_mcp_without_auth
from wayflowcore.flow import Flow
from wayflowcore.steps import ToolExecutionStep

enable_mcp_without_auth()
mcp_client = SSETransport(url="http://localhost:8080/sse")
mcp_tool = MCPTool(
    name="my_streaming_tool",
    client_transport=mcp_client,
)

# Option A: Use the tool in an Agent
assistant = Agent(llm=llm, tools=[mcp_tool])

# Option B: Use the tool in a Flow
assistant = Flow.from_steps([
    ToolExecutionStep(name="mcp_tool_step", tool=mcp_tool)
])

# Then use the same ToolExecutionStreamingChunkReceivedEvent listener as above
See also
For more information read the Guide on using MCP Tools
Agent Spec Exporting/Loading#
You can export the assistant configuration to its Agent Spec configuration using the AgentSpecExporter.
from wayflowcore.agentspec import AgentSpecExporter
serialized_assistant = AgentSpecExporter().to_json(assistant)
from wayflowcore.agentspec import AgentSpecLoader
tool_registry = {t.name: t for t in assistant.tools}
loader = AgentSpecLoader(tool_registry=tool_registry)
Here is what the Agent Spec representation will look like ↓
{
  "component_type": "Agent",
  "id": "ce59e1be-aa7a-4a38-ba89-2c3cd546d51f",
  "name": "streaming-agent",
  "description": "Agent that streams tool outputs",
  "metadata": {
    "__metadata_info__": {}
  },
  "inputs": [],
  "outputs": [],
  "llm_config": {
    "component_type": "VllmConfig",
    "id": "424b4cf5-9eed-48bc-afa8-909524d3b6a0",
    "name": "llm_f95a3161__auto",
    "description": null,
    "metadata": {
      "__metadata_info__": {}
    },
    "default_generation_parameters": null,
    "url": "LLAMA_API_URL",
    "model_id": "LLAMA_MODEL_ID"
  },
  "system_prompt": "You can stream outputs from tools",
  "tools": [
    {
      "component_type": "ServerTool",
      "id": "ddd75c20-0053-46b2-80e9-139277b3f4f4",
      "name": "my_streaming_tool",
      "description": "Stream intermediate outputs, then yield the final result.",
      "metadata": {
        "__metadata_info__": {}
      },
      "inputs": [
        {
          "type": "string",
          "title": "topic"
        }
      ],
      "outputs": [
        {
          "type": "string",
          "title": "tool_output"
        }
      ]
    }
  ],
  "agentspec_version": "25.4.1"
}
component_type: Agent
id: ce59e1be-aa7a-4a38-ba89-2c3cd546d51f
name: streaming-agent
description: Agent that streams tool outputs
metadata:
  __metadata_info__: {}
inputs: []
outputs: []
llm_config:
  component_type: VllmConfig
  id: 424b4cf5-9eed-48bc-afa8-909524d3b6a0
  name: llm_f95a3161__auto
  description: null
  metadata:
    __metadata_info__: {}
  default_generation_parameters: null
  url: LLAMA_API_URL
  model_id: LLAMA_MODEL_ID
system_prompt: You can stream outputs from tools
tools:
  - component_type: ServerTool
    id: ddd75c20-0053-46b2-80e9-139277b3f4f4
    name: my_streaming_tool
    description: Stream intermediate outputs, then yield the final result.
    metadata:
      __metadata_info__: {}
    inputs:
      - type: string
        title: topic
    outputs:
      - type: string
        title: tool_output
agentspec_version: 25.4.1
You can then load the configuration back to an assistant using the AgentSpecLoader.
reloaded_assistant = loader.load_json(serialized_assistant)
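The serialized Agent Spec references tools by name; the implementations themselves are not embedded in the JSON, which is why the loader needs a tool_registry to re-attach them. The stdlib sketch below illustrates that lookup idea with a plain dict (a hypothetical, simplified structure, not the real Agent Spec schema):

```python
import json

def my_streaming_tool() -> str:  # stand-in tool implementation
    return "final result"

# An exported config names the tool but carries no code for it.
serialized = json.dumps({"name": "streaming-agent", "tools": [{"name": "my_streaming_tool"}]})

# On load, implementations are looked up by name, as AgentSpecLoader's
# tool_registry does.
tool_registry = {"my_streaming_tool": my_streaming_tool}
config = json.loads(serialized)
tools = [tool_registry[t["name"]] for t in config["tools"]]
print(tools[0]())  # final result
```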
Recap#
In this guide, you learned how to:
Implement streaming Server Tools using async generators
Observe streamed chunks with a custom EventListener
Enable output streaming for MCP tools with the @mcp_streaming_tool decorator
Next steps#
Having learned how to stream tool outputs and consume chunk events, you may now proceed to:
Build Assistants with Tools to design richer tool-enabled agents and flows.
Use the Event System to implement custom listeners, tracing, and monitoring.
Full code#
Click on the card at the top of this page to download the full code for this guide or copy the code below.
# Copyright © 2025, 2026 Oracle and/or its affiliates.
#
# This software is under the Apache License 2.0 (LICENSE-APACHE or
# http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

# %%[markdown]
# Wayflow Code Example - How to Enable Tool Output Streaming
# ----------------------------------------------------------

# How to use:
# Create a new Python virtual environment and install the latest WayFlow version.
# ```bash
# python -m venv venv-wayflowcore
# source venv-wayflowcore/bin/activate
# pip install --upgrade pip
# pip install "wayflowcore==26.1.1"
# ```

# You can now run the script
# 1. As a Python file:
# ```bash
# python howto_tooloutputstreaming.py
# ```
# 2. As a Notebook (in VSCode):
# When viewing the file,
# - press the keys Ctrl + Enter to run the selected cell
# - or Shift + Enter to run the selected cell and move to the cell below


# %%[markdown]
## Define the llm

# %%
from wayflowcore.models import VllmModel

llm = VllmModel(
    model_id="LLAMA_MODEL_ID",
    host_port="LLAMA_API_URL",
)


# %%[markdown]
## Imports for this guide

# %%
import anyio
from typing import AsyncGenerator

from wayflowcore.agent import Agent
from wayflowcore.events.event import Event, ToolExecutionStreamingChunkReceivedEvent
from wayflowcore.events.eventlistener import EventListener, register_event_listeners
from wayflowcore.tools import tool

# %%[markdown]
## Define streaming tool

# %%
@tool(description_mode="only_docstring")
async def my_streaming_tool(topic: str) -> AsyncGenerator[str, None]:
    """Stream intermediate outputs, then yield the final result."""
    all_sentences = [f"{topic} part {i}" for i in range(2)]
    for i in range(2):
        await anyio.sleep(0.2)  # simulate work
        yield all_sentences[i]
    yield ". ".join(all_sentences)

# %%[markdown]
## Define event listener

# %%
class ToolStreamingListener(EventListener):
    def __call__(self, event: Event) -> None:
        if isinstance(event, ToolExecutionStreamingChunkReceivedEvent):
            print("[tool-chunk]", event.content)

# %%[markdown]
## Build the agent

# %%
assistant = Agent(
    llm=llm,
    name="streaming-agent",
    description="Agent that streams tool outputs",
    tools=[my_streaming_tool],
)

# %%[markdown]
## Export config to Agent Spec

# %%
from wayflowcore.agentspec import AgentSpecExporter

serialized_assistant = AgentSpecExporter().to_json(assistant)
from wayflowcore.agentspec import AgentSpecLoader

tool_registry = {t.name: t for t in assistant.tools}
loader = AgentSpecLoader(tool_registry=tool_registry)

# %%[markdown]
## Load Agent Spec config

# %%
reloaded_assistant = loader.load_json(serialized_assistant)

# %%[markdown]
## Run agent with stream listener

# %%
from wayflowcore.events.eventlistener import register_event_listeners

with register_event_listeners([ToolStreamingListener()]):
    conv = assistant.start_conversation()
    conv.append_user_message("tell a story")
    status = conv.execute()

# %%[markdown]
## Run mcp streaming tool

# %%
from wayflowcore.mcp import MCPTool, SSETransport, enable_mcp_without_auth
from wayflowcore.flow import Flow
from wayflowcore.steps import ToolExecutionStep

enable_mcp_without_auth()
mcp_client = SSETransport(url="http://localhost:8080/sse")
mcp_tool = MCPTool(
    name="my_streaming_tool",
    client_transport=mcp_client,
)

# Option A: Use the tool in an Agent
assistant = Agent(llm=llm, tools=[mcp_tool])

# Option B: Use the tool in a Flow
assistant = Flow.from_steps([
    ToolExecutionStep(name="mcp_tool_step", tool=mcp_tool)
])

# Then use the same ToolExecutionStreamingChunkReceivedEvent listener as above