How to Do Map and Reduce Operations in Flows#
Map-Reduce is a programming model essential for efficiently processing large datasets across distributed systems. It is widely used in software engineering to enhance data processing speed and scalability.
Agent Spec supports Map and Reduce operations in Flows through the MapNode. This guide shows you how to use a MapNode to apply an operation to every element of a list and collect the results.
Basic implementation#
Assume you want to summarize a few articles. You can generate the summary of a single article with the LlmNode class (the llm_config object used below is defined in the complete code in the Recap section):
from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import StringProperty
from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
# Entry and exit points of the sub-flow: one "article" string in, one "summary" string out.
start_node = StartNode(
    name="start_node",
    inputs=[StringProperty(title="article")],
)
end_node = EndNode(
    name="end_node",
    outputs=[StringProperty(title="summary")],
)

# LLM step that produces the "summary" output; its prompt template references {{article}}.
# NOTE: llm_config is not created in this snippet; it must already be defined.
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
{{article}}""",
    outputs=[StringProperty(title="summary")],
)

# Execution order: start_node -> summarize_node -> end_node.
subflow_control_edges = [
    ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
    ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
]

# Data wiring: the article goes into the LLM node, the summary goes out to end_node.
article_to_llm = DataFlowEdge(
    name="dfe1",
    source_node=start_node,
    source_output="article",
    destination_node=summarize_node,
    destination_input="article",
)
summary_to_end = DataFlowEdge(
    name="dfe2",
    source_node=summarize_node,
    source_output="summary",
    destination_node=end_node,
    destination_input="summary",
)

# The single-article flow that the MapNode will use as its subflow.
summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=subflow_control_edges,
    data_flow_connections=[article_to_llm, summary_to_end],
)
This step takes a single article, and generates a summary.
Since you have a list of articles, use the MapNode class to generate a summary for each article.
Pay attention to using the right names for the inputs and outputs of the MapNode: according to the Agent Spec language specification, you should prepend the iterated_ prefix to the inputs of the sub-flow that should be “mapped”, and the collected_ prefix to the outputs of the sub-flow that should be “reduced”.
In this case, we select the append reduction method for the only sub-flow output we have (i.e., summary).
This corresponds to the default behavior of the MapNode, so we don’t need to specify anything.
Please refer to the Agent Spec specification for more information.
from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import MapNode
# List input whose title is the sub-flow input name ("article") with the iterated_ prefix.
iterated_article_input = ListProperty(
    title="iterated_article",
    item_type=StringProperty(title="article"),
)

# List output whose title is the sub-flow output name ("summary") with the collected_ prefix.
collected_summary_output = ListProperty(
    title="collected_summary",
    item_type=StringProperty(title="summary"),
)

# No reducer is passed here, so the MapNode's default reduction applies.
map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[iterated_article_input],
    outputs=[collected_summary_output],
)
The code above will connect:
- the article input of the sub-flow to the iterated_article input of the MapNode
- the summary output of the sub-flow to the collected_summary output of the MapNode
Once this is done, you can create the flow that contains the MapNode:
# Outer flow boundaries: a list of articles in, a list of summaries out.
# start_node / end_node are rebound here; summarize_flow was already built
# with the earlier single-article nodes.
start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)

# Execution order: start_node -> map_node -> end_node.
flow_control_edges = [
    ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
    ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
]

# "articles" feeds the MapNode's iterated input; its collected output becomes "summaries".
articles_to_map = DataFlowEdge(
    name="dfe1",
    source_node=start_node,
    source_output="articles",
    destination_node=map_node,
    destination_input="iterated_article",
)
summaries_to_end = DataFlowEdge(
    name="dfe2",
    source_node=map_node,
    source_output="collected_summary",
    destination_node=end_node,
    destination_input="summaries",
)

flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=flow_control_edges,
    data_flow_connections=[articles_to_map, summaries_to_end],
)
Agent Spec Serialization#
You can export the assistant configuration using the AgentSpecSerializer.
from pyagentspec.serialization import AgentSpecSerializer
# Serialize the assembled flow to its Agent Spec JSON representation.
serializer = AgentSpecSerializer()
serialized_assistant = serializer.to_json(flow)
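Here is what the Agent Spec representation will look like ↓ (shown first as JSON, then as YAML; the two listings carry different component ids, so expect the ids in your own output to differ as well)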
Click here to see the assistant configuration.
{
"component_type": "Flow",
"id": "853f4c79-ccec-4061-bb82-82b67b96d021",
"name": "flow",
"description": null,
"metadata": {},
"inputs": [
{
"title": "articles",
"items": {
"title": "article",
"type": "string"
},
"type": "array"
}
],
"outputs": [
{
"title": "summaries",
"items": {
"title": "summary",
"type": "string"
},
"type": "array"
}
],
"start_node": {
"$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
},
"nodes": [
{
"$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
},
{
"$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
},
{
"$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
}
],
"control_flow_connections": [
{
"component_type": "ControlFlowEdge",
"id": "36ccaa13-8a8c-4160-b08e-a6915c25892d",
"name": "cfe1",
"description": null,
"metadata": {},
"from_node": {
"$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
},
"from_branch": null,
"to_node": {
"$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
}
},
{
"component_type": "ControlFlowEdge",
"id": "e5e45377-1484-497a-bbca-2150c71911fc",
"name": "cfe2",
"description": null,
"metadata": {},
"from_node": {
"$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
},
"from_branch": null,
"to_node": {
"$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
}
}
],
"data_flow_connections": [
{
"component_type": "DataFlowEdge",
"id": "f6eefbf1-6345-4555-916e-8e1748cba5be",
"name": "dfe1",
"description": null,
"metadata": {},
"source_node": {
"$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
},
"source_output": "articles",
"destination_node": {
"$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
},
"destination_input": "iterated_article"
},
{
"component_type": "DataFlowEdge",
"id": "a6c13605-ea64-4090-8c18-eaef5f46e698",
"name": "dfe2",
"description": null,
"metadata": {},
"source_node": {
"$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
},
"source_output": "collected_summary",
"destination_node": {
"$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
},
"destination_input": "summaries"
}
],
"$referenced_components": {
"ba7eea20-fc0e-4212-a052-eb01e744ffc2": {
"component_type": "MapNode",
"id": "ba7eea20-fc0e-4212-a052-eb01e744ffc2",
"name": "map_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "iterated_article",
"items": {
"title": "article",
"type": "string"
},
"type": "array"
}
],
"outputs": [
{
"title": "collected_summary",
"items": {
"title": "summary",
"type": "string"
},
"type": "array"
}
],
"branches": [
"next"
],
"subflow": {
"component_type": "Flow",
"id": "083f44be-1500-4066-9cb3-5ee700fbccc9",
"name": "mapnode_subflow",
"description": null,
"metadata": {},
"inputs": [
{
"title": "article",
"type": "string"
}
],
"outputs": [
{
"title": "summary",
"type": "string"
}
],
"start_node": {
"$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
},
"nodes": [
{
"$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
},
{
"$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
},
{
"$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
}
],
"control_flow_connections": [
{
"component_type": "ControlFlowEdge",
"id": "c6ffb270-5656-4ff5-a65b-27583fc53f26",
"name": "cfe1",
"description": null,
"metadata": {},
"from_node": {
"$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
},
"from_branch": null,
"to_node": {
"$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
}
},
{
"component_type": "ControlFlowEdge",
"id": "bc773dc3-9828-4700-b0dd-aa70cea70265",
"name": "cfe2",
"description": null,
"metadata": {},
"from_node": {
"$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
},
"from_branch": null,
"to_node": {
"$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
}
}
],
"data_flow_connections": [
{
"component_type": "DataFlowEdge",
"id": "bb96701a-df21-4e46-a987-89a15d9fe3d6",
"name": "dfe1",
"description": null,
"metadata": {},
"source_node": {
"$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
},
"source_output": "article",
"destination_node": {
"$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
},
"destination_input": "article"
},
{
"component_type": "DataFlowEdge",
"id": "ecc1b21c-69a6-40a9-a737-328a44bfa59f",
"name": "dfe2",
"description": null,
"metadata": {},
"source_node": {
"$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
},
"source_output": "summary",
"destination_node": {
"$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
},
"destination_input": "summary"
}
],
"$referenced_components": {
"260c82f4-88f3-4cc0-9cff-25beba338a42": {
"component_type": "LlmNode",
"id": "260c82f4-88f3-4cc0-9cff-25beba338a42",
"name": "summarize_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "article",
"type": "string"
}
],
"outputs": [
{
"title": "summary",
"type": "string"
}
],
"branches": [
"next"
],
"llm_config": {
"component_type": "VllmConfig",
"id": "b3fc1b3a-7d69-4242-bff8-2d3ac5d81a71",
"name": "vllm-llama-4-maverick",
"description": null,
"metadata": {},
"default_generation_parameters": null,
"url": "http://url.to.my.vllm.server/llama4mav",
"model_id": "llama-4-maverick"
},
"prompt_template": "Summarize this article in 10 words:\n {{article}}"
},
"9cbd5883-4322-438d-bf44-b4c7746f85df": {
"component_type": "StartNode",
"id": "9cbd5883-4322-438d-bf44-b4c7746f85df",
"name": "start_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "article",
"type": "string"
}
],
"outputs": [
{
"title": "article",
"type": "string"
}
],
"branches": [
"next"
]
},
"2e922c96-5986-48e2-93df-6b2cfdf14428": {
"component_type": "EndNode",
"id": "2e922c96-5986-48e2-93df-6b2cfdf14428",
"name": "end_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "summary",
"type": "string"
}
],
"outputs": [
{
"title": "summary",
"type": "string"
}
],
"branches": [],
"branch_name": "next"
}
}
},
"reducers": {
"summary": "append"
}
},
"69ea8387-3e3e-4202-95fb-5aa1fc95e0c2": {
"component_type": "StartNode",
"id": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2",
"name": "start_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "articles",
"items": {
"title": "article",
"type": "string"
},
"type": "array"
}
],
"outputs": [
{
"title": "articles",
"items": {
"title": "article",
"type": "string"
},
"type": "array"
}
],
"branches": [
"next"
]
},
"1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf": {
"component_type": "EndNode",
"id": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf",
"name": "end_node",
"description": null,
"metadata": {},
"inputs": [
{
"title": "summaries",
"items": {
"title": "summary",
"type": "string"
},
"type": "array"
}
],
"outputs": [
{
"title": "summaries",
"items": {
"title": "summary",
"type": "string"
},
"type": "array"
}
],
"branches": [],
"branch_name": "next"
}
},
"agentspec_version": "25.4.1"
}
component_type: Flow
id: 9f172d92-f64b-4545-8297-4afa830088e6
name: flow
description: null
metadata: {}
inputs:
- title: articles
items:
title: article
type: string
type: array
outputs:
- title: summaries
items:
title: summary
type: string
type: array
start_node:
$component_ref: 2e7a764f-8236-459f-9cf5-7cde37f8ce94
nodes:
- $component_ref: 2e7a764f-8236-459f-9cf5-7cde37f8ce94
- $component_ref: 6c41a036-c2d2-4a1f-90f3-c6c0b0fda79e
- $component_ref: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
control_flow_connections:
- component_type: ControlFlowEdge
id: 9fe25f4f-5c0b-423c-b139-c659d24df775
name: cfe1
description: null
metadata: {}
from_node:
$component_ref: 2e7a764f-8236-459f-9cf5-7cde37f8ce94
from_branch: null
to_node:
$component_ref: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
- component_type: ControlFlowEdge
id: bbe03108-e414-4eca-9992-c5bb2770ffcf
name: cfe2
description: null
metadata: {}
from_node:
$component_ref: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
from_branch: null
to_node:
$component_ref: 6c41a036-c2d2-4a1f-90f3-c6c0b0fda79e
data_flow_connections:
- component_type: DataFlowEdge
id: 97018039-7d88-4de1-b09c-1b84a8b0f425
name: dfe1
description: null
metadata: {}
source_node:
$component_ref: 2e7a764f-8236-459f-9cf5-7cde37f8ce94
source_output: articles
destination_node:
$component_ref: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
destination_input: iterated_article
- component_type: DataFlowEdge
id: 002ab9b3-78b3-42cf-9b07-02063174783d
name: dfe2
description: null
metadata: {}
source_node:
$component_ref: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
source_output: collected_summary
destination_node:
$component_ref: 6c41a036-c2d2-4a1f-90f3-c6c0b0fda79e
destination_input: summaries
$referenced_components:
fa5a8b80-fd92-4b75-bd5f-aba7af849fe6:
component_type: MapNode
id: fa5a8b80-fd92-4b75-bd5f-aba7af849fe6
name: map_node
description: null
metadata: {}
inputs:
- title: iterated_article
items:
title: article
type: string
type: array
outputs:
- title: collected_summary
items:
title: summary
type: string
type: array
branches:
- next
subflow:
component_type: Flow
id: ab1087dd-8469-4173-b46c-03426bc7ae8f
name: mapnode_subflow
description: null
metadata: {}
inputs:
- title: article
type: string
outputs:
- title: summary
type: string
start_node:
$component_ref: 0a09736b-87e3-4b73-9c50-803fda03d5ce
nodes:
- $component_ref: 0a09736b-87e3-4b73-9c50-803fda03d5ce
- $component_ref: e0293cb9-1cc2-4434-b398-1c4ec4cf269c
- $component_ref: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
control_flow_connections:
- component_type: ControlFlowEdge
id: eb62a074-4bdb-480e-8498-cc3db3b9b976
name: cfe1
description: null
metadata: {}
from_node:
$component_ref: 0a09736b-87e3-4b73-9c50-803fda03d5ce
from_branch: null
to_node:
$component_ref: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
- component_type: ControlFlowEdge
id: fc730dd7-a564-47e9-8d42-4b9c36880edc
name: cfe2
description: null
metadata: {}
from_node:
$component_ref: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
from_branch: null
to_node:
$component_ref: e0293cb9-1cc2-4434-b398-1c4ec4cf269c
data_flow_connections:
- component_type: DataFlowEdge
id: 87c2ec9d-f2df-4e85-ae41-ee03cea71ce0
name: dfe1
description: null
metadata: {}
source_node:
$component_ref: 0a09736b-87e3-4b73-9c50-803fda03d5ce
source_output: article
destination_node:
$component_ref: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
destination_input: article
- component_type: DataFlowEdge
id: 7c53602c-d3e8-45ae-99fe-93e05ce48f93
name: dfe2
description: null
metadata: {}
source_node:
$component_ref: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
source_output: summary
destination_node:
$component_ref: e0293cb9-1cc2-4434-b398-1c4ec4cf269c
destination_input: summary
$referenced_components:
0c76dfef-f68d-4e79-ac06-e2f605e017c3:
component_type: LlmNode
id: 0c76dfef-f68d-4e79-ac06-e2f605e017c3
name: summarize_node
description: null
metadata: {}
inputs:
- title: article
type: string
outputs:
- title: summary
type: string
branches:
- next
llm_config:
component_type: VllmConfig
id: 4175f758-cf13-4f15-87dc-fde10befe077
name: vllm-llama-4-maverick
description: null
metadata: {}
default_generation_parameters: null
url: http://url.to.my.vllm.server/llama4mav
model_id: llama-4-maverick
prompt_template: "Summarize this article in 10 words:\n {{article}}"
0a09736b-87e3-4b73-9c50-803fda03d5ce:
component_type: StartNode
id: 0a09736b-87e3-4b73-9c50-803fda03d5ce
name: start_node
description: null
metadata: {}
inputs:
- title: article
type: string
outputs:
- title: article
type: string
branches:
- next
e0293cb9-1cc2-4434-b398-1c4ec4cf269c:
component_type: EndNode
id: e0293cb9-1cc2-4434-b398-1c4ec4cf269c
name: end_node
description: null
metadata: {}
inputs:
- title: summary
type: string
outputs:
- title: summary
type: string
branches: []
branch_name: next
reducers:
summary: append
2e7a764f-8236-459f-9cf5-7cde37f8ce94:
component_type: StartNode
id: 2e7a764f-8236-459f-9cf5-7cde37f8ce94
name: start_node
description: null
metadata: {}
inputs:
- title: articles
items:
title: article
type: string
type: array
outputs:
- title: articles
items:
title: article
type: string
type: array
branches:
- next
6c41a036-c2d2-4a1f-90f3-c6c0b0fda79e:
component_type: EndNode
id: 6c41a036-c2d2-4a1f-90f3-c6c0b0fda79e
name: end_node
description: null
metadata: {}
inputs:
- title: summaries
items:
title: summary
type: string
type: array
outputs:
- title: summaries
items:
title: summary
type: string
type: array
branches: []
branch_name: next
agentspec_version: 25.4.1
Recap#
In this guide, you learned how to implement a map-reduce operation using Agent Spec components.
Below is the complete code from this guide.
# Complete map-reduce example: summarize every article in a list with a MapNode.
# All imports are grouped here, in the order they first appeared, with duplicates merged.
from pyagentspec.llms import VllmConfig
from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import EndNode, LlmNode, MapNode, StartNode
from pyagentspec.serialization import AgentSpecSerializer

# --- LLM configuration used by the summarization node -----------------------
llm_config = VllmConfig(
    name="vllm-llama-4-maverick",
    model_id="llama-4-maverick",
    url="http://url.to.my.vllm.server/llama4mav",
)

# --- Sub-flow: summarize ONE article ----------------------------------------
# Entry and exit points: one "article" string in, one "summary" string out.
start_node = StartNode(
    name="start_node",
    inputs=[StringProperty(title="article")],
)
end_node = EndNode(
    name="end_node",
    outputs=[StringProperty(title="summary")],
)

# LLM step that produces the "summary" output; its prompt template references {{article}}.
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
{{article}}""",
    outputs=[StringProperty(title="summary")],
)

# Execution order: start_node -> summarize_node -> end_node.
subflow_control_edges = [
    ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
    ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
]

# Data wiring: the article goes into the LLM node, the summary goes out to end_node.
article_to_llm = DataFlowEdge(
    name="dfe1",
    source_node=start_node,
    source_output="article",
    destination_node=summarize_node,
    destination_input="article",
)
summary_to_end = DataFlowEdge(
    name="dfe2",
    source_node=summarize_node,
    source_output="summary",
    destination_node=end_node,
    destination_input="summary",
)

summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=subflow_control_edges,
    data_flow_connections=[article_to_llm, summary_to_end],
)

# --- MapNode: run the sub-flow over a list ----------------------------------
# Input/output titles are the sub-flow names with the iterated_ / collected_ prefixes.
# No reducer is passed, so the MapNode's default reduction applies.
map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)

# --- Outer flow: list of articles in, list of summaries out ------------------
# start_node / end_node are rebound here; summarize_flow was already built
# with the single-article nodes created above.
start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)

# Execution order: start_node -> map_node -> end_node.
flow_control_edges = [
    ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
    ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
]

# "articles" feeds the MapNode's iterated input; its collected output becomes "summaries".
articles_to_map = DataFlowEdge(
    name="dfe1",
    source_node=start_node,
    source_output="articles",
    destination_node=map_node,
    destination_input="iterated_article",
)
summaries_to_end = DataFlowEdge(
    name="dfe2",
    source_node=map_node,
    source_output="collected_summary",
    destination_node=end_node,
    destination_input="summaries",
)

flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=flow_control_edges,
    data_flow_connections=[articles_to_map, summaries_to_end],
)

# --- Serialization -----------------------------------------------------------
# Export the assembled flow to its Agent Spec JSON representation.
serialized_assistant = AgentSpecSerializer().to_json(flow)
Next steps#
Having learned how to perform map and reduce operations in Agent Spec, you may now proceed to How to Build an Orchestrator-Workers Agents Pattern.