How to Do Map and Reduce Operations in Flows#

Map-Reduce is a programming model essential for efficiently processing large datasets across distributed systems. It is widely used in software engineering to enhance data processing speed and scalability.

Agent Spec supports Map and Reduce operations in Flows through the MapNode. This guide shows how to use MapNode to perform an operation on every element of a list.
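As a point of reference, the map and reduce steps can be illustrated in plain Python, independently of Agent Spec. This is only a minimal sketch of the general idea: the sample texts and the word-counting operation are made up for illustration and are not part of the guide's flow.

```python
from functools import reduce

# Illustrative input: a small list of texts (made-up data).
articles = ["first article text", "second article", "third"]

# Map: apply the same operation to every element independently.
word_counts = list(map(lambda text: len(text.split()), articles))

# Reduce: combine the per-element results into a single value.
total_words = reduce(lambda acc, n: acc + n, word_counts, 0)

print(word_counts, total_words)
```

In the rest of this guide, the "map" operation is an LLM call and the "reduce" operation gathers the results into a list.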

Flow diagram of a MapNode

Basic implementation#

Assume you want to summarize a few articles. You can generate the summary of a single article with the LlmNode class:

from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import StringProperty
from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode

start_node = StartNode(name="start_node", inputs=[StringProperty(title="article")])
end_node = EndNode(name="end_node", outputs=[StringProperty(title="summary")])
# llm_config is assumed to be defined already (see the VllmConfig in the Recap section).
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
 {{article}}""",
    outputs=[StringProperty(title="summary")],
)
summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
        ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="article",
            destination_node=summarize_node,
            destination_input="article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=summarize_node,
            source_output="summary",
            destination_node=end_node,
            destination_input="summary",
        ),
    ],
)

This flow takes a single article and generates a summary. Since you have a list of articles, use the MapNode class to generate a summary for each one. Take care to use the right names for the inputs and outputs of the MapNode: according to the Agent Spec language specification, you should prepend the iterated_ prefix to the sub-flow inputs that should be “mapped”, and the collected_ prefix to the sub-flow outputs that should be “reduced”.

In this case, the only sub-flow output (i.e., summary) is reduced with the append method. This is the default behavior of the MapNode, so nothing needs to be specified.

Please refer to the Agent Spec specification for more information.

from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import MapNode

map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)

The code above will connect:

  • the article input of the sub-flow to the iterated_article input of the MapNode

  • the summary output of the sub-flow to the collected_summary output of the MapNode
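The naming convention can be pictured with a plain-Python emulation. This is only a conceptual sketch, not how pyagentspec or any Agent Spec runtime implements the node: `run_map` and `fake_summarize` are hypothetical helpers, and the "summary" is faked by keeping the first three words instead of calling an LLM.

```python
from typing import Callable, Dict, List

ITERATED_PREFIX = "iterated_"
COLLECTED_PREFIX = "collected_"


def run_map(
    subflow: Callable[..., Dict[str, str]],
    map_inputs: Dict[str, List[str]],
) -> Dict[str, List[str]]:
    """Hypothetical stand-in for a MapNode: call `subflow` once per item."""
    # Strip the prefix to recover the sub-flow input names.
    per_input = {
        name[len(ITERATED_PREFIX):]: values
        for name, values in map_inputs.items()
        if name.startswith(ITERATED_PREFIX)
    }
    collected: Dict[str, List[str]] = {}
    # Walk through the input lists position by position.
    for items in zip(*per_input.values()):
        outputs = subflow(**dict(zip(per_input.keys(), items)))
        for out_name, value in outputs.items():
            # "append" reduction: gather each sub-flow output into a list.
            collected.setdefault(COLLECTED_PREFIX + out_name, []).append(value)
    return collected


def fake_summarize(article: str) -> Dict[str, str]:
    """Placeholder for the LLM call: keep the first three words."""
    return {"summary": " ".join(article.split()[:3])}


result = run_map(
    fake_summarize,
    {"iterated_article": ["Agent Spec supports map nodes", "Flows connect nodes with edges"]},
)
print(result)
```

The sub-flow only ever sees `article` and returns `summary`; the prefixed names exist only on the outside of the map step.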

Once this is done, you can create the flow for the MapNode:

start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)
flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
        ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="articles",
            destination_node=map_node,
            destination_input="iterated_article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=map_node,
            source_output="collected_summary",
            destination_node=end_node,
            destination_input="summaries",
        ),
    ],
)

Enabling parallelization#

Agent Spec offers a parallel version of MapNode called ParallelMapNode: the node’s behavior is equivalent, but the map operation is meant to be performed in parallel. The flow implemented in this guide is well suited to parallel execution, because it contains nothing that is problematic under concurrency, such as access to mutable resources or requests for input. To enable parallelism, simply create the node with the ParallelMapNode class instead:

from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import ParallelMapNode

parallel_map_node = ParallelMapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)
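To see why independent iterations parallelize well, here is a plain-Python sketch that compares a sequential map with a thread-based one. It is an illustration under stated assumptions, not a description of how an Agent Spec runtime executes ParallelMapNode: `fake_summarize` is a hypothetical placeholder for the LLM call, and the thread pool is just one possible execution strategy.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_summarize(article: str) -> str:
    """Placeholder for an independent, side-effect-free LLM call."""
    return " ".join(article.split()[:3])


def sequential_map(articles: List[str]) -> List[str]:
    # One item after the other, as a plain loop would do.
    return [fake_summarize(a) for a in articles]


def parallel_map(articles: List[str], max_workers: int = 4) -> List[str]:
    # Executor.map returns results in input order,
    # even if the individual calls finish out of order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fake_summarize, articles))


articles = ["one two three four", "five six seven eight", "nine ten"]
sequential = sequential_map(articles)
parallel = parallel_map(articles)
print(sequential == parallel)
```

Because each call depends only on its own input, both versions produce the same list. Whether a given runtime preserves ordering in the same way is something to confirm in the language specification.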

Notes about parallelization#

Not every sub-flow is suited to parallel execution. Agent Spec does not forbid any specific configuration for ParallelMapNode sub-flows, but several precautions apply when parallelization is enabled, especially when the sub-flow accesses mutable shared resources (e.g., the conversation) or interrupts the normal execution of the flow (e.g., client tools).

For more information about parallel execution support in Agent Spec, please check the language specification. For guidelines on implementing concurrent execution securely, check our security guidelines.
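One generic precaution for mutable shared resources is to serialize access to them. The sketch below shows that pattern in plain Python; `SharedConversation` and `process` are hypothetical names used only for illustration, and nothing here reflects how an Agent Spec runtime actually manages the conversation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class SharedConversation:
    """Hypothetical mutable resource shared by all parallel iterations."""

    def __init__(self) -> None:
        self._messages: List[str] = []
        self._lock = threading.Lock()

    def add(self, message: str) -> None:
        # Guard every write with the lock so updates stay consistent.
        with self._lock:
            self._messages.append(message)

    def snapshot(self) -> List[str]:
        # Return a copy so callers never hold the internal list.
        with self._lock:
            return list(self._messages)


def process(item: int, conversation: SharedConversation) -> int:
    conversation.add(f"processed {item}")
    return item * 2


conversation = SharedConversation()
with ThreadPoolExecutor(max_workers=4) as pool:
    doubled = list(pool.map(lambda i: process(i, conversation), range(20)))

print(len(conversation.snapshot()))
```

The returned values keep the input order, but the order of the messages in the shared resource depends on thread scheduling. That kind of nondeterminism is one reason to keep sub-flows free of shared state where possible.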

Agent Spec Serialization#

You can export the assistant configuration using the AgentSpecSerializer.

from pyagentspec.serialization import AgentSpecSerializer

serialized_assistant = AgentSpecSerializer().to_json(flow)

Here is what the Agent Spec representation will look like:

{
  "component_type": "Flow",
  "id": "853f4c79-ccec-4061-bb82-82b67b96d021",
  "name": "flow",
  "description": null,
  "metadata": {},
  "inputs": [
    {
      "title": "articles",
      "items": {
        "title": "article",
        "type": "string"
      },
      "type": "array"
    }
  ],
  "outputs": [
    {
      "title": "summaries",
      "items": {
        "title": "summary",
        "type": "string"
      },
      "type": "array"
    }
  ],
  "start_node": {
    "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
  },
  "nodes": [
    {
      "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
    },
    {
      "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
    },
    {
      "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
    }
  ],
  "control_flow_connections": [
    {
      "component_type": "ControlFlowEdge",
      "id": "36ccaa13-8a8c-4160-b08e-a6915c25892d",
      "name": "cfe1",
      "description": null,
      "metadata": {},
      "from_node": {
        "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      }
    },
    {
      "component_type": "ControlFlowEdge",
      "id": "e5e45377-1484-497a-bbca-2150c71911fc",
      "name": "cfe2",
      "description": null,
      "metadata": {},
      "from_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
      }
    }
  ],
  "data_flow_connections": [
    {
      "component_type": "DataFlowEdge",
      "id": "f6eefbf1-6345-4555-916e-8e1748cba5be",
      "name": "dfe1",
      "description": null,
      "metadata": {},
      "source_node": {
        "$component_ref": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2"
      },
      "source_output": "articles",
      "destination_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "destination_input": "iterated_article"
    },
    {
      "component_type": "DataFlowEdge",
      "id": "a6c13605-ea64-4090-8c18-eaef5f46e698",
      "name": "dfe2",
      "description": null,
      "metadata": {},
      "source_node": {
        "$component_ref": "ba7eea20-fc0e-4212-a052-eb01e744ffc2"
      },
      "source_output": "collected_summary",
      "destination_node": {
        "$component_ref": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf"
      },
      "destination_input": "summaries"
    }
  ],
  "$referenced_components": {
    "ba7eea20-fc0e-4212-a052-eb01e744ffc2": {
      "component_type": "MapNode",
      "id": "ba7eea20-fc0e-4212-a052-eb01e744ffc2",
      "name": "map_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "iterated_article",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "collected_summary",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [
        "next"
      ],
      "subflow": {
        "component_type": "Flow",
        "id": "083f44be-1500-4066-9cb3-5ee700fbccc9",
        "name": "mapnode_subflow",
        "description": null,
        "metadata": {},
        "inputs": [
          {
            "title": "article",
            "type": "string"
          }
        ],
        "outputs": [
          {
            "title": "summary",
            "type": "string"
          }
        ],
        "start_node": {
          "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
        },
        "nodes": [
          {
            "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
          },
          {
            "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
          },
          {
            "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
          }
        ],
        "control_flow_connections": [
          {
            "component_type": "ControlFlowEdge",
            "id": "c6ffb270-5656-4ff5-a65b-27583fc53f26",
            "name": "cfe1",
            "description": null,
            "metadata": {},
            "from_node": {
              "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            }
          },
          {
            "component_type": "ControlFlowEdge",
            "id": "bc773dc3-9828-4700-b0dd-aa70cea70265",
            "name": "cfe2",
            "description": null,
            "metadata": {},
            "from_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
            }
          }
        ],
        "data_flow_connections": [
          {
            "component_type": "DataFlowEdge",
            "id": "bb96701a-df21-4e46-a987-89a15d9fe3d6",
            "name": "dfe1",
            "description": null,
            "metadata": {},
            "source_node": {
              "$component_ref": "9cbd5883-4322-438d-bf44-b4c7746f85df"
            },
            "source_output": "article",
            "destination_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "destination_input": "article"
          },
          {
            "component_type": "DataFlowEdge",
            "id": "ecc1b21c-69a6-40a9-a737-328a44bfa59f",
            "name": "dfe2",
            "description": null,
            "metadata": {},
            "source_node": {
              "$component_ref": "260c82f4-88f3-4cc0-9cff-25beba338a42"
            },
            "source_output": "summary",
            "destination_node": {
              "$component_ref": "2e922c96-5986-48e2-93df-6b2cfdf14428"
            },
            "destination_input": "summary"
          }
        ],
        "$referenced_components": {
          "260c82f4-88f3-4cc0-9cff-25beba338a42": {
            "component_type": "LlmNode",
            "id": "260c82f4-88f3-4cc0-9cff-25beba338a42",
            "name": "summarize_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "branches": [
              "next"
            ],
            "llm_config": {
              "component_type": "VllmConfig",
              "id": "b3fc1b3a-7d69-4242-bff8-2d3ac5d81a71",
              "name": "vllm-llama-4-maverick",
              "description": null,
              "metadata": {},
              "default_generation_parameters": null,
              "url": "http://url.to.my.vllm.server/llama4mav",
              "model_id": "llama-4-maverick"
            },
            "prompt_template": "Summarize this article in 10 words:\n {{article}}"
          },
          "9cbd5883-4322-438d-bf44-b4c7746f85df": {
            "component_type": "StartNode",
            "id": "9cbd5883-4322-438d-bf44-b4c7746f85df",
            "name": "start_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "article",
                "type": "string"
              }
            ],
            "branches": [
              "next"
            ]
          },
          "2e922c96-5986-48e2-93df-6b2cfdf14428": {
            "component_type": "EndNode",
            "id": "2e922c96-5986-48e2-93df-6b2cfdf14428",
            "name": "end_node",
            "description": null,
            "metadata": {},
            "inputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "outputs": [
              {
                "title": "summary",
                "type": "string"
              }
            ],
            "branches": [],
            "branch_name": "next"
          }
        }
      },
      "reducers": {
        "summary": "append"
      }
    },
    "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2": {
      "component_type": "StartNode",
      "id": "69ea8387-3e3e-4202-95fb-5aa1fc95e0c2",
      "name": "start_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "articles",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "articles",
          "items": {
            "title": "article",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [
        "next"
      ]
    },
    "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf": {
      "component_type": "EndNode",
      "id": "1d7cfc39-ec21-4f91-88e8-7bb0c90a4abf",
      "name": "end_node",
      "description": null,
      "metadata": {},
      "inputs": [
        {
          "title": "summaries",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "outputs": [
        {
          "title": "summaries",
          "items": {
            "title": "summary",
            "type": "string"
          },
          "type": "array"
        }
      ],
      "branches": [],
      "branch_name": "next"
    }
  },
  "agentspec_version": "25.4.1"
}
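In the configuration above, nodes are listed as `$component_ref` entries that point into `$referenced_components`. The following sketch shows one way to resolve those references with the standard `json` module. It uses a hand-trimmed fragment with placeholder ids rather than real serializer output, the `resolve` helper is hypothetical, and only the top level is handled (the nested sub-flow carries its own `$referenced_components`).

```python
import json
from typing import Any, Dict

# Hand-trimmed fragment with the same shape as the configuration above;
# the short ids are placeholders, not real serializer output.
config_json = """
{
  "component_type": "Flow",
  "name": "flow",
  "nodes": [
    {"$component_ref": "start-id"},
    {"$component_ref": "end-id"},
    {"$component_ref": "map-id"}
  ],
  "$referenced_components": {
    "start-id": {"component_type": "StartNode", "name": "start_node"},
    "end-id": {"component_type": "EndNode", "name": "end_node"},
    "map-id": {"component_type": "MapNode", "name": "map_node"}
  }
}
"""


def resolve(entry: Dict[str, Any], referenced: Dict[str, Any]) -> Dict[str, Any]:
    """Return the component a `$component_ref` entry points to."""
    ref = entry.get("$component_ref")
    # Entries without a reference are already inline components.
    return referenced[ref] if ref is not None else entry


config = json.loads(config_json)
referenced = config["$referenced_components"]
node_types = {
    resolve(node, referenced)["name"]: resolve(node, referenced)["component_type"]
    for node in config["nodes"]
}
print(node_types)
```

Applied to the full output of `to_json`, the same lookup makes it easy to check which node types a flow contains.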

Recap#

In this guide, you learned how to implement a map-reduce operation using Agent Spec components.

Below is the complete code from this guide.
from pyagentspec.llms import VllmConfig

llm_config = VllmConfig(
    name="vllm-llama-4-maverick",
    model_id="llama-4-maverick",
    url="http://url.to.my.vllm.server/llama4mav",
)

from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow
from pyagentspec.property import StringProperty
from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode

start_node = StartNode(name="start_node", inputs=[StringProperty(title="article")])
end_node = EndNode(name="end_node", outputs=[StringProperty(title="summary")])
summarize_node = LlmNode(
    name="summarize_node",
    llm_config=llm_config,
    prompt_template="""Summarize this article in 10 words:
 {{article}}""",
    outputs=[StringProperty(title="summary")],
)
summarize_flow = Flow(
    name="mapnode_subflow",
    start_node=start_node,
    nodes=[start_node, end_node, summarize_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=summarize_node),
        ControlFlowEdge(name="cfe2", from_node=summarize_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="article",
            destination_node=summarize_node,
            destination_input="article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=summarize_node,
            source_output="summary",
            destination_node=end_node,
            destination_input="summary",
        ),
    ],
)

from pyagentspec.property import ListProperty, StringProperty
from pyagentspec.flows.nodes import MapNode

map_node = MapNode(
    name="map_node",
    subflow=summarize_flow,
    inputs=[ListProperty(title="iterated_article", item_type=StringProperty(title="article"))],
    outputs=[ListProperty(title="collected_summary", item_type=StringProperty(title="summary"))],
)

start_node = StartNode(
    name="start_node",
    inputs=[ListProperty(title="articles", item_type=StringProperty(title="article"))],
)
end_node = EndNode(
    name="end_node",
    outputs=[ListProperty(title="summaries", item_type=StringProperty(title="summary"))],
)
flow = Flow(
    name="flow",
    start_node=start_node,
    nodes=[start_node, end_node, map_node],
    control_flow_connections=[
        ControlFlowEdge(name="cfe1", from_node=start_node, to_node=map_node),
        ControlFlowEdge(name="cfe2", from_node=map_node, to_node=end_node),
    ],
    data_flow_connections=[
        DataFlowEdge(
            name="dfe1",
            source_node=start_node,
            source_output="articles",
            destination_node=map_node,
            destination_input="iterated_article",
        ),
        DataFlowEdge(
            name="dfe2",
            source_node=map_node,
            source_output="collected_summary",
            destination_node=end_node,
            destination_input="summaries",
        ),
    ],
)

from pyagentspec.serialization import AgentSpecSerializer

serialized_assistant = AgentSpecSerializer().to_json(flow)

Next steps#

Having learned how to perform map and reduce operations in Agent Spec, you may now proceed to How to Build an Orchestrator-Workers Agents Pattern.