Execution Interrupts#
We often need to interrupt the normal execution of an assistant to perform specific operations, including but not limited to:
- Checking the token count
- Verifying the runtime
- Performing step-by-step debugging
For this reason, we provide execution interrupts, which let developers interact with the assistant’s execution at specific moments.
This page presents all APIs and classes related to execution interrupts in WayFlow.
Execution Interrupt interface#
- class wayflowcore.executors.interrupts.executioninterrupt.ExecutionInterrupt(__metadata_info__=None, id=None)#
Execution interrupts give developers a way to interact with the standard execution of an assistant, allowing them to stop it when specific events are triggered.
- Parameters:
__metadata_info__ (Dict[str, Any] | None) –
id (str | None) –
- on_event(event, state, conversation)#
Checks whether the interrupt should be triggered at the current state, based on the given execution status. If the interrupt is triggered, this method returns the InterruptedExecutionStatus that should be returned to the execution caller.
- Parameters:
event (Event) – The current event happening.
state (ConversationExecutionState) – The current ConversationExecutionState.
conversation (Conversation) –
- Returns:
An instance of an InterruptedExecutionStatus subclass if the execution should be interrupted; None if the execution can continue.
- Return type:
Optional[InterruptedExecutionStatus]
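Developers can implement their own interrupts by subclassing ExecutionInterrupt and overriding on_event. The snippet below is a minimal, hypothetical sketch that triggers after a fixed number of events; the class name, the threshold, and the event counting are illustrative and not part of WayFlow:
>>> from wayflowcore.executors.interrupts.executioninterrupt import (
...     ExecutionInterrupt,
...     InterruptedExecutionStatus,
... )
>>> class MaxEventsInterrupt(ExecutionInterrupt):
...     """Hypothetical interrupt that stops the execution after a fixed number of events."""
...     def __init__(self, max_events=100):
...         super().__init__()
...         self.max_events = max_events
...         self._seen_events = 0
...     def on_event(self, event, state, conversation):
...         # Count every event and trigger once the threshold is reached.
...         self._seen_events += 1
...         if self._seen_events >= self.max_events:
...             return InterruptedExecutionStatus(
...                 interrupter=self,
...                 reason=f"Reached the maximum of {self.max_events} events",
...             )
...         # Returning None lets the execution continue normally.
...         return None
Such a custom interrupt would then be passed to the conversation's execute call (for example, execution_interrupts=[MaxEventsInterrupt(50)]) in the same way as the built-in interrupts shown below.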
- class wayflowcore.executors.interrupts.executioninterrupt.InterruptedExecutionStatus(interrupter, reason, *, id=<factory>, __metadata_info__=<factory>)#
ExecutionStatus type returned by the interrupts. It contains the ExecutionInterrupt that stopped the execution, and the reason why the execution was stopped.
- Parameters:
interrupter (wayflowcore.executors.interrupts.executioninterrupt.ExecutionInterrupt) – The ExecutionInterrupt that stopped the execution.
reason (str) – Why the execution was stopped.
id (str) –
__metadata_info__ (Dict[str, Any]) –
- interrupter: ExecutionInterrupt#
- reason: str#
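On the caller side, the returned status can be inspected to find out whether an interrupt fired and why. A minimal sketch, assuming conversation and timeout_interrupt are set up as in the SoftTimeoutExecutionInterrupt example below:
>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> status = conversation.execute(execution_interrupts=[timeout_interrupt])
>>> if isinstance(status, InterruptedExecutionStatus):
...     # `interrupter` is the ExecutionInterrupt that fired, `reason` explains why.
...     print(type(status.interrupter).__name__, status.reason)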
Basic Execution Interrupt classes#
Timeout Execution Interrupt#
- class wayflowcore.executors.interrupts.timeoutexecutioninterrupt.SoftTimeoutExecutionInterrupt(timeout=None)#
Execution interrupt that stops the assistant’s execution after a given time limit. This is a soft limit, as it does not forcibly interrupt the execution at an arbitrary point in time. For example:
- It does not interrupt the execution of a step (except for FlowExecutionStep)
- It does not interrupt the execution of a tool
- It does not interrupt LLM models during generation
- Parameters:
timeout (Optional[float]) – The timeout in seconds after which the assistant execution should be stopped. Defaults to 600 seconds (10 minutes).
Example
>>> from wayflowcore.agent import Agent
>>> from wayflowcore.executors.interrupts.timeoutexecutioninterrupt import SoftTimeoutExecutionInterrupt
>>> from wayflowcore.executors.executionstatus import ExecutionStatus
>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> from wayflowcore.models.llmmodelfactory import LlmModelFactory
>>> VLLM_CONFIG = {"model_type": "vllm", "host_port": LLAMA70B_API_ENDPOINT, "model_id": "/storage/models/Llama-3.1-70B-Instruct"}
>>> llm = LlmModelFactory.from_config(VLLM_CONFIG)
>>> assistant = Agent(llm=llm, custom_instruction="You are a helpful assistant")
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> timeout_interrupt = SoftTimeoutExecutionInterrupt(timeout=1)
>>> status = conversation.execute(execution_interrupts=[timeout_interrupt])
>>> isinstance(status, ExecutionStatus) or isinstance(status, InterruptedExecutionStatus)
True
Token Limit Execution Interrupt#
- class wayflowcore.executors.interrupts.tokenlimitexecutioninterrupt.SoftTokenLimitExecutionInterrupt(tokens_per_model=None, all_models=None, total_tokens=None, __metadata_info__=None)#
Execution interrupt that stops the assistant’s execution after a given token limit for an LLM is reached. The interrupt counts the number of tokens generated by a given LLM object during the whole conversation. It stops the execution if:
- the total number of tokens generated by the given list of LLMs reaches the global token limit, or
- one of the LLMs reaches its own token limit.
This is a soft limit, as it does not forcibly interrupt the execution at an arbitrary point in time. For example:
- It does not interrupt the execution of a step (except for FlowExecutionStep)
- It does not interrupt the execution of a tool
- It does not interrupt LLM models during generation
- Parameters:
tokens_per_model (Optional[Dict[LlmModel, int]]) – Dictionary containing, for each model that has a limit, the maximum number of tokens it is allowed to generate.
all_models (Optional[List[LlmModel]]) – List of all the LLMs that should be counted for the global token limit. When a value for this parameter is provided, total_tokens should also be specified.
total_tokens (Optional[int]) – Maximum number of tokens that the LLMs in all_models are allowed to generate in total. When a value for this parameter is provided, all_models should also be specified.
__metadata_info__ (Optional[MetadataType]) –
Example
>>> from wayflowcore.agent import Agent
>>> from wayflowcore.executors.interrupts.tokenlimitexecutioninterrupt import SoftTokenLimitExecutionInterrupt
>>> from wayflowcore.executors.executionstatus import ExecutionStatus
>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> from wayflowcore.models.llmmodelfactory import LlmModelFactory
>>> VLLM_CONFIG = {"model_type": "vllm", "host_port": LLAMA70B_API_ENDPOINT, "model_id": "/storage/models/Llama-3.1-70B-Instruct"}
>>> llm = LlmModelFactory.from_config(VLLM_CONFIG)
>>> assistant = Agent(llm=llm, custom_instruction="You are a helpful assistant")
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> # With the following configuration we set a 100-token limit on the `llm` model
>>> token_limit_interrupt = SoftTokenLimitExecutionInterrupt(tokens_per_model={llm: 100})
>>> status = conversation.execute(execution_interrupts=[token_limit_interrupt])
The following configuration is equivalent to the above:
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> token_limit_interrupt = SoftTokenLimitExecutionInterrupt(all_models=[llm], total_tokens=100)
>>> status = conversation.execute(execution_interrupts=[token_limit_interrupt])
>>> isinstance(status, ExecutionStatus) or isinstance(status, InterruptedExecutionStatus)
True
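Since execute accepts a list, several interrupts can presumably be combined in a single call, with the execution stopping as soon as any of them triggers. The snippet below is a sketch reusing the llm, assistant, and token_limit_interrupt objects from the examples above; the 60-second timeout is illustrative:
>>> from wayflowcore.executors.interrupts.timeoutexecutioninterrupt import SoftTimeoutExecutionInterrupt
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> timeout_interrupt = SoftTimeoutExecutionInterrupt(timeout=60)
>>> status = conversation.execute(execution_interrupts=[timeout_interrupt, token_limit_interrupt])
>>> isinstance(status, ExecutionStatus) or isinstance(status, InterruptedExecutionStatus)
True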