How to Do Map and Reduce Operations in Flows#

python-icon Download Python Script

Python script/notebook for this guide.

MapStep how-to script

Prerequisites

This guide assumes familiarity with Flows.

Map-Reduce is a programming model essential for efficiently processing large datasets across distributed systems. It is widely used in software engineering to enhance data processing speed and scalability by parallelizing tasks.

WayFlow supports the Map and Reduce operations in Flows, using the MapStep. This guide will show you how to:

  • use MapStep perform an operation on all elements of a list

  • use MapStep to perform an operation on all key/value pairs of a dictionary

  • use MapStep to parallelize some operations

Flow diagram of a MapStep

To follow this guide, you need an LLM. WayFlow supports several LLM API providers. Select an LLM from the options below:

from wayflowcore.models import OCIGenAIModel

if __name__ == "__main__":

    llm = OCIGenAIModel(
        model_id="provider.model-id",
        service_endpoint="https://url-to-service-endpoint.com",
        compartment_id="compartment-id",
        auth_type="API_KEY",
    )

Basic implementation#

Assuming you want to summarize a few articles.

articles = [
    "Sea turtles are ancient reptiles that have been around for over 100 million years. They play crucial roles in marine ecosystems, such as maintaining healthy seagrass beds and coral reefs. Unfortunately, they are under threat due to poaching, habitat loss, and pollution. Conservation efforts worldwide aim to protect nesting sites and reduce bycatch in fishing gear.",
    "Dolphins are highly intelligent marine mammals known for their playfulness and curiosity. They live in social groups called pods, which can consist of hundreds of individuals depending on the species. Dolphins communicate using a variety of clicks, whistles, and other sounds. They face threats from habitat loss, marine pollution, and bycatch in fishing operations.",
    "Manatees, often referred to as 'sea cows', are gentle aquatic giants found in shallow coastal areas and rivers. These herbivorous mammals spend most of their time eating, resting, and traveling. They are particularly known for their slow movement and inability to survive in cold waters. Manatee populations are vulnerable to boat collisions, loss of warm-water habitats, and environmental pollutants.",
]

You have the option to generate the summary with the PromptExecutionStep class, as explained already in the separate guide:

from wayflowcore.controlconnection import ControlFlowEdge
from wayflowcore.dataconnection import DataFlowEdge
from wayflowcore.flow import Flow
from wayflowcore.property import StringProperty
from wayflowcore.steps import PromptExecutionStep, StartStep

start_step = StartStep(name="start_step", input_descriptors=[StringProperty("article")])
summarize_step = PromptExecutionStep(
    name="summarize_step",
    llm=llm,
    prompt_template="""Summarize this article in 10 words:
 {{article}}""",
    output_mapping={PromptExecutionStep.OUTPUT: "summary"},
)
summarize_flow = Flow(
    begin_step=start_step,
    control_flow_edges=[
        ControlFlowEdge(source_step=start_step, destination_step=summarize_step),
        ControlFlowEdge(source_step=summarize_step, destination_step=None),
    ],
    data_flow_edges=[
        DataFlowEdge(start_step, "article", summarize_step, "article"),
    ],
)

This step takes a single article, and generates a summary. Since you have a list of articles, use the MapStep class to generate a summary for each article.

from wayflowcore.property import ListProperty, StringProperty
from wayflowcore.steps import MapStep

map_step = MapStep(
    name="map_step",
    flow=summarize_flow,
    unpack_input={"article": "."},
    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
    input_descriptors=[ListProperty(MapStep.ITERATED_INPUT, item_type=StringProperty())],
)

Note

In the unpack_input function, define how each sub-flow input is retrieved. Here, the sub-flow requires an article input. Set its value to ., because each iterated item is the article and . is the identity query in JQ.

The output_descriptors parameter specifies which outputs of the sub-flow will be collected and merged into a list.

Once this is done, create the flow for the MapStep and execute it:

from wayflowcore.property import ListProperty, StringProperty
from wayflowcore.steps import StartStep

start_step = StartStep(
    name="start_step",
    input_descriptors=[ListProperty("articles", item_type=StringProperty())]
)
flow = Flow(
    begin_step=start_step,
    control_flow_edges=[
        ControlFlowEdge(source_step=start_step, destination_step=map_step),
        ControlFlowEdge(source_step=map_step, destination_step=None),
    ],
    data_flow_edges=[
        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
    ],
)
conversation = flow.start_conversation(inputs={"articles": articles})
status = conversation.execute()
print(status.output_values)

As expected, your flow has generated summaries of three articles!

Processing in parallel#

By default, the MapStep runs all operations sequentially in order. This is done so that any flow (including flows that yield or ask the user) can be run.

In many cases (such as generating articles summary), the work is completely parallelizable because the operations are independent from each other. In this context, you can just set the parallel_execution parameter to True and the operations will be run in parallel using a thread-pool.

from wayflowcore.property import ListProperty, StringProperty
from wayflowcore.steps import StartStep

start_step = StartStep(input_descriptors=[ListProperty("articles", item_type=StringProperty())])
map_step = MapStep(
    flow=summarize_flow,
    unpack_input={"article": "."},
    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
    input_descriptors=[ListProperty(MapStep.ITERATED_INPUT, item_type=StringProperty())],
    parallel_execution=True,
)
map_step_name = "map_step"
flow = Flow(
    begin_step_name="start_step",
    steps={
        "start_step": start_step,
        map_step_name: map_step,
    },
    control_flow_edges=[
        ControlFlowEdge(source_step=start_step, destination_step=map_step),
        ControlFlowEdge(source_step=map_step, destination_step=None),
    ],
    data_flow_edges=[
        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
    ],
)
conversation = flow.start_conversation(inputs={"articles": articles})
status = conversation.execute()
print(status.output_values)

Note

The Global Interpreter Lock (GIL) in Python is not a problem for parallel remote requests because I/O-bound operations, such as network requests, release the GIL during their execution, allowing other threads to run concurrently while the I/O operation is in progress.

Not all sub-flows can be executed in parallel. The table below summarizes the limitations of parallel execution for the MapStep:

Support

Type of flow

Examples

Remarks

FULLY SUPPORTED

Flows that do not yield and do not have any side-effect on the conversation (no variable read/write, posting to the conversation, and so on)

Embarrassingly parallel flows (simple independent operation), such as a PromptExecutionStep, ApiCallStep to post or get, and so on

N/A

SUPPORTED WITH SIDE EFFECTS

Flows that do not yield but have some side-effect on the conversation (variable read/write, posting to the conversation, and so on)

Flows with OutputMessageStep, VariableReadStep, VariableWriteStep, and so on

No guarantee in the order of operations (such as posting to the conversation), only the outputs are guaranteed in order.

NON SUPPORTED

Flows that yield. WayFlow does not support this, otherwise a user might be confused in what branch they are currently when prompted to answer.

Flows with InputMessageStep, AgentExecutionStep that can ask questions, and so on

It will raise an exception at instantiation time if a sub-flow can yield and step set to parallel

Common patterns and best practices#

Sometimes, you might have a dictionary, and you need to iterate on each of the key/value pairs. To achieve this, set iterated_input_type to DictProperty(<your_type>), and use the queries ._key (respectively ._value) to access the key (and respectively the value) from the key/value pair.

from wayflowcore.property import DictProperty, ListProperty, StringProperty
from wayflowcore.steps import StartStep

articles_as_dict = {str(idx): article for idx, article in enumerate(articles)}

map_step = MapStep(
    name="map_step",
    flow=summarize_flow,
    unpack_input={"article": "._value"},
    input_descriptors=[DictProperty(MapStep.ITERATED_INPUT, value_type=StringProperty())],
    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
)
start_step = StartStep(
    name="start_step",
    input_descriptors=[DictProperty("articles", value_type=StringProperty())]
)
flow = Flow(
    begin_step=start_step,
    control_flow_edges=[
        ControlFlowEdge(source_step=start_step, destination_step=map_step),
        ControlFlowEdge(source_step=map_step, destination_step=None),
    ],
    data_flow_edges=[
        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
    ],
)

conversation = flow.start_conversation(inputs={"articles": articles_as_dict})
status = conversation.execute()
print(status.output_values)

Agent Spec Exporting/Loading#

You can export the assistant configuration to its Agent Spec configuration using the AgentSpecExporter.

from wayflowcore.agentspec import AgentSpecExporter

serialized_flow = AgentSpecExporter().to_json(flow)

Here is what the Agent Spec representation will look like ↓

Click here to see the assistant configuration.
{
  "component_type": "Flow",
  "id": "4ec5b97c-ec15-43b6-b778-0ae068572d2f",
  "name": "flow_e47f0d21",
  "description": "",
  "metadata": {
    "__metadata_info__": {}
  },
  "inputs": [
    {
      "type": "array",
      "items": {
        "type": "string"
      },
      "title": "articles"
    }
  ],
  "outputs": [
    {
      "type": "array",
      "items": {
        "type": "string"
      },
      "title": "summary"
    }
  ],
  "start_node": {
    "$component_ref": "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405"
  },
  "nodes": [
    {
      "$component_ref": "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405"
    },
    {
      "$component_ref": "c8ce9d86-b069-4338-8372-09f32462ed39"
    },
    {
      "$component_ref": "0e6801d2-803e-40bc-809d-9e3dc69e5720"
    }
  ],
  "control_flow_connections": [
    {
      "component_type": "ControlFlowEdge",
      "id": "8250a6ea-1250-45bd-9c97-3211c99511d6",
      "name": "start_step_to_map_step_control_flow_edge",
      "description": null,
      "metadata": {
        "__metadata_info__": {}
      },
      "from_node": {
        "$component_ref": "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "c8ce9d86-b069-4338-8372-09f32462ed39"
      }
    },
    {
      "component_type": "ControlFlowEdge",
      "id": "09a6ef4e-bc03-4db8-a49c-d7f2a03414d0",
      "name": "map_step_to_None End node_control_flow_edge",
      "description": null,
      "metadata": {
        "__metadata_info__": {}
      },
      "from_node": {
        "$component_ref": "c8ce9d86-b069-4338-8372-09f32462ed39"
      },
      "from_branch": null,
      "to_node": {
        "$component_ref": "0e6801d2-803e-40bc-809d-9e3dc69e5720"
      }
    }
  ],
  "data_flow_connections": [
    {
      "component_type": "DataFlowEdge",
      "id": "a377edf1-bbac-49e0-bca0-2dd1638ba5ab",
      "name": "start_step_articles_to_map_step_iterated_input_data_flow_edge",
      "description": null,
      "metadata": {
        "__metadata_info__": {}
      },
      "source_node": {
        "$component_ref": "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405"
      },
      "source_output": "articles",
      "destination_node": {
        "$component_ref": "c8ce9d86-b069-4338-8372-09f32462ed39"
      },
      "destination_input": "iterated_input"
    },
    {
      "component_type": "DataFlowEdge",
      "id": "6aadb135-1e4a-4f46-a2f9-3e4181f77818",
      "name": "map_step_summary_to_None End node_summary_data_flow_edge",
      "description": null,
      "metadata": {
        "__metadata_info__": {}
      },
      "source_node": {
        "$component_ref": "c8ce9d86-b069-4338-8372-09f32462ed39"
      },
      "source_output": "summary",
      "destination_node": {
        "$component_ref": "0e6801d2-803e-40bc-809d-9e3dc69e5720"
      },
      "destination_input": "summary"
    }
  ],
  "$referenced_components": {
    "c8ce9d86-b069-4338-8372-09f32462ed39": {
      "component_type": "ExtendedMapNode",
      "id": "c8ce9d86-b069-4338-8372-09f32462ed39",
      "name": "map_step",
      "description": "",
      "metadata": {
        "__metadata_info__": {}
      },
      "inputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "iterated_input"
        }
      ],
      "outputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "summary"
        }
      ],
      "branches": [
        "next"
      ],
      "input_mapping": {},
      "output_mapping": {},
      "flow": {
        "component_type": "Flow",
        "id": "8f8b92e5-fc99-4dac-9a56-ff423ced6195",
        "name": "flow_191e481f",
        "description": "",
        "metadata": {
          "__metadata_info__": {}
        },
        "inputs": [
          {
            "type": "string",
            "title": "article"
          }
        ],
        "outputs": [
          {
            "description": "the generated text",
            "type": "string",
            "title": "summary"
          }
        ],
        "start_node": {
          "$component_ref": "3e35c34a-75fc-4b8f-9454-5cd2ff95a985"
        },
        "nodes": [
          {
            "$component_ref": "3e35c34a-75fc-4b8f-9454-5cd2ff95a985"
          },
          {
            "$component_ref": "48b07b6d-fa2d-4975-88f5-b2113821a426"
          },
          {
            "$component_ref": "76083393-ffeb-4a1a-b2e1-a1b163c8ebbb"
          }
        ],
        "control_flow_connections": [
          {
            "component_type": "ControlFlowEdge",
            "id": "7e971869-fdda-41d2-8b72-746a5f4103c8",
            "name": "start_step_to_summarize_step_control_flow_edge",
            "description": null,
            "metadata": {
              "__metadata_info__": {}
            },
            "from_node": {
              "$component_ref": "3e35c34a-75fc-4b8f-9454-5cd2ff95a985"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "48b07b6d-fa2d-4975-88f5-b2113821a426"
            }
          },
          {
            "component_type": "ControlFlowEdge",
            "id": "ac52880e-53fe-4418-a9a1-7e3248f76c4b",
            "name": "summarize_step_to_None End node_control_flow_edge",
            "description": null,
            "metadata": {
              "__metadata_info__": {}
            },
            "from_node": {
              "$component_ref": "48b07b6d-fa2d-4975-88f5-b2113821a426"
            },
            "from_branch": null,
            "to_node": {
              "$component_ref": "76083393-ffeb-4a1a-b2e1-a1b163c8ebbb"
            }
          }
        ],
        "data_flow_connections": [
          {
            "component_type": "DataFlowEdge",
            "id": "f21834ca-2e6a-469e-a385-cd1dd3bbe76e",
            "name": "start_step_article_to_summarize_step_article_data_flow_edge",
            "description": null,
            "metadata": {
              "__metadata_info__": {}
            },
            "source_node": {
              "$component_ref": "3e35c34a-75fc-4b8f-9454-5cd2ff95a985"
            },
            "source_output": "article",
            "destination_node": {
              "$component_ref": "48b07b6d-fa2d-4975-88f5-b2113821a426"
            },
            "destination_input": "article"
          },
          {
            "component_type": "DataFlowEdge",
            "id": "d8bf35b3-5595-4a1a-91fa-08b984cee4a9",
            "name": "summarize_step_summary_to_None End node_summary_data_flow_edge",
            "description": null,
            "metadata": {
              "__metadata_info__": {}
            },
            "source_node": {
              "$component_ref": "48b07b6d-fa2d-4975-88f5-b2113821a426"
            },
            "source_output": "summary",
            "destination_node": {
              "$component_ref": "76083393-ffeb-4a1a-b2e1-a1b163c8ebbb"
            },
            "destination_input": "summary"
          }
        ],
        "$referenced_components": {
          "48b07b6d-fa2d-4975-88f5-b2113821a426": {
            "component_type": "LlmNode",
            "id": "48b07b6d-fa2d-4975-88f5-b2113821a426",
            "name": "summarize_step",
            "description": "",
            "metadata": {
              "__metadata_info__": {}
            },
            "inputs": [
              {
                "description": "\"article\" input variable for the template",
                "type": "string",
                "title": "article"
              }
            ],
            "outputs": [
              {
                "description": "the generated text",
                "type": "string",
                "title": "summary"
              }
            ],
            "branches": [
              "next"
            ],
            "llm_config": {
              "component_type": "VllmConfig",
              "id": "0ef47939-f5ce-4267-a4f1-8c4f3492bad3",
              "name": "LLAMA_MODEL_ID",
              "description": null,
              "metadata": {
                "__metadata_info__": {}
              },
              "default_generation_parameters": null,
              "url": "LLAMA_API_URL",
              "model_id": "LLAMA_MODEL_ID"
            },
            "prompt_template": "Summarize this article in 10 words:\n {{article}}"
          },
          "3e35c34a-75fc-4b8f-9454-5cd2ff95a985": {
            "component_type": "StartNode",
            "id": "3e35c34a-75fc-4b8f-9454-5cd2ff95a985",
            "name": "start_step",
            "description": "",
            "metadata": {
              "__metadata_info__": {}
            },
            "inputs": [
              {
                "type": "string",
                "title": "article"
              }
            ],
            "outputs": [
              {
                "type": "string",
                "title": "article"
              }
            ],
            "branches": [
              "next"
            ]
          },
          "76083393-ffeb-4a1a-b2e1-a1b163c8ebbb": {
            "component_type": "EndNode",
            "id": "76083393-ffeb-4a1a-b2e1-a1b163c8ebbb",
            "name": "None End node",
            "description": null,
            "metadata": {
              "__metadata_info__": {}
            },
            "inputs": [
              {
                "description": "the generated text",
                "type": "string",
                "title": "summary"
              }
            ],
            "outputs": [
              {
                "description": "the generated text",
                "type": "string",
                "title": "summary"
              }
            ],
            "branches": [],
            "branch_name": "next"
          }
        }
      },
      "unpack_input": {
        "article": "."
      },
      "parallel_execution": true,
      "component_plugin_name": "NodesPlugin",
      "component_plugin_version": "25.4.0.dev0"
    },
    "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405": {
      "component_type": "StartNode",
      "id": "4e291d9e-ec4e-45b7-ade8-1ed5bf37a405",
      "name": "start_step",
      "description": "",
      "metadata": {
        "__metadata_info__": {}
      },
      "inputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "articles"
        }
      ],
      "outputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "articles"
        }
      ],
      "branches": [
        "next"
      ]
    },
    "0e6801d2-803e-40bc-809d-9e3dc69e5720": {
      "component_type": "EndNode",
      "id": "0e6801d2-803e-40bc-809d-9e3dc69e5720",
      "name": "None End node",
      "description": null,
      "metadata": {
        "__metadata_info__": {}
      },
      "inputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "summary"
        }
      ],
      "outputs": [
        {
          "type": "array",
          "items": {
            "type": "string"
          },
          "title": "summary"
        }
      ],
      "branches": [],
      "branch_name": "next"
    }
  },
  "agentspec_version": "25.4.1"
}

You can then load the configuration back to an assistant using the AgentSpecLoader.

from wayflowcore.agentspec import AgentSpecLoader

flow = AgentSpecLoader().load_json(serialized_flow)

Note

This guide uses the following extension/plugin Agent Spec components:

  • ExtendedMapNode

See the list of available Agent Spec extension/plugin components in the API Reference

Next steps#

Having learned how to perform map and reduce operations in WayFlow, you may now proceed to How to Use Agents in Flows.

Full code#

Click on the card at the top of this page to download the full code for this guide or copy the code below.

  1# Copyright © 2025 Oracle and/or its affiliates.
  2#
  3# This software is under the Universal Permissive License
  4# %%[markdown]
  5# WayFlow Code Example - How to Do Map and Reduce Operations in Flows
  6# -------------------------------------------------------------------
  7
  8# How to use:
  9# Create a new Python virtual environment and install the latest WayFlow version.
 10# ```bash
 11# python -m venv venv-wayflowcore
 12# source venv-wayflowcore/bin/activate
 13# pip install --upgrade pip
 14# pip install "wayflowcore==26.1" 
 15# ```
 16
 17# You can now run the script
 18# 1. As a Python file:
 19# ```bash
 20# python howto_mapstep.py
 21# ```
 22# 2. As a Notebook (in VSCode):
 23# When viewing the file,
 24#  - press the keys Ctrl + Enter to run the selected cell
 25#  - or Shift + Enter to run the selected cell and move to the cell below# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl) or Apache License
 26# 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0), at your option.
 27
 28
 29
 30# %%[markdown]
 31## Define the articles
 32
 33# %%
 34articles = [
 35    "Sea turtles are ancient reptiles that have been around for over 100 million years. They play crucial roles in marine ecosystems, such as maintaining healthy seagrass beds and coral reefs. Unfortunately, they are under threat due to poaching, habitat loss, and pollution. Conservation efforts worldwide aim to protect nesting sites and reduce bycatch in fishing gear.",
 36    "Dolphins are highly intelligent marine mammals known for their playfulness and curiosity. They live in social groups called pods, which can consist of hundreds of individuals depending on the species. Dolphins communicate using a variety of clicks, whistles, and other sounds. They face threats from habitat loss, marine pollution, and bycatch in fishing operations.",
 37    "Manatees, often referred to as 'sea cows', are gentle aquatic giants found in shallow coastal areas and rivers. These herbivorous mammals spend most of their time eating, resting, and traveling. They are particularly known for their slow movement and inability to survive in cold waters. Manatee populations are vulnerable to boat collisions, loss of warm-water habitats, and environmental pollutants.",
 38]
 39
 40# %%[markdown]
 41## Define the LLM
 42
 43# %%
 44from wayflowcore.models import VllmModel
 45
 46llm = VllmModel(
 47    model_id="LLAMA_MODEL_ID",
 48    host_port="LLAMA_API_URL",
 49)
 50
 51# %%[markdown]
 52## Create the Flow for the MapStep
 53
 54# %%
 55from wayflowcore.controlconnection import ControlFlowEdge
 56from wayflowcore.dataconnection import DataFlowEdge
 57from wayflowcore.flow import Flow
 58from wayflowcore.property import StringProperty
 59from wayflowcore.steps import PromptExecutionStep, StartStep
 60
 61start_step = StartStep(name="start_step", input_descriptors=[StringProperty("article")])
 62summarize_step = PromptExecutionStep(
 63    name="summarize_step",
 64    llm=llm,
 65    prompt_template="""Summarize this article in 10 words:
 66 {{article}}""",
 67    output_mapping={PromptExecutionStep.OUTPUT: "summary"},
 68)
 69summarize_flow = Flow(
 70    begin_step=start_step,
 71    control_flow_edges=[
 72        ControlFlowEdge(source_step=start_step, destination_step=summarize_step),
 73        ControlFlowEdge(source_step=summarize_step, destination_step=None),
 74    ],
 75    data_flow_edges=[
 76        DataFlowEdge(start_step, "article", summarize_step, "article"),
 77    ],
 78)
 79
 80# %%[markdown]
 81## Create the MapStep
 82
 83# %%
 84from wayflowcore.property import ListProperty, StringProperty
 85from wayflowcore.steps import MapStep
 86
 87map_step = MapStep(
 88    name="map_step",
 89    flow=summarize_flow,
 90    unpack_input={"article": "."},
 91    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
 92    input_descriptors=[ListProperty(MapStep.ITERATED_INPUT, item_type=StringProperty())],
 93)
 94
 95# %%[markdown]
 96## Create and execute the final Flow
 97
 98# %%
 99from wayflowcore.property import ListProperty, StringProperty
100from wayflowcore.steps import StartStep
101
102start_step = StartStep(
103    name="start_step",
104    input_descriptors=[ListProperty("articles", item_type=StringProperty())]
105)
106flow = Flow(
107    begin_step=start_step,
108    control_flow_edges=[
109        ControlFlowEdge(source_step=start_step, destination_step=map_step),
110        ControlFlowEdge(source_step=map_step, destination_step=None),
111    ],
112    data_flow_edges=[
113        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
114    ],
115)
116conversation = flow.start_conversation(inputs={"articles": articles})
117status = conversation.execute()
118print(status.output_values)
119
120# %%[markdown]
121## Iterate over a dictionary
122
123# %%
124from wayflowcore.property import DictProperty, ListProperty, StringProperty
125from wayflowcore.steps import StartStep
126
127articles_as_dict = {str(idx): article for idx, article in enumerate(articles)}
128
129map_step = MapStep(
130    name="map_step",
131    flow=summarize_flow,
132    unpack_input={"article": "._value"},
133    input_descriptors=[DictProperty(MapStep.ITERATED_INPUT, value_type=StringProperty())],
134    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
135)
136start_step = StartStep(
137    name="start_step",
138    input_descriptors=[DictProperty("articles", value_type=StringProperty())]
139)
140flow = Flow(
141    begin_step=start_step,
142    control_flow_edges=[
143        ControlFlowEdge(source_step=start_step, destination_step=map_step),
144        ControlFlowEdge(source_step=map_step, destination_step=None),
145    ],
146    data_flow_edges=[
147        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
148    ],
149)
150
151conversation = flow.start_conversation(inputs={"articles": articles_as_dict})
152status = conversation.execute()
153print(status.output_values)
154
155# %%[markdown]
156## Parallel execution of map reduce operation
157
158# %%
159from wayflowcore.property import ListProperty, StringProperty
160from wayflowcore.steps import StartStep
161
162start_step = StartStep(input_descriptors=[ListProperty("articles", item_type=StringProperty())])
163map_step = MapStep(
164    flow=summarize_flow,
165    unpack_input={"article": "."},
166    output_descriptors=[ListProperty(name="summary", item_type=StringProperty())],
167    input_descriptors=[ListProperty(MapStep.ITERATED_INPUT, item_type=StringProperty())],
168    parallel_execution=True,
169)
170map_step_name = "map_step"
171flow = Flow(
172    begin_step_name="start_step",
173    steps={
174        "start_step": start_step,
175        map_step_name: map_step,
176    },
177    control_flow_edges=[
178        ControlFlowEdge(source_step=start_step, destination_step=map_step),
179        ControlFlowEdge(source_step=map_step, destination_step=None),
180    ],
181    data_flow_edges=[
182        DataFlowEdge(start_step, "articles", map_step, MapStep.ITERATED_INPUT),
183    ],
184)
185conversation = flow.start_conversation(inputs={"articles": articles})
186status = conversation.execute()
187print(status.output_values)
188
189# %%[markdown]
190## Export config to Agent Spec
191
192# %%
193from wayflowcore.agentspec import AgentSpecExporter
194
195serialized_flow = AgentSpecExporter().to_json(flow)
196
197# %%[markdown]
198## Load Agent Spec config
199
200# %%
201from wayflowcore.agentspec import AgentSpecLoader
202
203flow = AgentSpecLoader().load_json(serialized_flow)