Flows

This page presents all APIs and classes related to Flows and nodes in PyAgentSpec.

Flows & Nodes

class pyagentspec.flows.flow.Flow(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, start_node, nodes, control_flow_connections, data_flow_connections=None)#

Bases: AgenticComponent

A flow is a component that models a sequence of operations to perform in a precise order.

The operations and their order are defined by the nodes and transitions associated with the flow. Steps can be deterministic, and some of them can use LLMs.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • start_node (Node) – The starting node of this flow. It must also be part of the nodes list

  • nodes (List[Node]) – The list of nodes that compose this Flow

  • control_flow_connections (List[ControlFlowEdge]) – The list of edges that define the control flow of this Flow

  • data_flow_connections (List[DataFlowEdge] | None) – The list of edges that define the data flow of this Flow
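
The constraint that start_node must also appear in nodes can be illustrated with a minimal sketch. The classes SketchNode and SketchFlow below are hypothetical stand-ins written with the standard library only. They are not the PyAgentSpec classes, and whether and how the library enforces this rule is an assumption here.

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical stand-ins for illustration only; NOT the pyagentspec classes.
@dataclass(frozen=True)
class SketchNode:
    name: str


@dataclass
class SketchFlow:
    name: str
    start_node: SketchNode
    nodes: List[SketchNode] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Mirror the documented constraint: start_node must be part of nodes.
        if self.start_node not in self.nodes:
            raise ValueError(
                f"start_node '{self.start_node.name}' is not in the nodes list"
            )


start = SketchNode("start")
end = SketchNode("end")

# A flow whose start node is listed among its nodes is accepted.
valid_flow = SketchFlow(name="ok", start_node=start, nodes=[start, end])

# A flow whose start node is missing from the nodes list is rejected.
try:
    SketchFlow(name="broken", start_node=start, nodes=[end])
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

In this sketch the check runs at construction time, so a misconfigured flow fails early rather than at execution.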

Example

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> prompt_property = Property(
...     json_schema={"title": "prompt", "type": "string"}
... )
>>> llm_output_property = Property(
...     json_schema={"title": "llm_output", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[prompt_property])
>>> end_node = EndNode(name="end", outputs=[llm_output_property])
>>> # llm_config is assumed to be an LLM configuration object defined beforehand
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="{{prompt}}",
...     inputs=[prompt_property],
...     outputs=[llm_output_property],
... )
>>> flow = Flow(
...     name="Simple prompting flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="prompt_edge",
...             source_node=start_node,
...             source_output="prompt",
...             destination_node=llm_node,
...             destination_input="prompt",
...         ),
...         DataFlowEdge(
...             name="llm_output_edge",
...             source_node=llm_node,
...             source_output="llm_output",
...             destination_node=end_node,
...             destination_input="llm_output"
...         ),
...     ],
... )
class pyagentspec.flows.node.Node(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>)#

Bases: ComponentWithIO

Base class for all nodes that can be put inside a flow.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

DEFAULT_NEXT_BRANCH: ClassVar[str] = 'next'#

Name used for the default branch

branches: List[str]#

The list of outgoing branch names that the node will expose

class pyagentspec.flows.edges.dataflowedge.DataFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, source_node, source_output, destination_node, destination_input)#

Bases: Component

A data flow edge specifies how the output of a node propagates as input of another node.

A single output can be propagated as input to several nodes.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • source_node (Node) – The instance of the source Node

  • source_output (str) – The name of the property among the source node outputs that should be connected

  • destination_node (Node) – The instance of the destination Node

  • destination_input (str) – The name of the property among the destination node inputs that should be connected

destination_input: str#

The name of the property among the destination node inputs that should be connected

destination_node: Node#

The instance of the destination Node

source_node: Node#

The instance of the source Node

source_output: str#

The name of the property among the source node outputs that should be connected
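
The fan-out behaviour described for data flow edges (one output feeding several inputs) can be sketched with plain data structures. SketchDataEdge and propagate are hypothetical names used only for illustration. The field names mirror the documented attributes, but this is not how the library or any runtime implements data flow.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class SketchDataEdge(NamedTuple):
    # Hypothetical plain-data stand-in for a data flow edge (not the pyagentspec class).
    source_node: str
    source_output: str
    destination_node: str
    destination_input: str


edges = [
    # The same output "prompt" of "start" is connected to two different nodes.
    SketchDataEdge("start", "prompt", "llm_a", "prompt"),
    SketchDataEdge("start", "prompt", "llm_b", "question"),
    SketchDataEdge("llm_a", "answer", "end", "answer"),
]

# Group destinations by (source node, output name) to make the fan-out explicit.
fan_out: Dict[Tuple[str, str], List[Tuple[str, str]]] = defaultdict(list)
for edge in edges:
    fan_out[(edge.source_node, edge.source_output)].append(
        (edge.destination_node, edge.destination_input)
    )


def propagate(node: str, outputs: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Return the inputs each destination node receives from the outputs of `node`."""
    delivered: Dict[str, Dict[str, str]] = defaultdict(dict)
    for output_name, value in outputs.items():
        for dest_node, dest_input in fan_out.get((node, output_name), []):
            delivered[dest_node][dest_input] = value
    return dict(delivered)


result = propagate("start", {"prompt": "Hello"})
```

Note that the destination input name may differ from the source output name, which is why an edge carries both.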

class pyagentspec.flows.edges.controlflowedge.ControlFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, from_node, from_branch=None, to_node)#

Bases: Component

A control flow edge specifies a possible transition from a node to another in a flow.

A single node can have several potential next nodes, in which case several control flow edges should be present in the control flow connections of that flow.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • from_node (Node) – The instance of the source Node

  • from_branch (str | None) – The name of the branch to connect. It must be among the list of branches offered by the from_node

  • to_node (Node) – The instance of the destination Node

from_branch: str | None#

The name of the branch to connect. It must be among the list of branches offered by the from_node

from_node: Node#

The instance of the source Node

to_node: Node#

The instance of the destination Node
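
To illustrate how several control flow edges leaving one node can be told apart by branch name, here is a small standard-library sketch. SketchControlEdge and next_node are hypothetical names. Treating from_branch=None as the default branch 'next' is an assumption of this sketch, not something this page states.

```python
from typing import List, NamedTuple, Optional

DEFAULT_NEXT_BRANCH = "next"  # value of Node.DEFAULT_NEXT_BRANCH documented above


class SketchControlEdge(NamedTuple):
    # Hypothetical plain-data stand-in, not the pyagentspec ControlFlowEdge.
    from_node: str
    to_node: str
    from_branch: Optional[str] = None


def next_node(edges: List[SketchControlEdge], current: str, taken_branch: str) -> str:
    """Return the node reached from `current` when `taken_branch` is selected."""
    for edge in edges:
        # Assumption: an edge with from_branch=None leaves through the default branch.
        branch = DEFAULT_NEXT_BRANCH if edge.from_branch is None else edge.from_branch
        if edge.from_node == current and branch == taken_branch:
            return edge.to_node
    raise LookupError(f"no edge leaves '{current}' through branch '{taken_branch}'")


edges = [
    SketchControlEdge("start", "password check"),
    # Two edges leave the same node, one per branch.
    SketchControlEdge("password check", "access granted end", "PASSWORD_OK"),
    SketchControlEdge("password check", "access denied end", "default"),
]

after_start = next_node(edges, "start", DEFAULT_NEXT_BRANCH)
after_ok = next_node(edges, "password check", "PASSWORD_OK")
after_fail = next_node(edges, "password check", "default")
```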

Nodes

class pyagentspec.flows.nodes.agentnode.AgentNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, agent)#

Bases: Node

The agent execution node is a node that will execute an agent as part of a flow.

If branches are configured, the agent is prompted to select a branch before the agent node completes and the flow transitions to another node.

  • Inputs

    Inferred from the definition of the agent to execute.

  • Outputs

    Inferred from the definition of the agent to execute.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.agent import Agent
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.nodes import AgentNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> query_property = Property(json_schema={"title": "query", "type": "string"})
>>> search_results_property = Property(
...     json_schema={"title": "search_results", "type": "array", "items": {"type": "string"}}
... )
>>> search_tool = ServerTool(
...     name="search_tool",
...     description=(
...        "This tool runs a web search with the given query "
...        "and returns the most relevant results"
...     ),
...     inputs=[query_property],
...     outputs=[search_results_property],
... )
>>> # llm_config is assumed to be an LLM configuration object defined beforehand
>>> agent = Agent(
...     name="Search agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather the required information for the user: {{query}}"
...     ),
...     tools=[search_tool],
...     outputs=[search_results_property],
... )
>>> start_node = StartNode(name="start", inputs=[query_property])
>>> end_node = EndNode(name="end", outputs=[search_results_property])
>>> agent_node = AgentNode(
...     name="Search agent node",
...     agent=agent,
... )
>>> flow = Flow(
...     name="Search agent flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_end", from_node=agent_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="query",
...             destination_node=agent_node,
...             destination_input="query",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=agent_node,
...             source_output="search_results",
...             destination_node=end_node,
...             destination_input="search_results"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • agent (AgenticComponent) – The agentic component that will be called as part of the execution of this node

agent: AgenticComponent#

The agentic component that will be called as part of the execution of this node

class pyagentspec.flows.nodes.apinode.ApiNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, url, http_method, api_spec_uri=None, data=<factory>, query_params=<factory>, headers=<factory>)#

Bases: Node

Make an API call.

This node is intended to be a part of a Flow.

  • Inputs

    Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users have to manually specify them.

  • Outputs

    Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users have to manually specify them.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.nodes import ApiNode
>>> from pyagentspec.property import Property
>>> weather_result_property = Property(
...     json_schema={
...         "title": "zurich_weather",
...         "type": "object",
...         "properties": {
...             "temperature": {
...                 "type": "number",
...                 "description": "Temperature in celsius degrees",
...             },
...             "weather": {"type": "string"}
...         },
...     }
... )
>>> call_current_weather_step = ApiNode(
...     name="Weather API call node",
...     url="https://example.com/weather",
...     http_method = "GET",
...     query_params={
...         "location": "zurich",
...     },
...     outputs=[weather_result_property]
... )
>>>
>>> item_id_property = Property(
...     json_schema={"title": "item_id", "type": "string"}
... )
>>> order_id_property = Property(
...     json_schema={"title": "order_id", "type": "string"}
... )
>>> store_id_property = Property(
...     json_schema={"title": "store_id", "type": "string"}
... )
>>> session_id_property = Property(
...     json_schema={"title": "session_id", "type": "string"}
... )
>>> create_order_step = ApiNode(
...     name="Orders api call node",
...     url="https://example.com/orders/{{ order_id }}",
...     http_method="POST",
...     # sending an object which will automatically be transformed into JSON
...     data={
...         # define a static body parameter
...         "topic_id": 12345,
...         # define a templated body parameter.
...         # The value for {{ item_id }} will be taken from the IO system at runtime
...         "item_id": "{{ item_id }}",
...     },
...     query_params={
...         # provide one templated query parameter called "store_id"
...         # which will take its value from the IO system from key "store_id"
...         "store_id": "{{ store_id }}",
...     },
...     headers={
...         # set header session_id. the value is coming from the IO system
...         "session_id": "{{ session_id }}",
...     },
...     inputs=[item_id_property, order_id_property, store_id_property, session_id_property],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • url (str) – The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs

  • http_method (str) – The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs

  • api_spec_uri (str | None) – The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs

  • data (Dict[str, Any]) – The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs

  • query_params (Dict[str, Any]) – Query parameters for the API call. Allows placeholders in dict values, which can define inputs

  • headers (Dict[str, Any]) – Additional headers for the API call. Allows placeholders in dict values, which can define inputs

DEFAULT_OUTPUT: ClassVar[str] = 'response'#

Name of the default output of this node.

Type:

str

api_spec_uri: str | None#

The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs

data: Dict[str, Any]#

The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs

headers: Dict[str, Any]#

Additional headers for the API call. Allows placeholders in dict values, which can define inputs

http_method: str#

The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs

query_params: Dict[str, Any]#

Query parameters for the API call. Allows placeholders in dict values, which can define inputs

url: str#

The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs
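
The statement that placeholders in url, data, query_params and headers "can define inputs" can be made concrete with a rough sketch. It uses a simple regular expression for the {{ name }} syntax, which is a simplification of real template syntax. find_placeholders and render are hypothetical helpers, not PyAgentSpec functions, and the actual inference and rendering logic may differ.

```python
import re
from typing import Any, Dict, List

# Matches {{ name }} with optional surrounding whitespace (simplified template syntax).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def find_placeholders(value: Any) -> List[str]:
    """Collect placeholder names from a string or from the values of a dict."""
    if isinstance(value, str):
        return PLACEHOLDER.findall(value)
    if isinstance(value, dict):
        names: List[str] = []
        for item in value.values():
            names.extend(find_placeholders(item))
        return names
    return []  # static values such as numbers carry no placeholders


def render(template: str, inputs: Dict[str, Any]) -> str:
    """Replace every placeholder in `template` with the matching input value."""
    return PLACEHOLDER.sub(lambda match: str(inputs[match.group(1)]), template)


# Same values as the "Orders api call node" example above.
url = "https://example.com/orders/{{ order_id }}"
data = {"topic_id": 12345, "item_id": "{{ item_id }}"}
query_params = {"store_id": "{{ store_id }}"}
headers = {"session_id": "{{ session_id }}"}

input_names = sorted(
    {name for part in (url, data, query_params, headers) for name in find_placeholders(part)}
)
rendered_url = render(url, {"order_id": "A-17"})
```

The four names found this way match the four input properties passed explicitly in the example above.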

class pyagentspec.flows.nodes.branchingnode.BranchingNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, mapping)#

Bases: Node

Select the next node to transition to based on a mapping.

The input is used as key for the mapping. If the input does not correspond to any of the keys of the mapping the branch ‘default’ will be selected. This node is intended to be a part of a Flow.

  • Inputs

    The input value that should be used as key for the mapping.

  • Outputs

    None.

  • Branches

    One for each value in the mapping, plus a branch called default, which is the branch taken by the flow when mapping fails (i.e., the input does not match any key in the mapping).

Examples

>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> CORRECT_PASSWORD_BRANCH = "PASSWORD_OK"
>>> password_property = Property(
...     json_schema={"title": "password", "type": "string"}
... )
>>> # llm_config is assumed to be an LLM configuration object defined beforehand
>>> agent = Agent(
...     name="User input agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to ask the user for the password. "
...         "Once you get it, submit it and end."
...     ),
...     outputs=[password_property],
... )
>>> start_node = StartNode(name="start")
>>> access_granted_end_node = EndNode(
...     name="access granted end", branch_name="ACCESS_GRANTED"
... )
>>> access_denied_end_node = EndNode(
...     name="access denied end", branch_name="ACCESS_DENIED"
... )
>>> branching_node = BranchingNode(
...     name="password check",
...     mapping={"123456": CORRECT_PASSWORD_BRANCH},
...     inputs=[password_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Check access flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         access_granted_end_node,
...         access_denied_end_node
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent",
...             from_node=start_node,
...             to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching",
...             from_node=agent_node,
...             to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_granted",
...             from_node=branching_node,
...             from_branch=CORRECT_PASSWORD_BRANCH,
...             to_node=access_granted_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_denied",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=access_denied_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="password_edge",
...             source_node=agent_node,
...             source_output="password",
...             destination_node=branching_node,
...             destination_input="password",
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • mapping (Dict[str, str]) – The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given

DEFAULT_BRANCH: ClassVar[str] = 'default'#

Name of the default branch used when mapping fails

DEFAULT_INPUT: ClassVar[str] = 'branching_mapping_key'#

Input key for the name to transition to next.

Type:

str

mapping: Dict[str, str]#

The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given
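
The branch selection rule described above (look up the input in the mapping, fall back to the default branch) reduces to a dictionary lookup. The sketch below uses hypothetical helper names select_branch and exposed_branches. It illustrates the documented behaviour and is not the library's implementation.

```python
from typing import Dict, List

DEFAULT_BRANCH = "default"  # value of BranchingNode.DEFAULT_BRANCH documented above


def select_branch(mapping: Dict[str, str], input_value: str) -> str:
    """Return the branch mapped to `input_value`, or the default branch on a miss."""
    return mapping.get(input_value, DEFAULT_BRANCH)


def exposed_branches(mapping: Dict[str, str]) -> List[str]:
    """List one branch per distinct mapping value, plus the default branch."""
    branches = list(dict.fromkeys(mapping.values()))  # de-duplicate, keep order
    if DEFAULT_BRANCH not in branches:
        branches.append(DEFAULT_BRANCH)
    return branches


mapping = {"123456": "PASSWORD_OK"}

granted = select_branch(mapping, "123456")
denied = select_branch(mapping, "wrong password")
branches = exposed_branches(mapping)
```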

class pyagentspec.flows.nodes.endnode.EndNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, branch_name='next')#

Bases: Node

End nodes denote the end of the execution of a flow.

There might be several end nodes in a flow, in which case the executor of the flow should be able to track which one was reached and pass that back to the caller.

  • Inputs

    The list of inputs of the step. If both input and output properties are specified they must be an exact match

  • Outputs

    The list of outputs that should be exposed by the flow. If both input and output properties are specified they must be an exact match

  • Branches

    None.

Examples

>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> languages_to_branch_name = {
...     "english": "ENGLISH",
...     "spanish": "SPANISH",
...     "italian": "ITALIAN",
... }
>>> language_property = Property(
...     json_schema={"title": "language", "type": "string"}
... )
>>> # llm_config is assumed to be an LLM configuration object defined beforehand
>>> agent = Agent(
...     name="Language detector agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to understand the language spoken by the user. "
...         "Please output only the language in lowercase and submit."
...     ),
...     outputs=[language_property],
... )
>>> start_node = StartNode(name="start")
>>> english_end_node = EndNode(
...     name="english end", branch_name=languages_to_branch_name["english"]
... )
>>> spanish_end_node = EndNode(
...     name="spanish end", branch_name=languages_to_branch_name["spanish"]
... )
>>> italian_end_node = EndNode(
...     name="italian end", branch_name=languages_to_branch_name["italian"]
... )
>>> unknown_end_node = EndNode(name="unknown language end", branch_name="unknown")
>>> branching_node = BranchingNode(
...     name="language check",
...     mapping=languages_to_branch_name,
...     inputs=[language_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Language detection flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         english_end_node,
...         spanish_end_node,
...         italian_end_node,
...         unknown_end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent", from_node=start_node, to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching", from_node=agent_node, to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_english_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["english"],
...             to_node=english_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_spanish_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["spanish"],
...             to_node=spanish_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_italian_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["italian"],
...             to_node=italian_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_unknown_end",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=unknown_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="language_edge",
...             source_node=agent_node,
...             source_output="language",
...             destination_node=branching_node,
...             destination_input="language",
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • branch_name (str) – The name of the branch that corresponds to the branch that gets closed by this node, which will be exposed by the Flow

branch_name: str#

The name of the branch that corresponds to the branch that gets closed by this node, which will be exposed by the Flow

class pyagentspec.flows.nodes.flownode.FlowNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, subflow)#

Bases: Node

The flow node executes a subflow as part of a flow.

  • Inputs

    Inferred from the inner structure. It is the set of inputs required by the StartNode of the inner flow.

  • Outputs

    Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow.

  • Branches

    Inferred from the inner flow, one for each distinct value of the attribute branch_name of the nodes of type EndNode in the inner flow.
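
The inference rules above for outputs and branches can be sketched with plain data. SketchEndNode, infer_branches and infer_outputs are hypothetical names, and outputs are reduced to their titles for brevity. This is an illustration of the described rules under those assumptions, not the library's code.

```python
from typing import List, NamedTuple, Tuple


class SketchEndNode(NamedTuple):
    # Hypothetical stand-in for an EndNode; outputs are represented by title only.
    name: str
    outputs: Tuple[str, ...] = ()
    branch_name: str = "next"  # default shown in the EndNode signature


def infer_branches(end_nodes: List[SketchEndNode]) -> List[str]:
    """One branch per distinct branch_name among the end nodes, in first-seen order."""
    return list(dict.fromkeys(node.branch_name for node in end_nodes))


def infer_outputs(end_nodes: List[SketchEndNode]) -> List[str]:
    """Union of the output titles exposed by the end nodes, in first-seen order."""
    return list(dict.fromkeys(title for node in end_nodes for title in node.outputs))


inner_end_nodes = [
    SketchEndNode("access granted end", ("token",), "ACCESS_GRANTED"),
    SketchEndNode("access denied end", ("reason",), "ACCESS_DENIED"),
    # A second end node closing the same branch adds no new branch.
    SketchEndNode("locked out end", ("reason", "attempts"), "ACCESS_DENIED"),
]

branches = infer_branches(inner_end_nodes)
outputs = infer_outputs(inner_end_nodes)
```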

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflow (Flow) – The flow that should be executed

Example

The FlowNode is particularly suitable when subflows can be reused inside a project. Let’s see an example with a flow that estimates a numerical value using the “wisdom of the crowd” effect:

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import MapNode, LlmNode, ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> duplication_tool = ServerTool(
...     name="duplication_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "element", "description": "", "type": "string"}
...         ),
...         Property(
...             json_schema={"title": "n", "description": "", "type": "integer"}
...         ),
...     ],
...     outputs=[
...         Property(
...             json_schema={
...                 "title": "flow_iterable_queries",
...                 "type": "array",
...                 "items": {"type": "string"}
...             },
...         )
...     ],
... )
>>> reduce_tool = ServerTool(
...     name="reduce_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "elements", "type": "array", "items": {"type": "string"}}
...         ),
...     ],
...     outputs=[Property(json_schema={"title": "flow_processed_query", "type": "string"})],
... )
>>> # Defining a simple prompt
>>> REASONING_PROMPT_TEMPLATE = '''Provide your best numerical estimate for: {{user_input}}
... Your answer should be a single number.
... Do not include any units, reasoning, or extra text.'''
>>> # Defining the subflow for the map step
>>> user_input_property = Property(
...     json_schema={"title": "user_input", "type": "string"}
... )
>>> flow_processed_query_property = Property(
...     json_schema={"title": "flow_processed_query", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[user_input_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> # llm_config is assumed to be an LLM configuration object defined beforehand
>>> llm_node = LlmNode(
...     name="reasoning llm node",
...     llm_config=llm_config,
...     prompt_template=REASONING_PROMPT_TEMPLATE,
...     inputs=[user_input_property],
...     outputs=[flow_processed_query_property],
... )
>>> inner_map_flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=llm_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
>>> user_query_property = Property(
...     json_schema={"title": "user_query", "type": "string"}
... )
>>> n_repeat_property = Property(
...     json_schema={"title": "n_repeat", "type": "integer"}
... )
>>> flow_iterable_queries_property = Property(
...     json_schema={
...         "title": "iterated_user_input",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> flow_processed_queries_property = Property(
...     json_schema={
...         "title": "collected_flow_processed_query",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> start_node = StartNode(name="start", inputs=[user_query_property, n_repeat_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> duplication_node = ToolNode(
...     name="duplication_tool node",
...     tool=duplication_tool,
... )
>>> reduce_node = ToolNode(
...     name="reduce_tool node",
...     tool=reduce_tool,
... )
>>> map_node = MapNode(
...     name="map node",
...     subflow=inner_map_flow,
...     inputs=[flow_iterable_queries_property],
...     outputs=[flow_processed_queries_property],
... )
>>> mapreduce_flow = Flow(
...     name="Map-reduce flow",
...     start_node=start_node,
...     nodes=[start_node, duplication_node, map_node, reduce_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_duplication", from_node=start_node, to_node=duplication_node
...         ),
...         ControlFlowEdge(
...             name="duplication_to_map", from_node=duplication_node, to_node=map_node
...         ),
...         ControlFlowEdge(name="map_to_reduce", from_node=map_node, to_node=reduce_node),
...         ControlFlowEdge(name="reduce_to_end", from_node=reduce_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_query",
...             destination_node=duplication_node,
...             destination_input="element",
...         ),
...         DataFlowEdge(
...             name="n_repeat_edge",
...             source_node=start_node,
...             source_output="n_repeat",
...             destination_node=duplication_node,
...             destination_input="n",
...         ),
...         DataFlowEdge(
...             name="flow_iterables_edge",
...             source_node=duplication_node,
...             source_output="flow_iterable_queries",
...             destination_node=map_node,
...             destination_input="iterated_user_input",
...         ),
...         DataFlowEdge(
...             name="flow_processed_queries_edge",
...             source_node=map_node,
...             source_output="collected_flow_processed_query",
...             destination_node=reduce_node,
...             destination_input="elements",
...         ),
...         DataFlowEdge(
...             name="flow_processed_query_edge",
...             source_node=reduce_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query",
...         ),
...     ],
... )

Once the subflow is created, we can integrate it into another flow with a FlowNode:

>>> from pyagentspec.flows.nodes import FlowNode, AgentNode
>>> from pyagentspec.agent import Agent
>>> start_node = StartNode(name="start")
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> flow_node = FlowNode(name="flow node", subflow=mapreduce_flow)
>>> agent = Agent(
...     name="User interaction agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather from the user the query and the number of times "
...         "it should be asked to an LLM. Once you have this information, submit and exit."
...     ),
...     outputs=[user_query_property, n_repeat_property],
... )
>>> agent_node = AgentNode(name="agent node", agent=agent)
>>> flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, flow_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_flow", from_node=agent_node, to_node=flow_node),
...         ControlFlowEdge(name="flow_to_end", from_node=flow_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=agent_node,
...             source_output="user_query",
...             destination_node=flow_node,
...             destination_input="user_query",
...         ),
...         DataFlowEdge(
...             name="n_rep_edge",
...             source_node=agent_node,
...             source_output="n_repeat",
...             destination_node=flow_node,
...             destination_input="n_repeat"
...         ),
...         DataFlowEdge(
...             name="flow_processed_query_edge",
...             source_node=flow_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
subflow: Flow#

The flow that should be executed

class pyagentspec.flows.nodes.inputmessagenode.InputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, message=None)#

Bases: Node

This node interrupts the execution of the flow to wait for user input, and resumes after receiving it. An agent message, if given, is appended to the conversation before waiting for input. The user input is appended to the conversation as a user message and is returned from the node as a string property.

  • Inputs

    One per variable in the message.

  • Outputs

    One string property that represents the content of the input user message.

  • Branches

    One, the default next.
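
The interrupt-and-resume behaviour described above can be pictured with a plain Python generator. This is only a conceptual sketch and not how PyAgentSpec or any runtime implements the node: the function `input_message_step`, the `(role, content)` tuple format, and the role names are all hypothetical.

```python
from typing import Generator, List, Optional, Tuple

Message = Tuple[str, str]  # hypothetical (role, content) representation


def input_message_step(
    conversation: List[Message], message: Optional[str] = None
) -> Generator[None, str, str]:
    """Hypothetical model of the node: post the agent message, pause, record the reply."""
    if message is not None:
        conversation.append(("agent", message))
    user_text = yield  # execution is suspended here until the caller sends the user input
    conversation.append(("user", user_text))
    return user_text  # plays the role of the node's single string output


conversation: List[Message] = []
step = input_message_step(conversation, message="What should I rephrase?")
next(step)  # run until the node starts waiting for input
try:
    step.send("Please rephrase this sentence.")  # resume with the user input
except StopIteration as finished:
    user_input = finished.value
```

After the generator finishes, `conversation` holds the agent message followed by the user message, and `user_input` holds the text that would be exposed as the node output.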

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="rephrased_output", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • message (str | None) – The agent message to append to the conversation before waiting for user input

DEFAULT_OUTPUT: ClassVar[str] = 'user_input'#
message: str | None#

The agent message to append to the conversation before waiting for user input

class pyagentspec.flows.nodes.llmnode.LlmNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, llm_config, prompt_template)#

Bases: Node

Execute a prompt template with a given LLM.

This node is intended to be a part of a Flow.

  • Inputs

    One per placeholder in the prompt template.

  • Outputs

    The output text generated by the LLM.

  • Branches

    One, the default next.
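
To make "one input per placeholder" concrete, the sketch below extracts placeholder names from a template with a regular expression. It assumes the `{{ name }}` syntax seen in the examples on this page, with optional whitespace inside the braces; the helper `placeholder_names` is hypothetical and is not part of PyAgentSpec, whose actual template parsing may differ.

```python
import re
from typing import List

# Assumed placeholder syntax: {{ name }} with optional surrounding whitespace.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def placeholder_names(template: str) -> List[str]:
    """Return the distinct placeholder names in order of first appearance."""
    names: List[str] = []
    for name in PLACEHOLDER.findall(template):
        if name not in names:
            names.append(name)
    return names


inferred_inputs = placeholder_names("What is the capital of {{ country }}?")
```

Here `inferred_inputs` contains the single name `country`, which matches the one input declared on the LlmNode in the example further down.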

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • llm_config (LlmConfig) – The LLM to use for generation in this node.

  • prompt_template (str) – Defines the prompt sent to the model. Allows placeholders, which can define inputs.

Example

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> country_property = Property(
...     json_schema={"title": "country", "type": "string"}
... )
>>> capital_property = Property(
...     json_schema={"title": "capital", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[country_property])
>>> end_node = EndNode(name="end", outputs=[capital_property])
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="What is the capital of {{ country }}?",
...     inputs=[country_property],
...     outputs=[capital_property],
... )
>>> flow = Flow(
...     name="Get the country's capital flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="country_edge",
...             source_node=start_node,
...             source_output="country",
...             destination_node=llm_node,
...             destination_input="country",
...         ),
...         DataFlowEdge(
...             name="capital_edge",
...             source_node=llm_node,
...             source_output="capital",
...             destination_node=end_node,
...             destination_input="capital"
...         ),
...     ],
... )
DEFAULT_OUTPUT: ClassVar[str] = 'generated_text'#

Raw text generated by the LLM.

Type:

str

llm_config: LlmConfig#

The LLM to use for generation in this node.

prompt_template: str#

Defines the prompt sent to the model. Allows placeholders, which can define inputs.

class pyagentspec.flows.nodes.mapnode.MapNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, subflow, reducers=None)#

Bases: Node

The map node executes a subflow on each element of a given input as part of a flow.

  • Inputs

    Inferred from the inner structure. It’s the set of inputs required by the StartNode of the inner flow. The names of the inputs are those of the inner flow, prefixed with iterated_. Their type is Union[inner_type, List[inner_type]], where inner_type is the type of the respective input in the inner flow.

  • Outputs

    Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow, combined with the reducer method of each output. The names of the outputs are those of the inner flow, prefixed with collected_. Their type depends on the reduce method specified for that output:

    • A list of the respective output type, in case of append

    • The same type as the respective output, in case of sum or avg

  • Branches

    One, the default next.
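
The naming and typing rules above can be sketched on plain JSON-schema dictionaries. The helpers `iterated_input_schema` and `collected_output_schema` are hypothetical and only illustrate the rules; expressing the Union as `anyOf` and treating `append` as the default reducer are assumptions, not statements about the library.

```python
from typing import Any, Dict

Schema = Dict[str, Any]


def _without_title(schema: Schema) -> Schema:
    """Copy a schema without its title, leaving only the type information."""
    return {key: value for key, value in schema.items() if key != "title"}


def iterated_input_schema(inner: Schema) -> Schema:
    """Input of the map node: either a single value or a list of values."""
    item = _without_title(inner)
    return {
        "title": f"iterated_{inner['title']}",
        "anyOf": [item, {"type": "array", "items": item}],
    }


def collected_output_schema(inner: Schema, reducer: str = "append") -> Schema:
    """Output of the map node: a list for append, the inner type for sum and avg."""
    item = _without_title(inner)
    title = f"collected_{inner['title']}"
    if reducer == "append":
        return {"title": title, "type": "array", "items": item}
    if reducer in ("sum", "avg"):
        return {"title": title, **item}
    raise ValueError(f"Unsupported reducer in this sketch: {reducer}")


x_input = iterated_input_schema({"title": "x", "type": "number"})
x_square_output = collected_output_schema({"title": "x_square", "type": "number"})
```

With these inputs, the resulting titles are `iterated_x` and `collected_x_square`, the same names used by the data flow edges in the example below.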

Examples

In this example we will create a flow that returns an L2-normalized version of a given list of numbers.

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, StartNode, MapNode, ToolNode
>>> from pyagentspec.tools import ServerTool

First, we define a MapNode that returns the square of all the elements in a list. It will be used to compute the L2-norm.

>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_property = Property(
...     json_schema={"title": "x_square", "type": "number"}
... )
>>> square_tool = ServerTool(
...     name="compute_square_tool",
...     description="Computes the square of a number",
...     inputs=[x_property],
...     outputs=[x_square_property],
... )
>>> list_of_x_property = Property(
...     json_schema={"title": "x_list", "type": "array", "items": {"type": "number"}}
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_property])
>>> square_tool_node = ToolNode(name="square tool node", tool=square_tool)
>>> square_number_flow = Flow(
...     name="Square number flow",
...     start_node=start_node,
...     nodes=[start_node, square_tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_tool", from_node=start_node, to_node=square_tool_node
...         ),
...         ControlFlowEdge(
...             name="tool_to_end", from_node=square_tool_node, to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=square_tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_edge",
...             source_node=square_tool_node,
...             source_output="x_square",
...             destination_node=end_node,
...             destination_input="x_square",
...         ),
...     ],
... )
>>> list_of_x_square_property = Property(
...     json_schema={"title": "x_square_list", "type": "array", "items": {"type": "number"}}
... )
>>> square_numbers_map_node = MapNode(
...     name="square number map node",
...     subflow=square_number_flow,
... )

Now we define the MapNode responsible for normalizing the given list of input numbers. Since the denominator is the same for all of the numbers, we map only over the numerators (i.e., the input numbers).

>>> numerator_property = Property(
...     json_schema={"title": "numerator", "type": "number"}
... )
>>> denominator_property = Property(
...     json_schema={"title": "denominator", "type": "number"}
... )
>>> result_property = Property(
...     json_schema={"title": "result", "type": "number"}
... )
>>> division_tool = ServerTool(
...     name="division_tool",
...     description="Computes the ratio between two numbers",
...     inputs=[numerator_property, denominator_property],
...     outputs=[result_property],
... )
>>> start_node = StartNode(name="start", inputs=[numerator_property, denominator_property])
>>> end_node = EndNode(name="end", outputs=[result_property])
>>> divide_node = ToolNode(name="divide node", tool=division_tool)
>>> normalize_flow = Flow(
...     name="Normalize flow",
...     start_node=start_node,
...     nodes=[start_node, divide_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=divide_node),
...         ControlFlowEdge(name="tool_to_end", from_node=divide_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="numerator",
...             destination_node=divide_node,
...             destination_input="numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=start_node,
...             source_output="denominator",
...             destination_node=divide_node,
...             destination_input="denominator",
...         ),
...         DataFlowEdge(
...             name="result_edge",
...             source_node=divide_node,
...             source_output="result",
...             destination_node=end_node,
...             destination_input="result",
...         ),
...     ],
... )

Finally, we define the overall flow:

  • The list of inputs is squared

  • The squared list is summed, and the square root of the sum is taken

  • The list of inputs is normalized based on the outcomes of the previous 2 steps

>>> squared_sum_property = Property(
...     json_schema={"title": "squared_sum", "type": "number"}
... )
>>> normalized_list_of_x_property = Property(
...     json_schema={
...         "title": "x_list_normalized",
...         "type": "array",
...         "items": {"type": "number"},
...     }
... )
>>> normalize_map_node = MapNode(
...     name="normalize map node",
...     subflow=normalize_flow,
... )
>>> squared_sum_tool = ServerTool(
...     name="squared_sum_tool",
...     description="Computes the squared sum of a list of numbers",
...     inputs=[list_of_x_property],
...     outputs=[squared_sum_property],
... )
>>> start_node = StartNode(name="start", inputs=[list_of_x_property])
>>> end_node = EndNode(name="end", outputs=[normalized_list_of_x_property])
>>> squared_sum_tool_node = ToolNode(name="squared sum tool node", tool=squared_sum_tool)
>>> flow = Flow(
...     name="L2 normalize flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         square_numbers_map_node,
...         squared_sum_tool_node,
...         normalize_map_node,
...         end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_square_numbers",
...             from_node=start_node,
...             to_node=square_numbers_map_node
...         ),
...         ControlFlowEdge(
...             name="square_numbers_to_squared_sum_tool",
...             from_node=square_numbers_map_node,
...             to_node=squared_sum_tool_node
...         ),
...         ControlFlowEdge(
...             name="squared_sum_tool_to_normalize",
...             from_node=squared_sum_tool_node,
...             to_node=normalize_map_node
...         ),
...         ControlFlowEdge(
...             name="normalize_to_end",
...             from_node=normalize_map_node,
...             to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="list_of_x_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=square_numbers_map_node,
...             destination_input="iterated_x",
...         ),
...         DataFlowEdge(
...             name="x_square_list_edge",
...             source_node=square_numbers_map_node,
...             source_output="collected_x_square",
...             destination_node=squared_sum_tool_node,
...             destination_input="x_list",
...         ),
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=normalize_map_node,
...             destination_input="iterated_numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=squared_sum_tool_node,
...             source_output="squared_sum",
...             destination_node=normalize_map_node,
...             destination_input="iterated_denominator",
...         ),
...         DataFlowEdge(
...             name="x_list_normalized_edge",
...             source_node=normalize_map_node,
...             source_output="collected_result",
...             destination_node=end_node,
...             destination_input="x_list_normalized",
...         ),
...     ],
... )
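
For reference, the computation that this flow is meant to express can be written directly in plain Python. The sketch assumes that the server tool named squared_sum_tool returns the square root of the sum of the values it receives (the tools are only declared here, not implemented), and that the single denominator is reused for every numerator; `l2_normalize` is a hypothetical helper, not part of PyAgentSpec.

```python
import math
from typing import List


def l2_normalize(x_list: List[float]) -> List[float]:
    """Plain-Python equivalent of the three stages of the flow."""
    squares = [x * x for x in x_list]      # square number map node
    norm = math.sqrt(sum(squares))         # squared sum tool node (assumed behaviour)
    return [x / norm for x in x_list]      # normalize map node, same denominator each time


normalized = l2_normalize([3.0, 4.0])
```

For the input `[3.0, 4.0]` the norm is 5, so the result is approximately `[0.6, 0.8]`.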
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflow (Flow) – The flow that should be applied to all the input values

  • reducers (Dict[str, ReductionMethod] | None) – The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …). The allowed methods depend on the type of the output.

reducers: Dict[str, ReductionMethod] | None#

The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …). The allowed methods depend on the type of the output.
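
As a rough illustration of the reduce step, the sketch below combines the outputs of several subflow executions according to a reducers dictionary. The string keys `append`, `sum`, and `avg` stand in for ReductionMethod values, and `reduce_outputs` is a hypothetical helper; how a runtime actually applies reducers is not specified on this page.

```python
from typing import Callable, Dict, List, Sequence

# Hypothetical stand-ins for the reduction methods named above.
REDUCERS: Dict[str, Callable[[Sequence[float]], object]] = {
    "append": lambda values: list(values),
    "sum": lambda values: sum(values),
    "avg": lambda values: sum(values) / len(values),
}


def reduce_outputs(
    runs: List[Dict[str, float]], reducers: Dict[str, str]
) -> Dict[str, object]:
    """Combine the per-execution outputs into one value per output name."""
    collected: Dict[str, object] = {}
    for name, method in reducers.items():
        values = [run[name] for run in runs]
        collected[f"collected_{name}"] = REDUCERS[method](values)
    return collected


runs = [{"x_square": 1.0}, {"x_square": 4.0}, {"x_square": 9.0}]
as_list = reduce_outputs(runs, {"x_square": "append"})
as_sum = reduce_outputs(runs, {"x_square": "sum"})
```

With `append` the collected value is the list of all three results, while with `sum` it is a single number of the same type as the inner output.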

subflow: Flow#

The flow that should be applied to all the input values

class pyagentspec.flows.nodes.outputmessagenode.OutputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, message)#

Bases: Node

This node appends an agent message to the ongoing flow conversation.

  • Inputs

    One per variable in the message.

  • Outputs

    No outputs.

  • Branches

    One, the default next.
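
The way message variables turn into inputs can be illustrated by a small rendering helper. It assumes the `{{ name }}` placeholder syntax used in the examples on this page; `render_message` is hypothetical and is not the library's own rendering code.

```python
import re
from typing import Mapping

# Assumed placeholder syntax: {{ name }} with optional surrounding whitespace.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_message(message: str, inputs: Mapping[str, object]) -> str:
    """Replace each {{ name }} with str(inputs[name]); a missing name raises KeyError."""
    return PLACEHOLDER.sub(lambda match: str(inputs[match.group(1)]), message)


rendered = render_message(
    "Rephrased: {{ rephrased_user_input }}",
    {"rephrased_user_input": "Hello there."},
)
```

Each variable in the message therefore needs a value at execution time, which is why the node exposes one input per variable.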

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="rephrased_output", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • message (str) – Content of the agent message to append. Allows placeholders, which can define inputs.

message: str#

Content of the agent message to append. Allows placeholders, which can define inputs.

class pyagentspec.flows.nodes.startnode.StartNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>)#

Bases: Node

Start nodes denote the start of the execution of a flow.

  • Inputs

    The list of inputs that should be the inputs of the flow. If both input and output properties are specified, they must match exactly.

  • Outputs

    The list of outputs of the step. If both input and output properties are specified, they must match exactly.

  • Branches

    One, the default next.
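
The matching rule for inputs and outputs could be checked along the following lines. This sketch interprets "exact match" as equal JSON schemas in the same order, which is an assumption; `resolve_start_node_properties` is a hypothetical helper and not the validation performed by PyAgentSpec.

```python
from typing import Any, Dict, List, Optional

Schema = Dict[str, Any]


def resolve_start_node_properties(
    inputs: Optional[List[Schema]] = None,
    outputs: Optional[List[Schema]] = None,
) -> List[Schema]:
    """Return the single property list shared by the inputs and outputs of the node."""
    if inputs is not None and outputs is not None and inputs != outputs:
        raise ValueError("StartNode inputs and outputs must match exactly")
    if inputs is not None:
        return list(inputs)
    return list(outputs or [])


question = {"title": "user_question", "type": "string"}
only_inputs = resolve_start_node_properties(inputs=[question])
both_given = resolve_start_node_properties(inputs=[question], outputs=[question])
```

Passing two different lists raises `ValueError` in this sketch, while giving only one of them simply reuses it for the other side.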

Examples

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, LlmNode, StartNode
>>> user_question_property = Property(
...     json_schema=dict(
...         title="user_question",
...         description="The user question.",
...         type="string",
...     )
... )
>>> answer_property = Property(json_schema=dict(title="answer", type="string"))
>>> start_node = StartNode(name="start", inputs=[user_question_property])
>>> end_node = EndNode(name="end", outputs=[answer_property])
>>> llm_node = LlmNode(
...     name="llm node",
...     prompt_template="Answer the user question: {{user_question}}",
...     llm_config=llm_config,
... )
>>> flow = Flow(
...     name="flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_question",
...             destination_node=llm_node,
...             destination_input="user_question",
...         ),
...         DataFlowEdge(
...             name="answer_edge",
...             source_node=llm_node,
...             source_output="generated_text",
...             destination_node=end_node,
...             destination_input="answer"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

class pyagentspec.flows.nodes.toolnode.ToolNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, tool)#

Bases: Node

The tool node executes a tool as part of a flow.

  • Inputs

    Inferred from the definition of the tool to execute.

  • Outputs

    Inferred from the definition of the tool to execute.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> from pyagentspec.property import Property
>>>
>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_root_property = Property(
...     json_schema={"title": "x_square_root", "type": "number"}
... )
>>> square_root_tool = ServerTool(
...     name="compute_square_root",
...     description="Computes the square root of a number",
...     inputs=[x_property],
...     outputs=[x_square_root_property],
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_root_property])
>>> tool_node = ToolNode(name="square root tool node", tool=square_root_tool)
>>> flow = Flow(
...     name="Compute square root flow",
...     start_node=start_node,
...     nodes=[start_node, tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=tool_node),
...         ControlFlowEdge(name="tool_to_end", from_node=tool_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_root_edge",
...             source_node=tool_node,
...             source_output="x_square_root",
...             destination_node=end_node,
...             destination_input="x_square_root"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • tool (Tool) – The tool to be executed in this Node

tool: Tool#

The tool to be executed in this Node