Flows
This page presents all APIs and classes related to Flows and nodes in PyAgentSpec.
Flows & Nodes
- class pyagentspec.flows.flow.Flow(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, start_node, nodes, control_flow_connections, data_flow_connections=None)#
Bases:
AgenticComponent
A flow is a component that models a sequence of operations to be executed in a precise order.
The operations and their order are defined by the nodes and transitions associated with the flow. Steps can be deterministic, or some of them can use LLMs.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
start_node (Node) – The starting node of this flow. It must also be part of the nodes list
nodes (List[Node]) – The list of nodes that compose this Flow
control_flow_connections (List[ControlFlowEdge]) – The list of edges that define the control flow of this Flow
data_flow_connections (List[DataFlowEdge] | None) – The list of edges that define the data flow of this Flow
Example
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> # llm_config is assumed to be an LLM configuration component defined beforehand
>>> prompt_property = Property(
...     json_schema={"title": "prompt", "type": "string"}
... )
>>> llm_output_property = Property(
...     json_schema={"title": "llm_output", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[prompt_property])
>>> end_node = EndNode(name="end", outputs=[llm_output_property])
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="{{prompt}}",
...     inputs=[prompt_property],
...     outputs=[llm_output_property],
... )
>>> flow = Flow(
...     name="Simple prompting flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="prompt_edge",
...             source_node=start_node,
...             source_output="prompt",
...             destination_node=llm_node,
...             destination_input="prompt",
...         ),
...         DataFlowEdge(
...             name="llm_output_edge",
...             source_node=llm_node,
...             source_output="llm_output",
...             destination_node=end_node,
...             destination_input="llm_output"
...         ),
...     ],
... )
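The structural constraint stated for start_node (it must be part of the nodes list) can be sketched in plain Python. This is an illustration only, not PyAgentSpec's validation code: SketchNode, SketchControlEdge and check_flow_structure are hypothetical names, and the check that edge endpoints also belong to the nodes list is an assumption that is not stated on this page.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SketchNode:
    """Hypothetical stand-in for a flow node, identified by its name."""
    name: str


@dataclass(frozen=True)
class SketchControlEdge:
    """Hypothetical stand-in for a control flow edge."""
    from_node: SketchNode
    to_node: SketchNode
    from_branch: Optional[str] = None


def check_flow_structure(start_node, nodes, control_edges) -> List[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    # Stated in the parameter description: the start node must be in the nodes list.
    if start_node not in nodes:
        problems.append(f"start node '{start_node.name}' is not in the nodes list")
    # Assumption: every edge endpoint should also be one of the flow's nodes.
    for edge in control_edges:
        for endpoint in (edge.from_node, edge.to_node):
            if endpoint not in nodes:
                problems.append(f"edge endpoint '{endpoint.name}' is not in the nodes list")
    return problems


start = SketchNode("start")
llm = SketchNode("simple llm node")
end = SketchNode("end")
edges = [SketchControlEdge(start, llm), SketchControlEdge(llm, end)]

valid_report = check_flow_structure(start, [start, llm, end], edges)
broken_report = check_flow_structure(start, [llm, end], edges)
```

Here valid_report is empty, while broken_report lists the start node twice: once as a missing start node and once as a dangling edge endpoint.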
- class pyagentspec.flows.node.Node(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>)#
Bases:
ComponentWithIO
Base class for all nodes that can be put inside a flow.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
- DEFAULT_NEXT_BRANCH: ClassVar[str] = 'next'#
Name used for the default branch
- branches: List[str]#
The list of outgoing branch names that the node will expose
- class pyagentspec.flows.edges.dataflowedge.DataFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, source_node, source_output, destination_node, destination_input)#
Bases:
Component
A data flow edge specifies how the output of a node propagates as input of another node.
An output can be propagated as input to several nodes.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
source_node (Node) – The instance of the source Node
source_output (str) – The name of the property among the source node outputs that should be connected
destination_node (Node) – The instance of the destination Node
destination_input (str) – The name of the property among the destination node inputs that should be connected
- destination_input: str#
The name of the property among the destination node inputs that should be connected
- source_output: str#
The name of the property among the source node outputs that should be connected
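To illustrate how one output can feed several inputs, here is a minimal sketch of data propagation in plain Python. It does not use PyAgentSpec: nodes are identified by plain strings, and EdgeSpec and propagate are hypothetical names chosen for this example.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# Each tuple mirrors the four edge fields:
# (source_node, source_output, destination_node, destination_input)
EdgeSpec = Tuple[str, str, str, str]


def propagate(
    edges: List[EdgeSpec], node_outputs: Dict[str, Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Copy every produced output to each destination input it is wired to."""
    node_inputs: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for source_node, source_output, destination_node, destination_input in edges:
        produced = node_outputs.get(source_node, {})
        if source_output in produced:
            node_inputs[destination_node][destination_input] = produced[source_output]
    return dict(node_inputs)


# The same "prompt" output of the start node is wired to two different nodes,
# under a different input name for each of them.
edges = [
    ("start", "prompt", "llm_a", "prompt"),
    ("start", "prompt", "llm_b", "question"),
]
inputs = propagate(edges, {"start": {"prompt": "Hello"}})
```

After this runs, both llm_a and llm_b receive the value "Hello", each under its own input name.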
- class pyagentspec.flows.edges.controlflowedge.ControlFlowEdge(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, from_node, from_branch=None, to_node)#
Bases:
Component
A control flow edge specifies a possible transition from a node to another in a flow.
A single node can have several potential next nodes, in which case several control flow edges should be present in the control flow connections of that flow.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
from_node (Node) – The instance of the source Node
from_branch (str | None) – The name of the branch to connect. It must be among the list of branches offered by the from_node
to_node (Node) – The instance of the destination Node
- from_branch: str | None#
The name of the branch to connect. It must be among the list of branches offered by the from_node
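The constraint on from_branch can be sketched as a small check in plain Python. This is a hedged illustration rather than the library's implementation: resolve_branch is a hypothetical helper, and treating a missing from_branch as the default branch 'next' is an assumption based on Node.DEFAULT_NEXT_BRANCH.

```python
from typing import List, Optional

DEFAULT_NEXT_BRANCH = "next"  # same value as Node.DEFAULT_NEXT_BRANCH


def resolve_branch(from_branch: Optional[str], offered_branches: List[str]) -> str:
    """Return the branch an edge connects to, or raise if the node does not offer it."""
    # Assumption: an edge without an explicit from_branch uses the default branch.
    branch = DEFAULT_NEXT_BRANCH if from_branch is None else from_branch
    if branch not in offered_branches:
        raise ValueError(f"branch '{branch}' is not among {offered_branches}")
    return branch


default_choice = resolve_branch(None, ["next"])
explicit_choice = resolve_branch("PASSWORD_OK", ["PASSWORD_OK", "default"])

try:
    resolve_branch("missing", ["next"])
    rejected = False
except ValueError:
    rejected = True
```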
Nodes
- class pyagentspec.flows.nodes.agentnode.AgentNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, agent)#
Bases:
Node
The agent execution node is a node that will execute an agent as part of a flow.
If branches are configured, the agent is prompted to select one of them before the agent node completes and the flow transitions to another node.
- Inputs
Inferred from the definition of the agent to execute.
- Outputs
Inferred from the definition of the agent to execute.
- Branches
One, the default next.
Examples
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.agent import Agent
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.nodes import AgentNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> # llm_config is assumed to be an LLM configuration component defined beforehand
>>> query_property = Property(json_schema={"title": "query", "type": "string"})
>>> search_results_property = Property(
...     json_schema={"title": "search_results", "type": "array", "items": {"type": "string"}}
... )
>>> search_tool = ServerTool(
...     name="search_tool",
...     description=(
...         "This tool runs a web search with the given query "
...         "and returns the most relevant results"
...     ),
...     inputs=[query_property],
...     outputs=[search_results_property],
... )
>>> agent = Agent(
...     name="Search agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather the required information for the user: {{query}}"
...     ),
...     tools=[search_tool],
...     outputs=[search_results_property],
... )
>>> start_node = StartNode(name="start", inputs=[query_property])
>>> end_node = EndNode(name="end", outputs=[search_results_property])
>>> agent_node = AgentNode(
...     name="Search agent node",
...     agent=agent,
... )
>>> flow = Flow(
...     name="Search agent flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_end", from_node=agent_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="query",
...             destination_node=agent_node,
...             destination_input="query",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=agent_node,
...             source_output="search_results",
...             destination_node=end_node,
...             destination_input="search_results"
...         ),
...     ],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
agent (AgenticComponent) – The agentic component that will be called as part of the execution of this node
- agent: AgenticComponent#
The agentic component that will be called as part of the execution of this node
- class pyagentspec.flows.nodes.apinode.ApiNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, url, http_method, api_spec_uri=None, data=<factory>, query_params=<factory>, headers=<factory>)#
Bases:
Node
Make an API call.
This node is intended to be a part of a Flow.
- Inputs
Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users have to manually specify them.
- Outputs
Inferred from the json spec retrieved from API Spec URI, if available and reachable. Otherwise, users have to manually specify them.
- Branches
One, the default next.
Examples
>>> from pyagentspec.flows.nodes import ApiNode
>>> from pyagentspec.property import Property
>>> weather_result_property = Property(
...     json_schema={
...         "title": "zurich_weather",
...         "type": "object",
...         "properties": {
...             "temperature": {
...                 "type": "number",
...                 "description": "Temperature in celsius degrees",
...             },
...             "weather": {"type": "string"}
...         },
...     }
... )
>>> call_current_weather_step = ApiNode(
...     name="Weather API call node",
...     url="https://example.com/weather",
...     http_method="GET",
...     query_params={
...         "location": "zurich",
...     },
...     outputs=[weather_result_property]
... )
>>>
>>> item_id_property = Property(
...     json_schema={"title": "item_id", "type": "string"}
... )
>>> order_id_property = Property(
...     json_schema={"title": "order_id", "type": "string"}
... )
>>> store_id_property = Property(
...     json_schema={"title": "store_id", "type": "string"}
... )
>>> session_id_property = Property(
...     json_schema={"title": "session_id", "type": "string"}
... )
>>> create_order_step = ApiNode(
...     name="Orders api call node",
...     url="https://example.com/orders/{{ order_id }}",
...     http_method="POST",
...     # sending an object which will automatically be transformed into JSON
...     data={
...         # define a static body parameter
...         "topic_id": 12345,
...         # define a templated body parameter.
...         # The value for {{ item_id }} will be taken from the IO system at runtime
...         "item_id": "{{ item_id }}",
...     },
...     query_params={
...         # provide one templated query parameter called "store_id"
...         # which will take its value from the IO system from key "store_id"
...         "store_id": "{{ store_id }}",
...     },
...     headers={
...         # set header session_id. the value is coming from the IO system
...         "session_id": "{{ session_id }}",
...     },
...     inputs=[item_id_property, order_id_property, store_id_property, session_id_property],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
url (str) – The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs
http_method (str) – The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs
api_spec_uri (str | None) – The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs
data (Dict[str, Any]) – The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs
query_params (Dict[str, Any]) – Query parameters for the API call. Allows placeholders in dict values, which can define inputs
headers (Dict[str, Any]) – Additional headers for the API call. Allows placeholders in dict values, which can define inputs
- DEFAULT_OUTPUT: ClassVar[str] = 'response'#
Name of the default output, which holds the response of the API call.
- Type:
str
- api_spec_uri: str | None#
The uri of the specification of the API that is going to be called. Allows placeholders, which can define inputs
- data: Dict[str, Any]#
The data to send as part of the body of this API call. Allows placeholders in dict values, which can define inputs
- headers: Dict[str, Any]#
Additional headers for the API call. Allows placeholders in dict values, which can define inputs
- http_method: str#
The HTTP method to use for the API call (e.g., GET, POST, PUT, …). Allows placeholders, which can define inputs
- query_params: Dict[str, Any]#
Query parameters for the API call. Allows placeholders in dict values, which can define inputs
- url: str#
The url of the API to which the call should be forwarded. Allows placeholders, which can define inputs
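Placeholders such as {{ order_id }} both define inputs and get replaced by their values at runtime. The following plain-Python sketch shows one way this could work. It is not the library's templating engine: PLACEHOLDER, find_placeholders and render are hypothetical names, and converting every value to a string during rendering is a simplifying assumption.

```python
import re
from typing import Any, Dict, List

# Matches "{{ name }}" with optional spaces around the name.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def find_placeholders(value: Any) -> List[str]:
    """Collect placeholder names from a string or from dict values, in first-seen order."""
    if isinstance(value, str):
        names = PLACEHOLDER.findall(value)
    elif isinstance(value, dict):
        names = [name for item in value.values() for name in find_placeholders(item)]
    else:
        names = []
    return list(dict.fromkeys(names))


def render(value: Any, inputs: Dict[str, Any]) -> Any:
    """Replace placeholders with input values; a missing input raises KeyError."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda match: str(inputs[match.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, inputs) for key, item in value.items()}
    return value  # static values such as numbers are passed through unchanged


url = "https://example.com/orders/{{ order_id }}"
data = {"topic_id": 12345, "item_id": "{{ item_id }}"}

url_inputs = find_placeholders(url)
data_inputs = find_placeholders(data)
rendered_url = render(url, {"order_id": "o-42"})
rendered_data = render(data, {"item_id": "i-7"})
```

The names returned by find_placeholders correspond to the inputs that the placeholders define, while render shows the substitution step for both a templated URL and a templated body.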
- class pyagentspec.flows.nodes.branchingnode.BranchingNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, mapping)#
Bases:
Node
Select the next node to transition to based on a mapping.
The input is used as the key for the mapping. If the input does not correspond to any key of the mapping, the branch ‘default’ is selected. This node is intended to be a part of a Flow.
- Inputs
The input value that should be used as key for the mapping.
- Outputs
None.
- Branches
One for each value in the mapping, plus a branch called default, which is the branch taken by the flow when mapping fails (i.e., the input does not match any key in the mapping).
Examples
>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> # llm_config is assumed to be an LLM configuration component defined beforehand
>>> CORRECT_PASSWORD_BRANCH = "PASSWORD_OK"
>>> password_property = Property(
...     json_schema={"title": "password", "type": "string"}
... )
>>> agent = Agent(
...     name="User input agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to ask the password to the user. "
...         "Once you get it, submit it and end."
...     ),
...     outputs=[password_property],
... )
>>> start_node = StartNode(name="start")
>>> access_granted_end_node = EndNode(
...     name="access granted end", branch_name="ACCESS_GRANTED"
... )
>>> access_denied_end_node = EndNode(
...     name="access denied end", branch_name="ACCESS_DENIED"
... )
>>> branching_node = BranchingNode(
...     name="password check",
...     mapping={"123456": CORRECT_PASSWORD_BRANCH},
...     inputs=[password_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Check access flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         access_granted_end_node,
...         access_denied_end_node
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent",
...             from_node=start_node,
...             to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching",
...             from_node=agent_node,
...             to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_granted",
...             from_node=branching_node,
...             from_branch=CORRECT_PASSWORD_BRANCH,
...             to_node=access_granted_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_access_denied",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=access_denied_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="password_edge",
...             source_node=agent_node,
...             source_output="password",
...             destination_node=branching_node,
...             destination_input="password",
...         ),
...     ],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
mapping (Dict[str, str]) – The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given
- DEFAULT_BRANCH: ClassVar[str] = 'default'#
Name of the default branch used when mapping fails
- DEFAULT_INPUT: ClassVar[str] = 'branching_mapping_key'#
Name of the default input whose value is used as the mapping key to select the next branch.
- Type:
str
- mapping: Dict[str, str]#
The mapping between the value of the input and the name of the outgoing branch that will be taken when that input value is given
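The selection rule described for this node (look the input up in the mapping, fall back to the default branch) can be sketched in plain Python. The helpers select_branch and exposed_branches are hypothetical names for illustration and are not part of PyAgentSpec.

```python
from typing import Dict, List

DEFAULT_BRANCH = "default"  # same value as BranchingNode.DEFAULT_BRANCH


def select_branch(mapping: Dict[str, str], input_value: str) -> str:
    """Return the branch mapped to the input, or the default branch if no key matches."""
    return mapping.get(input_value, DEFAULT_BRANCH)


def exposed_branches(mapping: Dict[str, str]) -> List[str]:
    """One branch per distinct mapping value, plus the default branch."""
    return list(dict.fromkeys([*mapping.values(), DEFAULT_BRANCH]))


mapping = {"123456": "PASSWORD_OK"}
granted = select_branch(mapping, "123456")
denied = select_branch(mapping, "wrong password")
branches = exposed_branches(mapping)
```

With this mapping, only the exact input "123456" selects PASSWORD_OK; any other input takes the default branch.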
- class pyagentspec.flows.nodes.endnode.EndNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, branch_name='next')#
Bases:
Node
End nodes denote the end of the execution of a flow.
There might be several end nodes in a flow, in which case the executor of the flow should be able to track which one was reached and pass that back to the caller.
- Inputs
The list of inputs of the step. If both input and output properties are specified they must be an exact match
- Outputs
The list of outputs that should be exposed by the flow. If both input and output properties are specified they must be an exact match
- Branches
None.
Examples
>>> from pyagentspec.agent import Agent
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import AgentNode, BranchingNode, StartNode, EndNode
>>> from pyagentspec.property import Property
>>> # llm_config is assumed to be an LLM configuration component defined beforehand
>>> languages_to_branch_name = {
...     "english": "ENGLISH",
...     "spanish": "SPANISH",
...     "italian": "ITALIAN",
... }
>>> language_property = Property(
...     json_schema={"title": "language", "type": "string"}
... )
>>> agent = Agent(
...     name="Language detector agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to understand the language spoken by the user. "
...         "Please output only the language in lowercase and submit."
...     ),
...     outputs=[language_property],
... )
>>> start_node = StartNode(name="start")
>>> english_end_node = EndNode(
...     name="english end", branch_name=languages_to_branch_name["english"]
... )
>>> spanish_end_node = EndNode(
...     name="spanish end", branch_name=languages_to_branch_name["spanish"]
... )
>>> italian_end_node = EndNode(
...     name="italian end", branch_name=languages_to_branch_name["italian"]
... )
>>> unknown_end_node = EndNode(name="unknown language end", branch_name="unknown")
>>> branching_node = BranchingNode(
...     name="language check",
...     mapping=languages_to_branch_name,
...     inputs=[language_property]
... )
>>> agent_node = AgentNode(
...     name="User input agent node",
...     agent=agent,
... )
>>> assistant = Flow(
...     name="Language detection flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         agent_node,
...         branching_node,
...         english_end_node,
...         spanish_end_node,
...         italian_end_node,
...         unknown_end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_agent", from_node=start_node, to_node=agent_node
...         ),
...         ControlFlowEdge(
...             name="agent_to_branching", from_node=agent_node, to_node=branching_node
...         ),
...         ControlFlowEdge(
...             name="branching_to_english_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["english"],
...             to_node=english_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_spanish_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["spanish"],
...             to_node=spanish_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_italian_end",
...             from_node=branching_node,
...             from_branch=languages_to_branch_name["italian"],
...             to_node=italian_end_node,
...         ),
...         ControlFlowEdge(
...             name="branching_to_unknown_end",
...             from_node=branching_node,
...             from_branch=BranchingNode.DEFAULT_BRANCH,
...             to_node=unknown_end_node,
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="language_edge",
...             source_node=agent_node,
...             source_output="language",
...             destination_node=branching_node,
...             destination_input="language",
...         ),
...     ],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
branch_name (str) – The name of the branch closed by this node, which will be exposed by the Flow
- branch_name: str#
The name of the branch closed by this node, which will be exposed by the Flow
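When a flow has several end nodes, an executor needs to report which one was reached. The plain-Python sketch below walks a control flow graph until it hits an end node and returns that node's branch_name. It is only an illustration under simplifying assumptions: run_until_end, the transitions table and the choose_branch callback are hypothetical, and nodes are identified by their names.

```python
from typing import Callable, Dict, Tuple


def run_until_end(
    start: str,
    transitions: Dict[Tuple[str, str], str],
    choose_branch: Callable[[str], str],
    end_branch_names: Dict[str, str],
) -> str:
    """Follow control flow edges from start and return the branch_name of the end node reached."""
    current = start
    while current not in end_branch_names:
        branch = choose_branch(current)  # branch selected by the current node
        current = transitions[(current, branch)]  # KeyError if no edge is defined
    return end_branch_names[current]


# (node name, outgoing branch) -> next node name
transitions = {
    ("start", "next"): "language check",
    ("language check", "ENGLISH"): "english end",
    ("language check", "default"): "unknown language end",
}
# End node name -> branch_name exposed to the caller
end_branch_names = {"english end": "ENGLISH", "unknown language end": "unknown"}

reached_english = run_until_end(
    "start",
    transitions,
    lambda node: "ENGLISH" if node == "language check" else "next",
    end_branch_names,
)
reached_unknown = run_until_end(
    "start",
    transitions,
    lambda node: "default" if node == "language check" else "next",
    end_branch_names,
)
```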
- class pyagentspec.flows.nodes.flownode.FlowNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, subflow)#
Bases:
Node
The flow node executes a subflow as part of a flow.
- Inputs
Inferred from the inner structure. It is the set of inputs required by the StartNode of the inner flow.
- Outputs
Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow.
- Branches
Inferred from the inner flow, one for each distinct value of the branch_name attribute of the EndNode nodes in the inner flow.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
subflow (Flow) – The flow that should be executed
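The inference rules for outputs and branches can be sketched in plain Python. This is an illustration, not the library's code: end nodes are represented as simple dictionaries, and infer_branches and infer_outputs are hypothetical helper names.

```python
from typing import Any, Dict, List

EndNodeSpec = Dict[str, Any]  # {"branch_name": str, "outputs": list of output titles}


def infer_branches(end_nodes: List[EndNodeSpec]) -> List[str]:
    """One branch per distinct branch_name among the end nodes of the inner flow."""
    return list(dict.fromkeys(node["branch_name"] for node in end_nodes))


def infer_outputs(end_nodes: List[EndNodeSpec]) -> List[str]:
    """Union of the outputs exposed by the end nodes of the inner flow."""
    return list(dict.fromkeys(title for node in end_nodes for title in node["outputs"]))


end_nodes = [
    {"branch_name": "ACCESS_GRANTED", "outputs": ["token"]},
    {"branch_name": "ACCESS_DENIED", "outputs": ["reason"]},
    {"branch_name": "ACCESS_DENIED", "outputs": ["reason", "attempts"]},
]
branches = infer_branches(end_nodes)
outputs = infer_outputs(end_nodes)
```

Two end nodes share the branch name ACCESS_DENIED, so only two branches are exposed, while the outputs are merged across all three end nodes.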
Example
The FlowNode is particularly suitable when subflows can be reused inside a project. Let’s see an example with a flow that estimates a numerical value using the “wisdom of the crowd” effect:
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import MapNode, LlmNode, ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> # llm_config is assumed to be an LLM configuration component defined beforehand
>>> duplication_tool = ServerTool(
...     name="duplication_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "element", "description": "", "type": "string"}
...         ),
...         Property(
...             json_schema={"title": "n", "description": "", "type": "integer"}
...         ),
...     ],
...     outputs=[
...         Property(
...             json_schema={
...                 "title": "flow_iterable_queries",
...                 "type": "array",
...                 "items": {"type": "string"}
...             },
...         )
...     ],
... )
>>> reduce_tool = ServerTool(
...     name="reduce_tool",
...     description="",
...     inputs=[
...         Property(
...             json_schema={"title": "elements", "type": "array", "items": {"type": "string"}}
...         ),
...     ],
...     outputs=[Property(json_schema={"title": "flow_processed_query", "type": "string"})],
... )
>>> # Defining a simple prompt
>>> REASONING_PROMPT_TEMPLATE = '''Provide your best numerical estimate for: {{user_input}}
... Your answer should be a single number.
... Do not include any units, reasoning, or extra text.'''
>>> # Defining the subflow for the map step
>>> user_input_property = Property(
...     json_schema={"title": "user_input", "type": "string"}
... )
>>> flow_processed_query_property = Property(
...     json_schema={"title": "flow_processed_query", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[user_input_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> llm_node = LlmNode(
...     name="reasoning llm node",
...     llm_config=llm_config,
...     prompt_template=REASONING_PROMPT_TEMPLATE,
...     inputs=[user_input_property],
...     outputs=[flow_processed_query_property],
... )
>>> inner_map_flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="search_results_edge",
...             source_node=llm_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
>>> user_query_property = Property(
...     json_schema={"title": "user_query", "type": "string"}
... )
>>> n_repeat_property = Property(
...     json_schema={"title": "n_repeat", "type": "integer"}
... )
>>> flow_iterable_queries_property = Property(
...     json_schema={
...         "title": "iterated_user_input",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> flow_processed_queries_property = Property(
...     json_schema={
...         "title": "collected_flow_processed_query",
...         "type": "array",
...         "items": {"type": "string"},
...     }
... )
>>> start_node = StartNode(name="start", inputs=[user_query_property, n_repeat_property])
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> duplication_node = ToolNode(
...     name="duplication_tool node",
...     tool=duplication_tool,
... )
>>> reduce_node = ToolNode(
...     name="reduce_tool node",
...     tool=reduce_tool,
... )
>>> map_node = MapNode(
...     name="map node",
...     subflow=inner_map_flow,
...     inputs=[flow_iterable_queries_property],
...     outputs=[flow_processed_queries_property],
... )
>>> mapreduce_flow = Flow(
...     name="Map-reduce flow",
...     start_node=start_node,
...     nodes=[start_node, duplication_node, map_node, reduce_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_duplication", from_node=start_node, to_node=duplication_node
...         ),
...         ControlFlowEdge(
...             name="duplication_to_map", from_node=duplication_node, to_node=map_node
...         ),
...         ControlFlowEdge(name="map_to_reduce", from_node=map_node, to_node=reduce_node),
...         ControlFlowEdge(name="reduce_to_end", from_node=reduce_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_query",
...             destination_node=duplication_node,
...             destination_input="element",
...         ),
...         DataFlowEdge(
...             name="n_repeat_edge",
...             source_node=start_node,
...             source_output="n_repeat",
...             destination_node=duplication_node,
...             destination_input="n",
...         ),
...         DataFlowEdge(
...             name="flow_iterables_edge",
...             source_node=duplication_node,
...             source_output="flow_iterable_queries",
...             destination_node=map_node,
...             destination_input="iterated_user_input",
...         ),
...         DataFlowEdge(
...             name="flow_processed_queries_edge",
...             source_node=map_node,
...             source_output="collected_flow_processed_query",
...             destination_node=reduce_node,
...             destination_input="elements",
...         ),
...         DataFlowEdge(
...             name="flow_processed_query_edge",
...             source_node=reduce_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query",
...         ),
...     ],
... )
Once the subflow is created, we can simply integrate it with the FlowNode:
>>> from pyagentspec.flows.nodes import FlowNode, AgentNode
>>> from pyagentspec.agent import Agent
>>> start_node = StartNode(name="start")
>>> end_node = EndNode(name="end", outputs=[flow_processed_query_property])
>>> flow_node = FlowNode(name="flow node", subflow=mapreduce_flow)
>>> agent = Agent(
...     name="User interaction agent",
...     llm_config=llm_config,
...     system_prompt=(
...         "Your task is to gather from the user the query and the number of times "
...         "it should be asked to an LLM. Once you have this information, submit and exit."
...     ),
...     outputs=[user_query_property, n_repeat_property],
... )
>>> agent_node = AgentNode(name="agent node", agent=agent)
>>> flow = Flow(
...     name="Map flow",
...     start_node=start_node,
...     nodes=[start_node, agent_node, flow_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_agent", from_node=start_node, to_node=agent_node),
...         ControlFlowEdge(name="agent_to_flow", from_node=agent_node, to_node=flow_node),
...         ControlFlowEdge(name="flow_to_end", from_node=flow_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=agent_node,
...             source_output="user_query",
...             destination_node=flow_node,
...             destination_input="user_query",
...         ),
...         DataFlowEdge(
...             name="n_rep_edge",
...             source_node=agent_node,
...             source_output="n_repeat",
...             destination_node=flow_node,
...             destination_input="n_repeat"
...         ),
...         DataFlowEdge(
...             name="flow_processed_query_edge",
...             source_node=flow_node,
...             source_output="flow_processed_query",
...             destination_node=end_node,
...             destination_input="flow_processed_query"
...         ),
...     ],
... )
- class pyagentspec.flows.nodes.inputmessagenode.InputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, message=None)#
Bases:
Node
This node interrupts the execution of the flow to wait for user input, and resumes after receiving it. An agent message, if given, is appended to the conversation before waiting for input. The user input is appended to the conversation as a user message and is returned from the node as a string property.
- Inputs
One per variable in the message.
- Outputs
One string property that represents the content of the input user message.
- Branches
One, the default next.
Examples
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="output_message", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
message (str | None) – The agent message to append to the conversation before waiting for user input
- DEFAULT_OUTPUT: ClassVar[str] = 'user_input'#
- message: str | None#
The agent message to append to the conversation before waiting for user input
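The interrupt-and-resume behaviour described for this node can be modelled in plain Python. The sketch below is only an illustration and does not use PyAgentSpec or any runtime: the function `input_message_step`, the `role`/`content` message dictionaries, and the role names are all assumptions made for the example. A generator stands in for the pause while the node waits for the user.

```python
from typing import Dict, Generator, List, Optional

Message = Dict[str, str]  # hypothetical message shape: {"role": ..., "content": ...}


def input_message_step(
    conversation: List[Message], message: Optional[str] = None
) -> Generator[None, str, Dict[str, str]]:
    """Model of the node: optionally post an agent message, pause, then resume."""
    if message is not None:
        conversation.append({"role": "agent", "content": message})
    # Execution is interrupted here until the caller sends the user input.
    user_text = yield
    conversation.append({"role": "user", "content": user_text})
    # The input is exposed under the default output name, 'user_input'.
    return {"user_input": user_text}


conversation: List[Message] = []
step = input_message_step(conversation, message="Which paragraph should I rephrase?")
next(step)  # run until the node pauses to wait for the user
try:
    step.send("Please rephrase this sentence.")
except StopIteration as finished:
    outputs = finished.value  # the generator's return value
```

After the call to `send`, the conversation holds the agent message followed by the user message, and `outputs` maps `user_input` to the text the user provided.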
- class pyagentspec.flows.nodes.llmnode.LlmNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, llm_config, prompt_template)#
Bases:
Node
Execute a prompt template with a given LLM.
This node is intended to be a part of a Flow.
- Inputs
One per placeholder in the prompt template.
- Outputs
The output text generated by the LLM.
- Branches
One, the default next.
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
llm_config (LlmConfig) – The LLM to use for generation in this node.
prompt_template (str) – Defines the prompt sent to the model. Allows placeholders, which can define inputs.
Example
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.nodes import LlmNode, StartNode, EndNode
>>> country_property = Property(
...     json_schema={"title": "country", "type": "string"}
... )
>>> capital_property = Property(
...     json_schema={"title": "capital", "type": "string"}
... )
>>> start_node = StartNode(name="start", inputs=[country_property])
>>> end_node = EndNode(name="end", outputs=[capital_property])
>>> llm_node = LlmNode(
...     name="simple llm node",
...     llm_config=llm_config,
...     prompt_template="What is the capital of {{ country }}?",
...     inputs=[country_property],
...     outputs=[capital_property],
... )
>>> flow = Flow(
...     name="Get the country's capital flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="country_edge",
...             source_node=start_node,
...             source_output="country",
...             destination_node=llm_node,
...             destination_input="country",
...         ),
...         DataFlowEdge(
...             name="capital_edge",
...             source_node=llm_node,
...             source_output="capital",
...             destination_node=end_node,
...             destination_input="capital"
...         ),
...     ],
... )
- DEFAULT_OUTPUT: ClassVar[str] = 'generated_text'#
Raw text generated by the LLM.
- Type:
str
- prompt_template: str#
Defines the prompt sent to the model. Allows placeholders, which can define inputs.
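The relationship between placeholders and inputs can be illustrated with a small, library-independent sketch. The helper `placeholder_names` is hypothetical and not part of PyAgentSpec. It assumes placeholders are written as `{{ name }}` with optional surrounding whitespace, as in the examples on this page, and it does not claim to reproduce the library's actual parsing.

```python
import re
from typing import List

# Assumption: placeholders look like {{name}} or {{ name }}, as in the examples above.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def placeholder_names(template: str) -> List[str]:
    """Return the distinct placeholder names in order of first appearance."""
    seen: List[str] = []
    for name in _PLACEHOLDER.findall(template):
        if name not in seen:
            seen.append(name)
    return seen


names = placeholder_names("What is the capital of {{ country }}?")
```

Under these assumptions, each returned name corresponds to one input that the node would expect, so the template above would yield a single input called `country`.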
- class pyagentspec.flows.nodes.mapnode.MapNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, subflow, reducers=None)#
Bases:
Node
The map node executes a subflow on each element of a given input as part of a flow.
- Inputs
Inferred from the inner structure. It’s the set of inputs required by the StartNode of the inner flow. The names of the inputs will be the ones of the inner flow, complemented with the iterated_ prefix. Their type is Union[inner_type, List[inner_type]], where inner_type is the type of the respective input in the inner flow.
- Outputs
Inferred from the inner structure. It’s the union of the sets of outputs exposed by the EndNodes of the inner flow, combined with the reducer method of each output. The names of the outputs will be the ones of the inner flow, complemented with the collected_ prefix. Their type depends on the reduce method specified for that output: List of the respective output type in case of append; the same type as the respective output type in case of sum, avg.
- Branches
One, the default next.
Examples
In this example we will create a flow that returns an L2-normalized version of a given list of numbers.
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, StartNode, MapNode, ToolNode
>>> from pyagentspec.tools import ServerTool
First, we define a MapNode that returns the square of all the elements in a list. It will be used to compute the L2-norm.
>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_property = Property(
...     json_schema={"title": "x_square", "type": "number"}
... )
>>> square_tool = ServerTool(
...     name="compute_square_tool",
...     description="Computes the square of a number",
...     inputs=[x_property],
...     outputs=[x_square_property],
... )
>>> list_of_x_property = Property(
...     json_schema={"title": "x_list", "type": "array", "items": {"type": "number"}}
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_property])
>>> square_tool_node = ToolNode(name="square tool node", tool=square_tool)
>>> square_number_flow = Flow(
...     name="Square number flow",
...     start_node=start_node,
...     nodes=[start_node, square_tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_tool", from_node=start_node, to_node=square_tool_node
...         ),
...         ControlFlowEdge(
...             name="tool_to_end", from_node=square_tool_node, to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=square_tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_edge",
...             source_node=square_tool_node,
...             source_output="x_square",
...             destination_node=end_node,
...             destination_input="x_square",
...         ),
...     ],
... )
>>> list_of_x_square_property = Property(
...     json_schema={"title": "x_square_list", "type": "array", "items": {"type": "number"}}
... )
>>> square_numbers_map_node = MapNode(
...     name="square number map node",
...     subflow=square_number_flow,
... )
Now we define the MapNode responsible for normalizing the given list of input numbers. Since the denominator is the same for all of the numbers, we map only the numerators (i.e., the input numbers).
>>> numerator_property = Property(
...     json_schema={"title": "numerator", "type": "number"}
... )
>>> denominator_property = Property(
...     json_schema={"title": "denominator", "type": "number"}
... )
>>> result_property = Property(
...     json_schema={"title": "result", "type": "number"}
... )
>>> division_tool = ServerTool(
...     name="division_tool",
...     description="Computes the ratio between two numbers",
...     inputs=[numerator_property, denominator_property],
...     outputs=[result_property],
... )
>>> start_node = StartNode(name="start", inputs=[numerator_property, denominator_property])
>>> end_node = EndNode(name="end", outputs=[result_property])
>>> divide_node = ToolNode(name="divide node", tool=division_tool)
>>> normalize_flow = Flow(
...     name="Normalize flow",
...     start_node=start_node,
...     nodes=[start_node, divide_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=divide_node),
...         ControlFlowEdge(name="tool_to_end", from_node=divide_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="numerator",
...             destination_node=divide_node,
...             destination_input="numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=start_node,
...             source_output="denominator",
...             destination_node=divide_node,
...             destination_input="denominator",
...         ),
...         DataFlowEdge(
...             name="result_edge",
...             source_node=divide_node,
...             source_output="result",
...             destination_node=end_node,
...             destination_input="result",
...         ),
...     ],
... )
Finally, we define the overall flow:
- The list of inputs is squared
- The squared list is summed and the square root of the sum is taken
- The list of inputs is normalized based on the outcomes of the previous 2 steps
>>> squared_sum_property = Property(
...     json_schema={"title": "squared_sum", "type": "number"}
... )
>>> normalized_list_of_x_property = Property(
...     json_schema={
...         "title": "x_list_normalized",
...         "type": "array",
...         "items": {"type": "number"},
...     }
... )
>>> normalize_map_node = MapNode(
...     name="normalize map node",
...     subflow=normalize_flow,
... )
>>> squared_sum_tool = ServerTool(
...     name="squared_sum_tool",
...     description="Computes the squared sum of a list of numbers",
...     inputs=[list_of_x_property],
...     outputs=[squared_sum_property],
... )
>>> start_node = StartNode(name="start", inputs=[list_of_x_property])
>>> end_node = EndNode(name="end", outputs=[normalized_list_of_x_property])
>>> squared_sum_tool_node = ToolNode(name="squared sum tool node", tool=squared_sum_tool)
>>> flow = Flow(
...     name="L2 normalize flow",
...     start_node=start_node,
...     nodes=[
...         start_node,
...         square_numbers_map_node,
...         squared_sum_tool_node,
...         normalize_map_node,
...         end_node,
...     ],
...     control_flow_connections=[
...         ControlFlowEdge(
...             name="start_to_square_numbers",
...             from_node=start_node,
...             to_node=square_numbers_map_node
...         ),
...         ControlFlowEdge(
...             name="square_numbers_to_squared_sum_tool",
...             from_node=square_numbers_map_node,
...             to_node=squared_sum_tool_node
...         ),
...         ControlFlowEdge(
...             name="squared_sum_tool_to_normalize",
...             from_node=squared_sum_tool_node,
...             to_node=normalize_map_node
...         ),
...         ControlFlowEdge(
...             name="normalize_to_end",
...             from_node=normalize_map_node,
...             to_node=end_node
...         ),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="list_of_x_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=square_numbers_map_node,
...             destination_input="iterated_x",
...         ),
...         DataFlowEdge(
...             name="x_square_list_edge",
...             source_node=square_numbers_map_node,
...             source_output="collected_x_square",
...             destination_node=squared_sum_tool_node,
...             destination_input="x_list",
...         ),
...         DataFlowEdge(
...             name="numerator_edge",
...             source_node=start_node,
...             source_output="x_list",
...             destination_node=normalize_map_node,
...             destination_input="iterated_numerator",
...         ),
...         DataFlowEdge(
...             name="denominator_edge",
...             source_node=squared_sum_tool_node,
...             source_output="squared_sum",
...             destination_node=normalize_map_node,
...             destination_input="iterated_denominator",
...         ),
...         DataFlowEdge(
...             name="x_list_normalized_edge",
...             source_node=normalize_map_node,
...             source_output="collected_result",
...             destination_node=end_node,
...             destination_input="x_list_normalized",
...         ),
...     ],
... )
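For reference, the computation that the flow above models can be written directly in plain Python. This is only a sketch of the intended arithmetic, not of how any runtime executes the flow. The tools in the example are ServerTools whose implementations are not shown, so the function `l2_normalize` and the assumption that the squared-sum tool returns the square root of the sum are illustrative.

```python
import math
from typing import List


def l2_normalize(x_list: List[float]) -> List[float]:
    """Plain-Python equivalent of the three steps modelled by the flow."""
    # Step 1: square every element (the role of the first MapNode).
    x_square_list = [x * x for x in x_list]
    # Step 2: sum the squares and take the square root (assumed tool behaviour).
    norm = math.sqrt(sum(x_square_list))
    # Step 3: divide every element by the shared denominator (second MapNode).
    return [x / norm for x in x_list]


normalized = l2_normalize([3.0, 4.0])
```

The sketch does not guard against an all-zero input, for which the denominator would be zero.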
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
subflow (Flow) – The flow that should be applied to all the input values
reducers (Dict[str, ReductionMethod] | None) – How the outputs of the different executions (map) are collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …); the allowed methods depend on the type of the output.
- reducers: Dict[str, ReductionMethod] | None#
How the outputs of the different executions (map) are collected together (reduce). It’s a dictionary mapping the name of an output to the respective reduction method (e.g., append, sum, avg, …); the allowed methods depend on the type of the output.
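The naming and reduction conventions described for the MapNode can be sketched without the library. Everything below is hypothetical: `run_map` and `REDUCERS` are illustrative names, a plain function stands in for the subflow, and two behaviours are assumptions inferred from the text rather than confirmed library semantics: a non-list input is reused for every iteration (suggested by the Union[inner_type, List[inner_type]] input type), and append is used when no reducer is given for an output.

```python
from typing import Any, Callable, Dict, List

# Hypothetical reducers, named after the methods mentioned in the text above.
REDUCERS: Dict[str, Callable[[List[Any]], Any]] = {
    "append": lambda values: list(values),
    "sum": lambda values: sum(values),
    "avg": lambda values: sum(values) / len(values),
}


def run_map(
    subflow: Callable[..., Dict[str, Any]],
    iterated_inputs: Dict[str, Any],
    reducers: Dict[str, str],
) -> Dict[str, Any]:
    """Apply `subflow` once per element and reduce each output by name."""
    # The number of iterations is given by the list-valued inputs.
    lengths = {len(v) for v in iterated_inputs.values() if isinstance(v, list)}
    if len(lengths) != 1:
        raise ValueError("expected list inputs of one common length")
    n = lengths.pop()

    per_output: Dict[str, List[Any]] = {}
    for i in range(n):
        # Strip the "iterated_" prefix; scalars are reused for every iteration.
        kwargs = {
            name[len("iterated_"):]: (value[i] if isinstance(value, list) else value)
            for name, value in iterated_inputs.items()
        }
        for out_name, out_value in subflow(**kwargs).items():
            per_output.setdefault(out_name, []).append(out_value)

    # Outputs get the "collected_" prefix; "append" is the assumed default.
    return {
        "collected_" + name: REDUCERS[reducers.get(name, "append")](values)
        for name, values in per_output.items()
    }


def divide(numerator: float, denominator: float) -> Dict[str, float]:
    """Stand-in for a subflow with inputs numerator/denominator and output result."""
    return {"result": numerator / denominator}


inputs = {"iterated_numerator": [2.0, 4.0], "iterated_denominator": 2.0}
appended = run_map(divide, inputs, reducers={"result": "append"})
summed = run_map(divide, inputs, reducers={"result": "sum"})
```

With `append`, the output `collected_result` is a list with one entry per iteration; with `sum`, it keeps the type of the inner output.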
- class pyagentspec.flows.nodes.outputmessagenode.OutputMessageNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, message)#
Bases:
Node
This node appends an agent message to the ongoing flow conversation.
- Inputs
One per variable in the message.
- Outputs
No outputs.
- Branches
One, the default next.
Examples
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import StartNode, EndNode, InputMessageNode, OutputMessageNode, LlmNode
>>> from pyagentspec.property import StringProperty
>>> start_node = StartNode(name="start")
>>> prompt_node = OutputMessageNode(name="ask_input", message="What is the paragraph you want to rephrase?")
>>> input_node = InputMessageNode(name="user_input", outputs=[StringProperty(title="user_input")])
>>> llm_node = LlmNode(
...     name="rephrase",
...     llm_config=llm_config,
...     prompt_template="Rephrase {{user_input}}",
...     outputs=[StringProperty(title="rephrased_user_input")],
... )
>>> output_node = OutputMessageNode(name="output_message", message="{{rephrased_user_input}}")
>>> end_node = EndNode(name="end")
>>> flow = Flow(
...     name="rephrase_paragraph_flow",
...     start_node=start_node,
...     nodes=[start_node, prompt_node, input_node, llm_node, output_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="ce1", from_node=start_node, to_node=prompt_node),
...         ControlFlowEdge(name="ce2", from_node=prompt_node, to_node=input_node),
...         ControlFlowEdge(name="ce3", from_node=input_node, to_node=llm_node),
...         ControlFlowEdge(name="ce4", from_node=llm_node, to_node=output_node),
...         ControlFlowEdge(name="ce5", from_node=output_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="de1",
...             source_node=input_node,
...             source_output="user_input",
...             destination_node=llm_node,
...             destination_input="user_input",
...         ),
...         DataFlowEdge(
...             name="de2",
...             source_node=llm_node,
...             source_output="rephrased_user_input",
...             destination_node=output_node,
...             destination_input="rephrased_user_input",
...         ),
...     ]
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
message (str) – Content of the agent message to append. Allows placeholders, which can define inputs.
- message: str#
Content of the agent message to append. Allows placeholders, which can define inputs.
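How a message with placeholders might be filled from the node's inputs can be sketched as follows. The function `render_message` is hypothetical and not part of PyAgentSpec; it assumes `{{ name }}` placeholders with optional whitespace, as in the examples on this page, and makes no claim about how a runtime actually renders the message.

```python
import re
from typing import Mapping

# Assumption: placeholders look like {{name}} or {{ name }}.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_message(template: str, values: Mapping[str, object]) -> str:
    """Replace every {{ name }} placeholder with str(values[name])."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in values:
            # A placeholder without a matching input cannot be rendered.
            raise KeyError(f"missing value for placeholder '{name}'")
        return str(values[name])

    return _PLACEHOLDER.sub(substitute, template)


rendered = render_message(
    "{{rephrased_user_input}}", {"rephrased_user_input": "A shorter paragraph."}
)
```

In this sketch a placeholder with no corresponding value raises a `KeyError`, which mirrors the idea that each variable in the message corresponds to one required input.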
- class pyagentspec.flows.nodes.startnode.StartNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>)#
Bases:
Node
Start nodes denote the start of the execution of a flow.
- Inputs
The list of inputs that should be the inputs of the flow. If both input and output properties are specified, they must be an exact match.
- Outputs
The list of outputs of the step. If both input and output properties are specified, they must be an exact match.
- Branches
One, the default next.
Examples
>>> from pyagentspec.property import Property
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import EndNode, LlmNode, StartNode
>>> user_question_property = Property(
...     json_schema=dict(
...         title="user_question",
...         description="The user question.",
...         type="string",
...     )
... )
>>> answer_property = Property(json_schema=dict(title="answer", type="string"))
>>> start_node = StartNode(name="start", inputs=[user_question_property])
>>> end_node = EndNode(name="end", outputs=[answer_property])
>>> llm_node = LlmNode(
...     name="llm node",
...     prompt_template="Answer the user question: {{user_question}}",
...     llm_config=llm_config,
... )
>>> flow = Flow(
...     name="flow",
...     start_node=start_node,
...     nodes=[start_node, llm_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_llm", from_node=start_node, to_node=llm_node),
...         ControlFlowEdge(name="llm_to_end", from_node=llm_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="query_edge",
...             source_node=start_node,
...             source_output="user_question",
...             destination_node=llm_node,
...             destination_input="user_question",
...         ),
...         DataFlowEdge(
...             name="answer_edge",
...             source_node=llm_node,
...             source_output="generated_text",
...             destination_node=end_node,
...             destination_input="answer"
...         ),
...     ],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
- class pyagentspec.flows.nodes.toolnode.ToolNode(*, id=<factory>, name, description=None, metadata=<factory>, min_agentspec_version=AgentSpecVersionEnum.v25_4_1, max_agentspec_version=AgentSpecVersionEnum.v25_4_1, inputs=None, outputs=None, branches=<factory>, tool)#
Bases:
Node
The tool execution node executes a tool as part of a flow.
- Inputs
Inferred from the definition of the tool to execute.
- Outputs
Inferred from the definition of the tool to execute.
- Branches
One, the default next.
Examples
>>> from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
>>> from pyagentspec.flows.flow import Flow
>>> from pyagentspec.flows.nodes import ToolNode, StartNode, EndNode
>>> from pyagentspec.tools import ServerTool
>>> from pyagentspec.property import Property
>>> x_property = Property(json_schema={"title": "x", "type": "number"})
>>> x_square_root_property = Property(
...     json_schema={"title": "x_square_root", "type": "number"}
... )
>>> square_root_tool = ServerTool(
...     name="compute_square_root",
...     description="Computes the square root of a number",
...     inputs=[x_property],
...     outputs=[x_square_root_property],
... )
>>> start_node = StartNode(name="start", inputs=[x_property])
>>> end_node = EndNode(name="end", outputs=[x_square_root_property])
>>> tool_node = ToolNode(name="square root tool node", tool=square_root_tool)
>>> flow = Flow(
...     name="Compute square root flow",
...     start_node=start_node,
...     nodes=[start_node, tool_node, end_node],
...     control_flow_connections=[
...         ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=tool_node),
...         ControlFlowEdge(name="tool_to_end", from_node=tool_node, to_node=end_node),
...     ],
...     data_flow_connections=[
...         DataFlowEdge(
...             name="x_edge",
...             source_node=start_node,
...             source_output="x",
...             destination_node=tool_node,
...             destination_input="x",
...         ),
...         DataFlowEdge(
...             name="x_square_root_edge",
...             source_node=tool_node,
...             source_output="x_square_root",
...             destination_node=end_node,
...             destination_input="x_square_root"
...         ),
...     ],
... )
- Parameters:
id (str) – A unique identifier for this Component
name (str) – Name of this Component
description (str | None) – Optional description of this Component
metadata (Dict[str, Any] | None) – Optional, additional metadata related to this Component
min_agentspec_version (AgentSpecVersionEnum) –
max_agentspec_version (AgentSpecVersionEnum) –
inputs (List[Property] | None) – List of inputs accepted by this component
outputs (List[Property] | None) – List of outputs exposed by this component
branches (List[str]) – The list of outgoing branch names that the node will expose
tool (Tool) – The tool to be executed in this Node