How to Do Map and Reduce Operations in Flows#

Map-Reduce is a programming model essential for efficiently processing large datasets across distributed systems. It is widely used in software engineering to enhance data processing speed and scalability.

Agent Spec supports the Map and Reduce operations in Flows, using the MapNode. This guide will show you how to use MapNode to perform an operation on all elements of a list

Flow diagram of a MapNode

Basic implementation#

Assuming you want to summarize a few articles. You have the option to generate the summary with the LlmNode class:

from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import StringProperty
from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode

start_node = StartNode(name="start_node", inputs=[StringProperty(title="article")])
end_node = EndNode(name="end_node", outputs=[StringProperty(title="summary")])
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
 {{article}}""",
    outputs=[StringProperty(title="summary")],
)
summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
        ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="article",
            destination_node=summarize_node,
            destination_input="article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=summarize_node,
            source_output="summary",
            destination_node=end_node,
            destination_input="summary",
        ),
    ],
)

This step takes a single article, and generates a summary. Since you have a list of articles, use the MapNode class to generate a summary for each article. Pay attention in using the right names for inputs and outputs of the MapNode: according to the Agent Spec language specification, you should prepend the iterated_ prefix to the inputs of the sub-flow that should be “mapped”, and the collected_ prefix to the outputs of the sub-flow that should be “reduced”.

In this case we select the append reduction method for the only subflow output we have (i.e., summary). This corresponds to the default behavior of the MapNode, so we don’t need to specify anything.

Please, refer to the Agent Spec specification for more information.

from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import MapNode

map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)

The code above will connect:

  • the article input of the sub-flow to the iterated_article input of the MapNode

  • the summary output of the sub-flow to the collected_summary output of the MapNode

Once this is done, you can create the flow for the MapNode:

start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)
flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
        ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="articles",
            destination_node=map_node,
            destination_input="iterated_article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=map_node,
            source_output="collected_summary",
            destination_node=end_node,
            destination_input="summaries",
        ),
    ],
)

Agent Spec Serialization#

You can export the assistant configuration using the AgentSpecSerializer.

from pyagentspec.serialization import AgentSpecSerializer

serialized_assistant = AgentSpecSerializer().to_json(flow)

Here is what the Agent Spec representation will look like ↓

Click here to see the assistant configuration.
{
  "component_type": "Flow",
  "id": "853f4c79-ccec-4061-bb82-82b67b96d021",
  "name": "flow",
  "description": null,
  "metadata": {},
  "inputs": [
    {
      "title": "articles",
      "items": {
        "title": "article",
        "type": "string"
      },
      "type": "array"
    }
  ],
  "outputs": [
    {
      "title": "summaries",
      "items": {
        "title": "summary",
        "type": "string"
      },
      "type": "array"
    }
  ],
  "start_node": {
    "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
  },
  "nodes": [
    {
      "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
    },
    {
      "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
    },
    {
      "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
    }
  ],
  "control_flow_connections": [
    {
      "component_type": "ControlFlowEdge",
      "id": "36ccaa13-8a8c-4160-b08e-a6915c25892d",
      "name": "cfe1",
      "description": null,
      "metadata": {},
      "from_node": {
        "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      }
    },
    {
      "component_type": "ControlFlowEdge",
      "id": "e5e45377-1484-497a-bbca-2150c71911fc",
      "name": "cfe2",
      "description": null,
      "metadata": {},
      "from_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
      }
    }
  ],
  "data_flow_connections": [
    {
      "component_type": "DataFlowEdge",
      "id": "f6eefbf1-6345-4555-916e-8e1748cba5be",
      "name": "dfe1",
      "description": null,
      "metadata": {},
      "source_node": {
        "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
      },
      "source_output": "articles",
      "destination_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "destination_input": "iterated_article"
    },
    {
      "component_type": "DataFlowEdge",
      "id": "a6c13605-ea64-4090-8c18-eaef5f46e698",
      "name": "dfe2",
      "description": null,
      "metadata": {},
      "source_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "source_output": "collected_summary",
      "destination_node": {
        "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
      },
      "destination_input": "summaries"
    }
  ],
  "$referenced_components": {
    "ba7eea20-fc0e-4212-a052-eb01e744ffc2": {
      "component_type": "MapNode",
      "id": "ba7eea20-fc0e-4212-a052-eb01e744ffc2",
      "name": "map_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "iterated_article",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "collected_summary",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [
        "next"
      ],
      "subflow": {
        "component_type": "Flow",
        "id": "083f44be-1500-4066-9cb3-5ee700fbccc9",
        "name": "mapnode_subflow",
        "description": null,
        "metadata": {},
        "inputs": [
          {
            "title": "article",
            "type": "string"
          }
        ],
        "outputs": [
          {
            "title": "summary",
            "type": "string"
          }
        ],
        "start_node": {
          "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
        },
        "nodes": [
          {
            "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
          },
          {
            "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
          },
          {
            "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
          }
        ],
        "control_flow_connections": [
          {
            "component_type": "ControlFlowEdge",
            "id": "c6ffb270-5656-4ff5-a65b-27583fc53f26",
            "name": "cfe1",
            "description": null,
            "metadata": {},
            "from_node": {
              "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            }
          },
          {
            "component_type": "ControlFlowEdge",
            "id": "bc773dc3-9828-4700-b0dd-aa70cea70265",
            "name": "cfe2",
            "description": null,
            "metadata": {},
            "from_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
            }
          }
        ],
        "data_flow_connections": [
          {
            "component_type": "DataFlowEdge",
            "id": "bb96701a-df21-4e46-a987-89a15d9fe3d6",
            "name": "dfe1",
            "description": null,
            "metadata": {},
            "source_node": {
              "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
            },
            "source_output": "article",
            "destination_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "destination_input": "article"
          },
          {
            "component_type": "DataFlowEdge",
            "id": "ecc1b21c-69a6-40a9-a737-328a44bfa59f",
            "name": "dfe2",
            "description": null,
            "metadata": {},
            "source_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "source_output": "summary",
            "destination_node": {
              "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
            },
            "destination_input": "summary"
          }
        ],
        "$referenced_components": {
          "260c82f4-88f3-4cc0-9cff-25beba338a42": {
            "component_type": "LlmNode",
            "id": "260c82f4-88f3-4cc0-9cff-25beba338a42",
            "name": "summarize_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "branches": [
              "next"
            ],
            "llm_config": {
              "component_type": "VllmConfig",
              "id": "b3fc1b3a-7d69-4242-bff8-2d3ac5d81a71",
              "name": "vllm-llama-4-maverick",
              "description": null,
              "metadata": {},
              "default_generation_parameters": null,
              "url": "http://url.to.my.vllm.server/llama4mav",
              "model_id": "llama-4-maverick"
            },
            "prompt_template": "Summarize this article in 10 words:\n {{article}}"
          },
          "9cbd5883-4322-438d-bf44-b4c7746f85df": {
            "component_type": "StartNode",
            "id": "9cbd5883-4322-438d-bf44-b4c7746f85df",
            "name": "start_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "branches": [
              "next"
            ]
          },
          "2e922c96-5986-48e2-93df-6b2cfdf14428": {
            "component_type": "EndNode",
            "id": "2e922c96-5986-48e2-93df-6b2cfdf14428",
            "name": "end_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "branches": [],
            "branch_name": "next"
          }
        }
      },
      "reducers": {
        "summary": "append"
      }
    },
    "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2": {
      "component_type": "StartNode",
      "id": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2",
      "name": "start_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "articles",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "articles",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [
        "next"
      ]
    },
    "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf": {
      "component_type": "EndNode",
      "id": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf",
      "name": "end_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "summaries",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "summaries",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [],
      "branch_name": "next"
    }
  },
  "agentspec_version": "25.4.1"
}

Recap#

In this guide, you learned how to implement a map-reduce operation using Agent Spec components.

Below is the complete code from this guide.
from pyagentspec.llms import VllmConfig

llm_config = VllmConfig(
    name="vllm-llama-4-maverick",
    model_id="llama-4-maverick",
    url="http://url.to.my.vllm.server/llama4mav",
)

from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import StringProperty
from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode

start_node = StartNode(name="start_node", inputs=[StringProperty(title="article")])
end_node = EndNode(name="end_node", outputs=[StringProperty(title="summary")])
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
 {{article}}""",
    outputs=[StringProperty(title="summary")],
)
summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
        ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="article",
            destination_node=summarize_node,
            destination_input="article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=summarize_node,
            source_output="summary",
            destination_node=end_node,
            destination_input="summary",
        ),
    ],
)

from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import MapNode

map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)

start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)
flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
        ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="articles",
            destination_node=map_node,
            destination_input="iterated_article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=map_node,
            source_output="collected_summary",
            destination_node=end_node,
            destination_input="summaries",
        ),
    ],
)

from pyagentspec.serialization import AgentSpecSerializer

serialized_assistant = AgentSpecSerializer().to_json(flow)

Next steps#

Having learned how to perform map and reduce operations in Agent Spec, you may now proceed to How to Build an Orchestrator-Workers Agents Pattern.