Flows#

This page presents all APIs and classes related to Flows and nodes in PyAgentSpec.

Flows & Nodes#

class pyagentspec.flows.flow.Flow(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, start_node, nodes, control_flow_connections, data_flow_connections=None)#

Bases: AgenticComponent

A flow is a component to model sequences of operations to do in a precised order.

The operations and sequence is defined by the nodes and transitions associated to the flow. Steps can be deterministic, or for some use LLMs.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • start_node (Node) – The starting node of this flow. It must be also part of the nodes list

  • nodes (List[Node]) – The list of nodes that compose this Flow

  • control_flow_connections (List[ControlFlowEdge]) – The list of edges that define the control flow of this Flow

  • data_flow_connections (List[DataFlowEdge] | None) – The list of edges that define the data flow of this Flow

Example

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> prompt_property = Property(
...     json_schema={"title": "prompt", "type": "string"}
... )
>>> llm_output_property = Property(
...     json_schema={"title": "llm_output", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[prompt_property])
>>> end_node = EndNode(name="end", outputs=[llm_output_property])
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="{{prompt}}",
...     inputs=[prompt_property],
...     outputs=[llm_output_property],
... )
>>> flow = Flow(
...     name="Simple prompting flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="prompt_edge",
...             source_node=start_node,
...             source_output="prompt",
...             destination_node=llm_node,
...             destination_input="prompt",
...         ),
...         DataFlowEdge(
...             name="llm_output_edge",
...             source_node=llm_node,
...             source_output="llm_output",
...             destination_node=end_node,
...             destination_input="llm_output"
...         ),
...     ],
... )
class pyagentspec.flows.node.Node(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>)#

Bases: ComponentWithIO

Base class for all nodes that can be put inside a flow.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

DEFAULT_NEXT_BRANCH: ClassVar[str] = 'next'#

Name used for the default branch

branches: List[str]#

The list of outgoing branch names that the node will expose

class pyagentspec.flows.edges.dataflowedge.DataFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, source_node, source_output, destination_node, destination_input)#

Bases: Component

A data flow edge specifies how the output of a node propagates as input of another node.

An outputs can be propagated as input of several nodes.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • source_node (Node) – The instance of the source Node

  • source_output (str) – The name of the property among the source node outputs that should be connected

  • destination_node (Node) – The instance of the destination Node

  • destination_input (str) – The name of the property among the destination node inputs that should be connected

destination_input: str#

The name of the property among the destination node inputs that should be connected

destination_node: Node#

The instance of the destination Node

source_node: Node#

The instance of the source Node

source_output: str#

The name of the property among the source node outputs that should be connected

class pyagentspec.flows.edges.controlflowedge.ControlFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, from_node, from_branch=None, to_node)#

Bases: Component

A control flow edge specifies a possible transition from a node to another in a flow.

A single node can have several potential next nodes, in which case several control flow edges should be present in the control flow connections of that flow.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • from_node (Node) – The instance of the source Node

  • from_branch (str | None) – The name of the branch to connect. It must be among the list of branches offered by the from_node

  • to_node (Node) – The instance of the destination Node

from_branch: str | None#

The name of the branch to connect. It must be among the list of branches offered by the from_node

from_node: Node#

The instance of the source Node

to_node: Node#

The instance of the destination Node

Nodes#

class pyagentspec.flows.nodes.agentnode.AgentNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, agent)#

Bases: Node

The agent execution node is a node that will execute an agent as part of a flow.

If branches are configured, the agent will be prompted to select a branch before the agent node completes to transition to another node of the Flow.

  • Inputs

    Inferred from the definition of the agent to execute.

  • Outputs

    Inferred from the definition of the agent to execute.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.agent import Agent
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.nodes import AgentNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> query_property = Property(json_schema={"title": "query", "type": "string"})
>>> search_results_property = Property(
...     json_schema={"title": "search_results", "type": "array", "items": {"type": "string"}}
... )
>>> search_tool = ServerTool(
...     name="search_tool",
...     description=(
...        "This tool runs a web search with the given query "
...        "and returns the most relevant results"
...     ),
...     inputs=[query_property],
...     outputs=[search_results_property],
... )
>>> agent = Agent(
...     name="Search agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather the required information for the user: {{query}}"
...     ),
...     tools=[search_tool],
...     outputs=[search_results_property],
... )
>>> start_node = StartNode(name="start", inputs=[query_property])
>>> end_node = EndNode(name="end", outputs=[search_results_property])
>>> agent_node = AgentNode(
...     name="Search agent node",
...     agent=agent,
... )
>>> flow = Flow(
...     name="Search agent flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_end", from_node=agent_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="query",
...             destination_node=agent_node,
...             destination_input="query",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=agent_node,
...             source_output="search_results",
...             destination_node=end_node,
...             destination_input="search_results"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • agent (AgenticComponent) – The agentic component that will be called as part of the execution of this node

agent: AgenticComponent#

The agentic component that will be called as part of the execution of this node

class pyagentspec.flows.nodes.apinode.ApiNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, url, http_method, api_spec_uri=None, data=<factory>, query_params=<factory>, headers=<factory>, sensitive_headers=<factory>)#

Bases: Node

Make an API call.

This node is intended to be a part of a Flow.

  • Inputs

    Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users have to manually specify them.

  • Outputs

    Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users should manually specify them.

    If None is given, pyagentspec infers a generic property of any type named response.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.nodes import ApiNode
>>> from pyagentspec.property import Property
>>> weather_result_property = Property(
...     json_schema={
...         "title": "zurich_weather",
...         "type": "object",
...         "properties": {
...             "temperature": {
...                 "type": "number",
...                 "description": "Temperature in celsius degrees",
...             },
...             "weather": {"type": "string"}
...         },
...     }
... )
>>> call_current_weather_step = ApiNode(
...     name="Weather API call node",
...     url="https://example.com/weather",
...     http_method = "GET",
...     query_params={
...         "location": "zurich",
...     },
...     outputs=[weather_result_property]
... )
>>>
>>> item_id_property = Property(
...     json_schema={"title": "item_id", "type": "string"}
... )
>>> order_id_property = Property(
...     json_schema={"title": "order_id", "type": "string"}
... )
>>> store_id_property = Property(
...     json_schema={"title": "store_id", "type": "string"}
... )
>>> session_id_property = Property(
...     json_schema={"title": "session_id", "type": "string"}
... )
>>> create_order_step = ApiNode(
...     name="Orders api call node",
...     url="https://example.com/orders/{{ order_id }}",
...     http_method="POST",
...     # sending an object which will automatically be transformed into JSON
...     data={
...         # define a static body parameter
...         "topic_id": 12345,
...         # define a templated body parameter.
...         # The value for {{ item_id }} will be taken from the IO system at runtime
...         "item_id": "{{ item_id }}",
...     },
...     query_params={
...         # provide one templated query parameter called "store_id"
...         # which will take its value from the IO system from key "store_id"
...         "store_id": "{{ store_id }}",
...     },
...     headers={
...         # set header session_id. the value is coming from the IO system
...         "session_id": "{{ session_id }}",
...     },
...     inputs=[item_id_property, order_id_property, store_id_property, session_id_property],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • url (str) – The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs

  • http_method (str) – The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs

  • api_spec_uri (str | None) – The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs

  • data (str | bytes | Any) – The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs. data as bytes and strings will be passed to the request body as-is, whereas JSONSerializable objects are converted to string with json.dumps before being added into the request’s body. Note: For AgentSpec version 25.4.1, this field is strictly typed as Dict[str, Any]. For versions 25.4.2 and above, it is typed as Any.

  • query_params (Dict[str, Any]) – Query parameters for the API call. Allows placeholders in dict values, which can define inputs

  • headers (Dict[str, Any]) – Additional headers for the API call. Allows placeholders in dict values, which can define inputs

  • sensitive_headers (Dict[str, Any]) – Additional headers for the API call. These headers are intended to be used for sensitive information such as authentication tokens and will be excluded form exported JSON configs.

DEFAULT_OUTPUT: ClassVar[str] = 'response'#

Default output name

api_spec_uri: str | None#

The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs

data: str | bytes | Any#

The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs.

data as bytes and strings will be passed to the request body as-is, whereas JSONSerializable objects are converted to string with json.dumps before being added into the request’s body.

Note: For AgentSpec version 25.4.1, this field is strictly typed as Dict[str, Any]. For versions 25.4.2 and above, it is typed as Any.

headers: Dict[str, Any]#

Additional headers for the API call. Allows placeholders in dict values, which can define inputs

http_method: str#

The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs

query_params: Dict[str, Any]#

Query parameters for the API call. Allows placeholders in dict values, which can define inputs

sensitive_headers: Dict[str, Any]#

Additional headers for the API call. These headers are intended to be used for sensitive information such as authentication tokens and will be excluded form exported JSON configs.

url: str#

The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs

class pyagentspec.flows.nodes.branchingnode.BranchingNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, mapping)#

Bases: Node

Select the next node to transition to based on a mapping.

The input is used as key for the mapping. If the input does not correspond to any of the keys of the mapping the branch ‘default’ will be selected. This node is intended to be a part of a Flow.

  • Inputs

    The input value that should be used as key for the mapping.

    If None is given, pyagentspec infers a string property named branching_mapping_key.

  • Outputs

    None.

  • Branches

    One for each value in the mapping, plus a branch called default, which is the branch taken by the flow when mapping fails (i.e., the input does not match any key in the mapping).

Examples

>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> CORRECT_PASSWORD_BRANCH = "PASSWORD_OK"
>>> password_property = Property(
...     json_schema={"title": "password", "type": "string"}
... )
>>> agent = Agent(
...     name="User input agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to ask the password to the user. "
...         "Once you get it, submit it and end."
...     ),
...     outputs=[password_property],
... )
>>> start_node = StartNode(name="start")
>>> access_granted_end_node = EndNode(
...     name="access granted end", branch_name="ACCESS_GRANTED"
... )
>>> access_denied_end_node = EndNode(
...     name="access denied end", branch_name="ACCESS_DENIED"
... )
>>> branching_node = BranchingNode(
...     name="password check",
...     mapping={"123456": CORRECT_PASSWORD_BRANCH},
...     inputs=[password_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Check access flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         access_granted_end_node,
...         access_denied_end_node
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent",
...             from_node=start_node,
...             to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching",
...             from_node=agent_node,
...             to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_granted",
...             from_node=branching_node,
...             from_branch=CORRECT_PASSWORD_BRANCH,
...             to_node=access_granted_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_denied",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=access_denied_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="password_edge",
...             source_node=agent_node,
...             source_output="password",
...             destination_node=branching_node,
...             destination_input="password",
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • mapping (Dict[str, str]) – The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given

DEFAULT_BRANCH: ClassVar[str] = 'default'#

Name of the default branch used when mapping fails

DEFAULT_INPUT: ClassVar[str] = 'branching_mapping_key'#

Input key for the name to transition to next.

mapping: Dict[str, str]#

The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given

class pyagentspec.flows.nodes.endnode.EndNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, branch_name='next')#

Bases: Node

End nodes denote the end of the execution of a flow.

There might be several end nodes in a flow, in which case the executor of the flow should be able to track which one was reached and pass that back to the caller.

  • Inputs

    The list of inputs of the step. If both input and output properties are specified they must be an exact match

    If None is given, pyagentspec copies the outputs provided, if any. Otherwise, no input is exposed.

  • Outputs

    The list of outputs that should be exposed by the flow. If both input and output properties are specified they must be an exact match

    If None is given, pyagentspec copies the inputs provided, if any. Otherwise, no output is exposed.

  • Branches

    None.

Examples

>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> languages_to_branch_name = {
...     "english": "ENGLISH",
...     "spanish": "SPANISH",
...     "italian": "ITALIAN",
... }
>>> language_property = Property(
...     json_schema={"title": "language", "type": "string"}
... )
>>> agent = Agent(
...     name="Language detector agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to understand the language spoken by the user."
...         "Please output only the language in lowercase and submit."
...     ),
...     outputs=[language_property],
... )
>>> start_node = StartNode(name="start")
>>> english_end_node = EndNode(
...     name="english end", branch_name=languages_to_branch_name["english"]
... )
>>> spanish_end_node = EndNode(
...     name="spanish end", branch_name=languages_to_branch_name["spanish"]
... )
>>> italian_end_node = EndNode(
...     name="italian end", branch_name=languages_to_branch_name["italian"]
... )
>>> unknown_end_node = EndNode(name="unknown language end", branch_name="unknown")
>>> branching_node = BranchingNode(
...     name="language check",
...     mapping=languages_to_branch_name,
...     inputs=[language_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Check access flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         english_end_node,
...         spanish_end_node,
...         italian_end_node,
...         unknown_end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent", from_node=start_node, to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching", from_node=agent_node, to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_english_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["english"],
...             to_node=english_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_spanish_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["spanish"],
...             to_node=spanish_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_italian_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["italian"],
...             to_node=italian_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_unknown_end",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=unknown_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="language_edge",
...             source_node=agent_node,
...             source_output="language",
...             destination_node=branching_node,
...             destination_input="language",
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – The name of the branch that corresponds to the branch that gets closed by this node, which will be exposed by the Flow

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • branch_name (str) – The name of the branch that corresponds to the branch that gets closed by this node, which will be exposed by the Flow

branch_name: str#

The name of the branch that corresponds to the branch that gets closed by this node, which will be exposed by the Flow

class pyagentspec.flows.nodes.catchexceptionnode.CatchExceptionNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v26_2_0, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, subflow)#

Bases: Node

Node to execute a Flow and catch exceptions.

  • If no exception is caught, the node will transition to the branches of its subflow.

  • If an exception is caught, it will transition to an exception branch.

Inputs#

Same as the inputs from the subflow.

Outputs#

  • The outputs of the subflow. If an exception is raised, the default values of each output property are used.

  • An additional output named caught_exception_info with type string | null and default value null. Executors may populate it with a non-sensitive error description when an exception is caught.

Branches#

  • The branches of the subflow

  • One additional branch named caught_exception_branch

Security Considerations#

See security considerations regarding exception catching in the Security Considerations

param id:

A unique identifier for this Component

param name:

Name of this Component

param description:

Optional description of this Component

param metadata:

Optional, additional metadata related to this Component

param min_agentspec_version:

param max_agentspec_version:

param inputs:

List of inputs accepted by this component

param outputs:

List of outputs exposed by this component

param branches:

The list of outgoing branch names that the node will expose

param subflow:

Flow to execute and catch errors from.

CAUGHT_EXCEPTION_BRANCH: ClassVar[str] = 'caught_exception_branch'#

Name of the branch used when an exception is caught.

DEFAULT_EXCEPTION_INFO_VALUE: ClassVar[str] = 'caught_exception_info'#

Name of the output property containing exception details.

subflow: Flow#

Flow to execute and catch errors from.

Parameters:
  • id (str) –

  • name (str) –

  • description (str | None) –

  • metadata (Dict[str, Any] | None) –

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) –

  • outputs (List[Property] | None) –

  • branches (List[str]) –

  • subflow (Flow) –

class pyagentspec.flows.nodes.flownode.FlowNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, subflow)#

Bases: Node

The flow node executes a subflow as part of a flow.

  • Inputs

    Inferred from the inner structure. It’s the sets of inputs required by the StartNode of the inner flow.

  • Outputs

    Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow.

  • Branches

    Inferred from the inner flow, one per each different value of the attribute branch_name of the nodes of type EndNode in the inner flow.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflow (Flow) – The flow that should be executed

Example

The FlowNode is particularly suitable when subflows can be reused inside a project. Let’s see an example with a flow that estimates numerical value using the “wisdowm of the crowd” effect:

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import MapNode, LlmNode, ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> duplication_tool = ServerTool(
...     name="duplication_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "element", "description": "", "type": "string"}
...         ),
...         Property(
...             json_schema={"title": "n", "description": "", "type": "integer"}
...         ),
...     ],
...     outputs=[
...         Property(
...             json_schema={
...                 "title": "flow_iterable_queries",
...                 "type": "array",
...                 "items": {"type": "string"}
...             },
...         )
...     ],
... )
>>> reduce_tool = ServerTool(
...     name="reduce_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "elements", "type": "array", "items": {"type": "string"}}
...         ),
...     ],
...     outputs=[Property(json_schema={"title": "flow_processed_query", "type": "string"})],
... )
>>> # Defining a simple prompt
>>> REASONING_PROMPT_TEMPLATE = '''Provide your best numerical estimate for: {{user_input}}
... Your answer should be a single number.
... Do not include any units, reasoning, or extra text.'''
>>> # Defining the subflow for the map step
>>> user_input_property = Property(
...     json_schema={"title": "user_input", "type": "string"}
... )
>>> flow_processed_query_property = Property(
...     json_schema={"title": "flow_processed_query", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[user_input_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> llm_node = LlmNode(
...     name="reasoning llm node",
...     llm_config=llm_config,
...     prompt_template=REASONING_PROMPT_TEMPLATE,
...     inputs=[user_input_property],
...     outputs=[flow_processed_query_property],
... )
>>> inner_map_flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=llm_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
>>> user_query_property = Property(
...     json_schema={"title": "user_query", "type": "string"}
... )
>>> n_repeat_property = Property(
...     json_schema={"title": "n_repeat", "type": "integer"}
... )
>>> flow_iterable_queries_property = Property(
...     json_schema={
...         "title": "iterated_user_input",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> flow_processed_queries_property = Property(
...     json_schema={
...         "title": "collected_flow_processed_query",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> start_node = StartNode(name="start", inputs=[user_query_property, n_repeat_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> duplication_node = ToolNode(
...     name="duplication_tool node",
...     tool=duplication_tool,
... )
>>> reduce_node = ToolNode(
...     name="reduce_tool node",
...     tool=reduce_tool,
... )
>>> map_node = MapNode(
...     name="map node",
...     subflow=inner_map_flow,
...     inputs=[flow_iterable_queries_property],
...     outputs=[flow_processed_queries_property],
... )
>>> mapreduce_flow = Flow(
...     name="Map-reduce flow",
...     start_node=start_node,
...     nodes=[start_node, duplication_node, map_node, reduce_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_duplication", from_node=start_node, to_node=duplication_node
...         ),
...         ControlFlowEdge(
...             name="duplication_to_map", from_node=duplication_node, to_node=map_node
...         ),
...         ControlFlowEdge(name="map_to_reduce", from_node=map_node, to_node=reduce_node),
...         ControlFlowEdge(name="reduce_to_end", from_node=reduce_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_query",
...             destination_node=duplication_node,
...             destination_input="element",
...         ),
...         DataFlowEdge(
...             name="n_repeat_edge",
...             source_node=start_node,
...             source_output="n_repeat",
...             destination_node=duplication_node,
...             destination_input="n",
...         ),
...         DataFlowEdge(
...             name="flow_iterables_edge",
...             source_node=duplication_node,
...             source_output="flow_iterable_queries",
...             destination_node=map_node,
...             destination_input="iterated_user_input",
...         ),
...         DataFlowEdge(
...             name="flow_processed_queries_edge",
...             source_node=map_node,
...             source_output="collected_flow_processed_query",
...             destination_node=reduce_node,
...             destination_input="elements",
...         ),
...         DataFlowEdge(
...             name="flow_processed_query_edge",
...             source_node=reduce_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query",
...         ),
...     ],
... )

Once the subflow is created we can simply integrate it with the FlowNode:

>>> from pyagentspec.flows.nodes import FlowNode, AgentNode
>>> from pyagentspec.agent import Agent
>>> start_node = StartNode(name="start")
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> flow_node = FlowNode(name="flow node", subflow=mapreduce_flow)
>>> agent = Agent(
...     name="User interaction agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather from the user the query and the number of times "
...         "it should be asked to an LLM. Once you have this information, submit and exit."
...     ),
...     outputs=[user_query_property, n_repeat_property],
... )
>>> agent_node = AgentNode(name="flow node", agent=agent)
>>> flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, flow_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_flow", from_node=agent_node, to_node=flow_node),
...         ControlFlowEdge(name="flow_to_end", from_node=flow_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=agent_node,
...             source_output="user_query",
...             destination_node=flow_node,
...             destination_input="user_query",
...         ),
...         DataFlowEdge(
...             name="n_rep_edge",
...             source_node=agent_node,
...             source_output="n_repeat",
...             destination_node=flow_node,
...             destination_input="n_repeat"
...         ),
...         DataFlowEdge(
...             name="n_rep_edge",
...             source_node=flow_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
subflow: Flow#

The flow that should be executed

class pyagentspec.flows.nodes.parallelflownode.ParallelFlowNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_2, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, subflows=<factory>)#

Bases: Node

The parallel flow node executes multiple subflows in parallel.

  • Inputs

    Inferred from the inner structure. It’s the union of the sets of inputs of the inner flows. Inputs of different inner flows that have the same name are merged if they have the same type.

  • Outputs

    Inferred from the inner structure. It’s the union of the outputs of the inner flows. Outputs of different inner flows that have the same name are not allowed.

  • Branches

    One, the default next.

Examples

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflows (List[Flow]) – The flows that should be executed in parallel

subflows: List[Flow]#

The flows that should be executed in parallel

class pyagentspec.flows.nodes.inputmessagenode.InputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, message=None)#

Bases: Node

This node interrupts the execution of the flow in order to wait for a user input, and restarts after receiving it. An agent message, if given, is appended to the conversation before waiting for input. User input is appended to the conversation as a user message, and it is returned as a string property from the node.

  • Inputs

    One per variable in the message

  • Outputs

    One string property that represents the content of the input user message.

    If None is given, pyagentspec infers a string property named user_input.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="ask_input", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • message (str | None) – The agent message to append to the conversation before waiting for user input

DEFAULT_OUTPUT: ClassVar[str] = 'user_input'#
message: str | None#

The agent message to append to the conversation before waiting for user input

class pyagentspec.flows.nodes.llmnode.LlmNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, llm_config, prompt_template)#

Bases: Node

Execute a prompt template with a given LLM.

This node is intended to be a part of a Flow.

  • Inputs

    One per placeholder in the prompt template.

  • Outputs

    The output text generated by the LLM.

    If None is given, pyagentspec infers a string property named generated_text.

  • Branches

    One, the default next.

Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • llm_config (LlmConfig) – The LLM to use for generation in this node.

  • prompt_template (str) – Defines the prompt sent to the model. Allows placeholders, which can define inputs.

Example

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> country_property = Property(
...     json_schema={"title": "country", "type": "string"}
... )
>>> capital_property = Property(
...     json_schema={"title": "capital", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[country_property])
>>> end_node = EndNode(name="end", outputs=[capital_property])
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="What is the capital of {{ country }}?",
...     inputs=[country_property],
...     outputs=[capital_property],
... )
>>> flow = Flow(
...     name="Get the country's capital flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="country_edge",
...             source_node=start_node,
...             source_output="country",
...             destination_node=llm_node,
...             destination_input="country",
...         ),
...         DataFlowEdge(
...             name="capital_edge",
...             source_node=llm_node,
...             source_output="capital",
...             destination_node=end_node,
...             destination_input="capital"
...         ),
...     ],
... )
DEFAULT_OUTPUT: ClassVar[str] = 'generated_text'#

Raw text generated by the LLM.

llm_config: LlmConfig#

The LLM to use for generation in this node.

prompt_template: str#

Defines the prompt sent to the model. Allows placeholders, which can define inputs.

class pyagentspec.flows.nodes.mapnode.MapNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, subflow, reducers=None)#

Bases: Node

The map node executes a subflow on each element of a given input as part of a flow.

  • Inputs

    Inferred from the inner structure. It’s the sets of inputs required by the StartNode of the inner flow. The names of the inputs will be the ones of the inner flow, complemented with the iterated_ prefix. Their type is Union[inner_type, List[inner_type]], where inner_type is the type of the respective input in the inner flow.

    If None is given, pyagentspec infers input properties directly from the inner flow, specifying title and type according to the rules defined above.

  • Outputs

    Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow, combined with the reducer method of each output. The names of the outputs will be the ones of the inner flow, complemented with the collected_ prefix. Their type depends on the reduce method specified for that output:

    • List of the respective output type in case of append

    • same type of the respective output type in case of sum, avg

    If None is given, pyagentspec infers outputs by exposing an output property for each entry in the reducers dictionary, specifying title and type according to the rules defined above.

  • Branches

    One, the default next.

Examples

In this example we will create a flow that returns an L2-normalized version of a given list of numbers.

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, StartNode, MapNode, ToolNode
>>> from pyagentspec.tools import ServerTool

First, we define a MapNode that returns the square of all the elements in a list. It will be used to compute the L2-norm.

>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_property = Property(
...     json_schema={"title": "x_square", "type": "number"}
... )
>>> square_tool = ServerTool(
...     name="compute_square_tool",
...     description="Computes the square of a number",
...     inputs=[x_property],
...     outputs=[x_square_property],
... )
>>> list_of_x_property = Property(
...     json_schema={"title": "x_list", "type": "array", "items": {"type": "number"}}
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_property])
>>> square_tool_node = ToolNode(name="square tool node", tool=square_tool)
>>> square_number_flow = Flow(
...     name="Square number flow",
...     start_node=start_node,
...     nodes=[start_node, square_tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_tool", from_node=start_node, to_node=square_tool_node
...         ),
...         ControlFlowEdge(
...             name="tool_to_end", from_node=square_tool_node, to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=square_tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_edge",
...             source_node=square_tool_node,
...             source_output="x_square",
...             destination_node=end_node,
...             destination_input="x_square",
...         ),
...     ],
... )
>>> list_of_x_square_property = Property(
...     json_schema={"title": "x_square_list", "type": "array", "items": {"type": "number"}}
... )
>>> square_numbers_map_node = MapNode(
...     name="square number map node",
...     subflow=square_number_flow,
... )

Now we define the MapNode responsible for normalizing the given list of input numbers. The denominator is the same for all of the numbers, we are going to map only the numerators (i.e., the input numbers).

>>> numerator_property = Property(
...     json_schema={"title": "numerator", "type": "number"}
... )
>>> denominator_property = Property(
...     json_schema={"title": "denominator", "type": "number"}
... )
>>> result_property = Property(
...     json_schema={"title": "result", "type": "number"}
... )
>>> division_tool = ServerTool(
...     name="division_tool",
...     description="Computes the ratio between two numbers",
...     inputs=[numerator_property, denominator_property],
...     outputs=[result_property],
... )
>>> start_node = StartNode(name="start", inputs=[numerator_property, denominator_property])
>>> end_node = EndNode(name="end", outputs=[result_property])
>>> divide_node = ToolNode(name="divide node", tool=division_tool)
>>> normalize_flow = Flow(
...     name="Normalize flow",
...     start_node=start_node,
...     nodes=[start_node, divide_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=divide_node),
...         ControlFlowEdge(name="tool_to_end", from_node=divide_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="numerator",
...             destination_node=divide_node,
...             destination_input="numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=start_node,
...             source_output="denominator",
...             destination_node=divide_node,
...             destination_input="denominator",
...         ),
...         DataFlowEdge(
...             name="result_edge",
...             source_node=divide_node,
...             source_output="result",
...             destination_node=end_node,
...             destination_input="result",
...         ),
...     ],
... )

Finally, we define the overall flow:

  • The list of inputs is squared

  • The squared list is summed and root squared

  • The list of inputs is normalized based on the outcomes of the previous 2 steps

>>> squared_sum_property = Property(
...     json_schema={"title": "squared_sum", "type": "number"}
... )
>>> normalized_list_of_x_property = Property(
...     json_schema={
...         "title": "x_list_normalized",
...         "type": "array",
...         "items": {"type": "number"},
...     }
... )
>>> normalize_map_node = MapNode(
...     name="normalize map node",
...     subflow=normalize_flow,
... )
>>> squared_sum_tool = ServerTool(
...     name="squared_sum_tool",
...     description="Computes the squared sum of a list of numbers",
...     inputs=[list_of_x_property],
...     outputs=[squared_sum_property],
... )
>>> start_node = StartNode(name="start", inputs=[list_of_x_property])
>>> end_node = EndNode(name="end", outputs=[normalized_list_of_x_property])
>>> squared_sum_tool_node = ToolNode(name="squared sum tool node", tool=squared_sum_tool)
>>> flow = Flow(
...     name="L2 normalize flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         square_numbers_map_node,
...         squared_sum_tool_node,
...         normalize_map_node,
...         end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_square_numbers",
...             from_node=start_node,
...             to_node=square_numbers_map_node
...         ),
...         ControlFlowEdge(
...             name="square_numbers_to_squared_sum_tool",
...             from_node=square_numbers_map_node,
...             to_node=squared_sum_tool_node
...         ),
...         ControlFlowEdge(
...             name="squared_sum_tool_to_normalize",
...             from_node=squared_sum_tool_node,
...             to_node=normalize_map_node
...         ),
...         ControlFlowEdge(
...             name="normalize_to_end",
...             from_node=normalize_map_node,
...             to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="list_of_x_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=square_numbers_map_node,
...             destination_input="iterated_x",
...         ),
...         DataFlowEdge(
...             name="x_square_list_edge",
...             source_node=square_numbers_map_node,
...             source_output="collected_x_square",
...             destination_node=squared_sum_tool_node,
...             destination_input="x_list",
...         ),
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=normalize_map_node,
...             destination_input="iterated_numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=squared_sum_tool_node,
...             source_output="squared_sum",
...             destination_node=normalize_map_node,
...             destination_input="iterated_denominator",
...         ),
...         DataFlowEdge(
...             name="x_list_normalized_edge",
...             source_node=normalize_map_node,
...             source_output="collected_result",
...             destination_node=end_node,
...             destination_input="x_list_normalized",
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflow (Flow) – The flow that should be applied to all the input values

  • reducers (Dict[str, ReductionMethod] | None) – The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …, allowed methods depend on the type of the output)

reducers: Dict[str, ReductionMethod] | None#

The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …, allowed methods depend on the type of the output)

subflow: Flow#

The flow that should be applied to all the input values

class pyagentspec.flows.nodes.parallelmapnode.ParallelMapNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_2, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, subflow, reducers=None)#

Bases: Node

The parallel map node executes a subflow on each element of a given input in a parallel manner.

  • Inputs

    Inferred from the inner structure. It’s the sets of inputs required by the StartNode of the inner flow. The names of the inputs will be the ones of the inner flow, complemented with the iterated_ prefix. Their type is Union[inner_type, List[inner_type]], where inner_type is the type of the respective input in the inner flow.

    If None is given, pyagentspec infers input properties directly from the inner flow, specifying title and type according to the rules defined above.

  • Outputs

    Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow, combined with the reducer method of each output. The names of the outputs will be the ones of the inner flow, complemented with the collected_ prefix. Their type depends on the reduce method specified for that output:

    • List of the respective output type in case of append

    • same type of the respective output type in case of sum, avg

    If None is given, pyagentspec infers outputs by exposing an output property for each entry in the reducers dictionary, specifying title and type according to the rules defined above.

  • Branches

    One, the default next.

Examples

In this example we create a flow that generates a summary of the given articles that talk about LLMs.

>>> from pyagentspec.property import BooleanProperty, StringProperty, ListProperty
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, StartNode, ParallelMapNode, LlmNode, BranchingNode
>>> from pyagentspec.flows.node import Node

First we define the flow that determines if an article talks about LLMs or not:

  • if it does, we return it, so that we will use it for our summary

  • if it does not, we don’t return its text

>>> def create_data_flow_edge(source_node: Node, destination_node: Node, property_name: str) -> DataFlowEdge:
...     return DataFlowEdge(
...         name=f"{source_node.name}_{destination_node.name}_{property_name}_edge",
...         source_node=source_node,
...         source_output=property_name,
...         destination_node=destination_node,
...         destination_input=property_name,
...     )
>>>
>>> article_property = StringProperty(title="article")
>>> is_llm_article_property = StringProperty(title="is_article")
>>> llm_node = LlmNode(
...     name="check_if_article_talks_about_llms_node",
...     prompt_template="Look at this article: {{article}}. Does it talk about LLMs? Answer `yes` or `no`.",
...     llm_config=llm_config,
...     inputs=[article_property],
...     outputs=[is_llm_article_property],
... )
>>>
>>> branching_node = BranchingNode(
...     name="decide_if_we_should_return_the_article",
...     mapping={"yes": "yes"},
...     inputs=[is_llm_article_property],
... )
>>>
>>> start_node = StartNode(name="start", inputs=[article_property])
>>> end_node_with_output = EndNode(name="end_with_output", outputs=[article_property])
>>> end_node_without_output = EndNode(name="end_without_output", outputs=[])
>>>
>>> check_if_article_is_about_llm_flow = Flow(
...     name="is_article_about_llm_flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node_with_output, end_node_without_output, branching_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_branching", from_node=llm_node, to_node=branching_node),
...         ControlFlowEdge(name="branching_to_end_with", from_node=branching_node, to_node=end_node_with_output),
...         ControlFlowEdge(name="branching_to_end_without", from_node=branching_node, to_node=end_node_without_output),
...     ],
...     data_flow_connections=[
...         create_data_flow_edge(start_node, llm_node, article_property.title),
...         create_data_flow_edge(llm_node, branching_node, is_llm_article_property.title),
...         create_data_flow_edge(start_node, end_node_with_output, article_property.title),
...     ],
...     outputs=[StringProperty(title=article_property.title, default="")],
... )

We put this flow into a ParallelMapNode, so that we can perform the check in parallel on multiple articles at the same time.

>>> list_of_articles_property = ListProperty(title="iterated_article", item_type=article_property)
>>> list_of_articles_about_llm_property = ListProperty(title="collected_article", item_type=article_property)
>>> parallel_article_check_node = ParallelMapNode(
...     name="parallel_check_if_articles_talk_about_llms_node",
...     subflow=check_if_article_is_about_llm_flow,
...     inputs=[list_of_articles_property],
...     outputs=[list_of_articles_about_llm_property],
... )

Finally, we create the flow that takes the list of articles as input, filters them through the ParallelMapNode we have just created, and we generate the summary with the remaining articles.

>>> summary_property = StringProperty(title="summary")
>>> summary_llm_node = LlmNode(
...     name="generate_summary_of_llm_articles_node",
...     prompt_template="Summarize the following articles that talk about LLMs: {{collected_article}}",
...     llm_config=llm_config,
...     inputs=[list_of_articles_about_llm_property],
...     outputs=[summary_property],
... )
>>> start_node = StartNode(name="start", inputs=[list_of_articles_property])
>>> end_node = EndNode(name="end", outputs=[summary_property])
>>> generate_summary_of_llm_articles_flow = Flow(
...     name="Generate summary of articles about LLMs flow",
...     start_node=start_node,
...     nodes=[start_node, parallel_article_check_node, summary_llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_parallelmap", from_node=start_node, to_node=parallel_article_check_node),
...         ControlFlowEdge(name="parallelmap_to_llm", from_node=parallel_article_check_node, to_node=summary_llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=summary_llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         create_data_flow_edge(start_node, parallel_article_check_node, list_of_articles_property.title),
...         create_data_flow_edge(parallel_article_check_node, summary_llm_node, list_of_articles_about_llm_property.title),
...         create_data_flow_edge(summary_llm_node, end_node, summary_property.title),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • subflow (Flow) – The flow that should be applied to all the input values

  • reducers (Dict[str, ReductionMethod] | None) – The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …, allowed methods depend on the type of the output)

reducers: Dict[str, ReductionMethod] | None#

The way the outputs of the different executions (map) should be collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …, allowed methods depend on the type of the output)

subflow: Flow#

The flow that should be applied to all the input values

class pyagentspec.flows.nodes.outputmessagenode.OutputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, message)#

Bases: Node

This node appends an agent message to the ongoing flow conversation.

  • Inputs

    One per variable in the message.

  • Outputs

    No outputs.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="ask_input", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • message (str) – Content of the agent message to append. Allows placeholders, which can define inputs.

message: str#

Content of the agent message to append. Allows placeholders, which can define inputs.

class pyagentspec.flows.nodes.startnode.StartNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>)#

Bases: Node

Start nodes denote the start of the execution of a flow.

  • Inputs

    The list of inputs that should be the inputs of the flow. If both input and output properties are specified they must be an exact match

    If None is given, pyagentspec copies the outputs provided, if any. Otherwise, no input is exposed.

  • Outputs

    The list of outputs of the step. If both input and output properties are specified they must be an exact match

    If None is given, pyagentspec copies the inputs provided, if any. Otherwise, no output is exposed.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, LlmNode, StartNode
>>> user_question_property = Property(
...     json_schema=dict(
...         title="user_question",
...         description="The user question.",
...         type="string",
...     )
... )
>>> answer_property = Property(json_schema=dict(title="answer", type="string"))
>>> start_node = StartNode(name="start", inputs=[user_question_property])
>>> end_node = EndNode(name="end", outputs=[answer_property])
>>> llm_node = LlmNode(
...     name="llm node",
...     prompt_template="Answer the user question: {{user_question}}",
...     llm_config=llm_config,
... )
>>> flow = Flow(
...     name="flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_question",
...             destination_node=llm_node,
...             destination_input="user_question",
...         ),
...         DataFlowEdge(
...             name="answer_edge",
...             source_node=llm_node,
...             source_output="generated_text",
...             destination_node=end_node,
...             destination_input="answer"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

branches: List[str]#

The list of outgoing branch names that the node will expose

class pyagentspec.flows.nodes.toolnode.ToolNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v26_2_0, inputs=None, outputs=None, branches=<factory>, tool)#

Bases: Node

The tool execution node is a node that will execute a tool as part of a flow.

  • Inputs

    Inferred from the definition of the tool to execute.

  • Outputs

    Inferred from the definition of the tool to execute.

  • Branches

    One, the default next.

Examples

>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> from pyagentspec.property import Property
>>>
>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_root_property = Property(
...     json_schema={"title": "x_square_root", "type": "number"}
... )
>>> square_root_tool = ServerTool(
...     name="compute_square_root",
...     description="Computes the square root of a number",
...     inputs=[x_property],
...     outputs=[x_square_root_property],
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_root_property])
>>> tool_node = ToolNode(name="", tool=square_root_tool)
>>> flow = Flow(
...     name="Compute square root flow",
...     start_node=start_node,
...     nodes=[start_node, tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=tool_node),
...         ControlFlowEdge(name="tool_to_end", from_node=tool_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_root_edge",
...             source_node=tool_node,
...             source_output="x_square_root",
...             destination_node=end_node,
...             destination_input="x_square_root"
...         ),
...     ],
... )
Parameters:
  • id (str) – A unique identifier for this Component

  • name (str) – Name of this Component

  • description (str | None) – Optional description of this Component

  • metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component

  • min_agentspec_version (AgentSpecVersionEnum) –

  • max_agentspec_version (AgentSpecVersionEnum) –

  • inputs (List[Property] | None) – List of inputs accepted by this component

  • outputs (List[Property] | None) – List of outputs exposed by this component

  • branches (List[str]) – The list of outgoing branch names that the node will expose

  • tool (Tool) – The tool to be executed in this Node

tool: Tool#

The tool to be executed in this Node

Enumerators#

class pyagentspec.flows.nodes.mapnode.ReductionMethod(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: Enum

Enumerator for the types of reduction available in the MapNode.

APPEND = 'append'#

Append all the elements to a list

AVERAGE = 'average'#

Compute the average of all the elements

MAX = 'max'#

Get the element with the highest value

MIN = 'min'#

Get the element with the lowest value

SUM = 'sum'#

Sum all the elements

Flow Builder#

The Flow Builder provides a concise, chainable API to assemble Agent Spec Flows programmatically. It helps wire control and data edges, set entry/finish points, and serialize flows to JSON/YAML.

See some code examples in the Reference Sheet

class pyagentspec.flows.flowbuilder.FlowBuilder#

Bases: object

A builder for constructing Agent Spec Flows.

add_conditional(source_node, source_value, destination_map, default_destination, branching_node_name=None)#

Add a condition/branching to the Flow.

Parameters:
  • source_node (Node | str) – Node/name from which to start the branching from.

  • source_value (str | tuple[pyagentspec.flows.node.Node | str, str]) – Which value to use to perform the branching condition. If str, uses the source_node. If tuple[Node | str, str], uses the specified node and output name.

  • destination_map (dict[str, pyagentspec.flows.node.Node | str]) – Dictionary which specifies which node to transition to for given values.

  • default_destination (Node | str) – Node/name where to transition to if no matching value/transition is found in the destination_map.

  • branching_node_name (str | None) – Optional name for the branching node. Uses automatically generated auto-incrementing names if not providing.

Return type:

FlowBuilder

Example

>>> from pyagentspec.flows.flowbuilder import FlowBuilder
>>> from pyagentspec.flows.nodes import LlmNode
>>>
>>> flow = (
...     FlowBuilder()
...     .add_node(LlmNode(name="source_node", llm_config=llm_config, prompt_template="Generate 'fail' or 'success'"))
...     .add_node(LlmNode(name="fail_node", llm_config=llm_config, prompt_template="Print 'FAIL'"))
...     .add_node(LlmNode(name="success_node", llm_config=llm_config, prompt_template="Print 'SUCESS'"))
...     .add_conditional("source_node", LlmNode.DEFAULT_OUTPUT, {"success": "success_node", "fail": "fail_node"}, default_destination="fail_node")
...     .set_entry_point("source_node")
...     .set_finish_points(["fail_node", "success_node"])
...     .build()
... )
add_data_edge(source_node, dest_node, data_name, edge_name=None)#

Add a data flow edge to the Flow.

Parameters:
  • source_node (Node | str) – Node/name which constitute the start/source of the data flow edge.

  • dest_node (Node | str) – Node/name that constitutes the end/destination of the data flow edge.

  • data_name (str | tuple[str, str]) – Name of the data property to propagate between the two nodes, either str when the name is shared, or tuple (source_output, destination_input) when the names are different.

  • edge_name (str | None) – Name for the edge. Defaults to “data_flow_edge”

Return type:

FlowBuilder

add_edge(source_node, dest_node, from_branch=None, edge_name=None)#

Add a control flow edge to the Flow.

Parameters:
  • source_node (list[pyagentspec.flows.node.Node | str] | Node | str) – Single node/name (creates 1 edge) or list of nodes/names (creates N edges) which constitute the start of the control flow edge(s).

  • dest_node (Node | str) – Node/name that constitutes the end of the control flow edge(s).

  • from_branch (list[str | None] | str | None) – Optional source branch name(s) to use in the control flow edge(s). When a list, must be of the same length as the list of source_node.

  • edge_name (str | None) – Name for the edge. Defaults to f”control_edge_{source_node.name}_{dest_node.name}_{from_branch}”.

Return type:

FlowBuilder

add_node(node)#

Add a new node to the Flow.

Parameters:

node (Node) – Node to add to the Flow.

Return type:

FlowBuilder

add_sequence(nodes)#

Add a sequence of nodes to the Flow.

Parameters:

nodes (list[pyagentspec.flows.node.Node]) – List of nodes to add to the Flow.

Return type:

FlowBuilder

build(name='Flow')#

Build the Flow.

Will raise errors if encountering any while building the Flow.

Parameters:

name (str) –

Return type:

Flow

classmethod build_linear_flow(nodes: list[Node], name: str = DEFAULT_FLOW_NAME, serialize_as: Literal[None] = None, data_flow_edges: list[tuple[Node | str, Node | str, str] | tuple[Node | str, Node | str, str, str] | DataFlowEdge] | None = None, inputs: list[Property] | None = None, outputs: list[Property] | None = None) Flow#
classmethod build_linear_flow(nodes: list[Node], name: str = DEFAULT_FLOW_NAME, serialize_as: Literal['JSON', 'YAML'] = 'JSON', data_flow_edges: list[tuple[Node | str, Node | str, str] | tuple[Node | str, Node | str, str, str] | DataFlowEdge] | None = None, inputs: list[Property] | None = None, outputs: list[Property] | None = None) str

Build a linear flow from a list of nodes.

Parameters:
  • nodes – List of nodes to use to create the linear/sequential Flow.

  • serialize_as – Format for the returned object. If None, returns a pyagentspec Flow. Otherwise, returns its Agent Spec configuration as JSON/YAML.

  • data_flow_edges – Optional list of data flow edges. Either a DataFlowEdge object or a (src_node, dst_node, var_name) tuple when the variable name is shared, or (src_node, dst_node, src_in, dst_out) otherwise.

  • inputs – Optional list of inputs for the flow. If None, auto-detects as the list of inputs that are not generated at some point in the execution of the flow.

  • outputs – Optional list of outputs for the flow branches corresponding to each of the specified terminal nodes. Must be of the same length as node. If None, auto-detects as the intersection of all the outputs generated by any node in any execution branch of the flow.

build_spec(name='Flow', serialize_as='JSON')#

Build the Flow and return its Agent Spec JSON/YAML configuration.

Will raise errors if encountering any while building the Flow.

Parameters:
  • name (str) –

  • serialize_as (Literal['JSON', 'YAML']) –

Return type:

str

set_entry_point(node, inputs=None)#

Sets the first node to execute in the Flow.

Parameters:
  • node (Node | str) – Node/name that will first be run in the Flow.

  • inputs (list[pyagentspec.property.Property] | None) – Optional list of inputs for the flow. If None, auto-detects as the list of inputs that are not generated at some point in the execution of the flow.

Return type:

FlowBuilder

set_finish_points(node, outputs=None)#

Specifies the potential points of completion of the Flow.

Parameters:
  • node (list[pyagentspec.flows.node.Node | str] | Node | str) – Node/name or list of nodes/names which are terminal nodes in the Flow.

  • outputs (list[list[pyagentspec.property.Property] | None] | list[pyagentspec.property.Property] | None) – Optional list of outputs for the flow branches corresponding to each of the specified terminal nodes. Must be of the same length as node. If None, auto-detects as the intersection of all the outputs generated by any node in any execution branch of the flow.

Return type:

FlowBuilder