Execution Interrupts#

We often need to interrupt the normal execution of an assistant to perform specific operations, including but not limited to:

  • Checking the token count

  • Verifying the runtime

  • Performing step-by-step debugging

For this reason, we provide execution interrupts, which let developers interact with the assistant’s execution at specific moments.

This page presents all APIs and classes related to execution interrupts in WayFlow.

Execution Interrupt interface#

class wayflowcore.executors.interrupts.executioninterrupt.ExecutionInterrupt(__metadata_info__=None, id=None)#

Execution interrupts give developers a way to interact with the standard execution of an assistant, allowing them to stop it when specific events are triggered.

Parameters:
  • __metadata_info__ (Dict[str, Any] | None) –

  • id (str | None) –

on_event(event, state, conversation)#

Checks whether the interrupt should be triggered at the current state, based on the given event. If the interrupt is triggered, this method returns the InterruptedExecutionStatus that should be returned to the execution caller.

Parameters:
  • event (Event) – The current event happening.

  • state (ConversationExecutionState) – The current ConversationExecutionState.

  • conversation (Conversation) – The conversation currently being executed.

Returns:

  • An instance of an InterruptedExecutionStatus subclass if the execution should be interrupted.

  • None if the execution can continue.

Return type:

Optional[InterruptedExecutionStatus]
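
Developers can define their own interrupts by subclassing ExecutionInterrupt and overriding on_event. The following is a minimal, hypothetical sketch: the MaxEventsInterrupt class and its event-counting logic are illustrative only and not part of WayFlow.

>>> from wayflowcore.executors.interrupts.executioninterrupt import (
...     ExecutionInterrupt,
...     InterruptedExecutionStatus,
... )
>>> class MaxEventsInterrupt(ExecutionInterrupt):
...     """Hypothetical interrupt that stops execution after a number of events."""
...     def __init__(self, max_events=100):
...         super().__init__()
...         self.max_events = max_events
...         self._seen_events = 0
...     def on_event(self, event, state, conversation):
...         # Count every event; trigger once the limit is reached
...         self._seen_events += 1
...         if self._seen_events >= self.max_events:
...             return InterruptedExecutionStatus(
...                 interrupter=self,
...                 reason=f"Observed {self._seen_events} events (limit: {self.max_events})",
...             )
...         return None  # execution can continue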

class wayflowcore.executors.interrupts.executioninterrupt.InterruptedExecutionStatus(interrupter, reason, *, id=<factory>, __metadata_info__=<factory>)#

ExecutionStatus type returned by the interrupts. It contains the ExecutionInterrupt that stopped the execution and the reason why the execution was stopped.

Parameters:
  • interrupter (ExecutionInterrupt) – The ExecutionInterrupt that stopped the execution.

  • reason (str) – The reason why the execution was stopped.
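
When an interrupt triggers, the caller receives this status from the execution and can inspect both fields to tell which interrupt fired and why. A minimal handling sketch, assuming a conversation and interrupts set up as in the examples below, and assuming several interrupts can be registered at once via the execution_interrupts list:

>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> status = conversation.execute(
...     execution_interrupts=[timeout_interrupt, token_limit_interrupt]
... )
>>> if isinstance(status, InterruptedExecutionStatus):
...     triggered_by = status.interrupter  # which interrupt stopped the execution
...     explanation = status.reason        # why the execution was stopped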

Basic Execution Interrupt classes#

Timeout Execution Interrupt#

class wayflowcore.executors.interrupts.timeoutexecutioninterrupt.SoftTimeoutExecutionInterrupt(timeout=None)#

Execution interrupt that stops the assistant’s execution after a given time limit. This is a soft limit, as it does not force the interruption of the execution at any time. For example:

  • It does not interrupt the execution of a step (except for FlowExecutionStep)

  • It does not interrupt the execution of a tool

  • It does not interrupt LLM models during generation

Parameters:

timeout (Optional[float]) – The timeout in seconds after which the assistant execution should be stopped. Defaults to 600 seconds (10 minutes).

Example

>>> from wayflowcore.agent import Agent
>>> from wayflowcore.executors.interrupts.timeoutexecutioninterrupt import SoftTimeoutExecutionInterrupt
>>> from wayflowcore.executors.executionstatus import ExecutionStatus
>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> from wayflowcore.models.llmmodelfactory import LlmModelFactory
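>>> # LLAMA70B_API_ENDPOINT is assumed to be defined in your environment
>>> # (the host/port of your vLLM deployment)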
>>> VLLM_CONFIG = {
...     "model_type": "vllm",
...     "host_port": LLAMA70B_API_ENDPOINT,
...     "model_id": "/storage/models/Llama-3.1-70B-Instruct",
... }
>>> llm = LlmModelFactory.from_config(VLLM_CONFIG)
>>> assistant = Agent(llm=llm, custom_instruction="You are a helpful assistant")
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> timeout_interrupt = SoftTimeoutExecutionInterrupt(timeout=1)
>>> status = conversation.execute(execution_interrupts=[timeout_interrupt])
>>> isinstance(status, ExecutionStatus) or isinstance(status, InterruptedExecutionStatus)
True
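
Because the limit is soft, the run may overshoot the configured timeout while a step, tool call, or LLM generation completes; checking the returned status reveals whether the interrupt actually fired:

>>> if isinstance(status, InterruptedExecutionStatus):
...     outcome = f"Stopped by timeout: {status.reason}"
... else:
...     outcome = "Execution finished before the soft timeout fired"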

Token Limit Execution Interrupt#

class wayflowcore.executors.interrupts.tokenlimitexecutioninterrupt.SoftTokenLimitExecutionInterrupt(tokens_per_model=None, all_models=None, total_tokens=None, __metadata_info__=None)#

Execution interrupt that stops the assistant’s execution after a given token limit for an LLM is reached. The interrupt counts the number of tokens generated by a given LLM object during the whole conversation. It stops the execution if:

  • The total number of tokens generated by the given list of LLMs reaches the global limit

  • One of the LLMs reaches its individual token limit

This is a soft limit, as it does not force the interruption of the execution at any time. For example:

  • It does not interrupt the execution of a step (except for FlowExecutionStep)

  • It does not interrupt the execution of a tool

  • It does not interrupt LLM models during generation

Parameters:
  • tokens_per_model (Optional[Dict[LlmModel, int]]) – Dictionary containing, for each model that has a limit, the maximum number of tokens it is allowed to generate.

  • all_models (Optional[List[LlmModel]]) – List of all the LLMs that should be counted toward the global token limit. When this parameter is provided, total_tokens must also be specified.

  • total_tokens (Optional[int]) – Maximum number of tokens that may be generated globally by the LLMs in all_models. When this parameter is provided, all_models must also be specified.

  • __metadata_info__ (Optional[MetadataType]) –

Example

>>> from wayflowcore.agent import Agent
>>> from wayflowcore.executors.interrupts.tokenlimitexecutioninterrupt import SoftTokenLimitExecutionInterrupt
>>> from wayflowcore.executors.executionstatus import ExecutionStatus
>>> from wayflowcore.executors.interrupts.executioninterrupt import InterruptedExecutionStatus
>>> from wayflowcore.models.llmmodelfactory import LlmModelFactory
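>>> # As above, LLAMA70B_API_ENDPOINT is assumed to be defined in your environment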
>>> VLLM_CONFIG = {
...     "model_type": "vllm",
...     "host_port": LLAMA70B_API_ENDPOINT,
...     "model_id": "/storage/models/Llama-3.1-70B-Instruct",
... }
>>> llm = LlmModelFactory.from_config(VLLM_CONFIG)
>>> assistant = Agent(llm=llm, custom_instruction="You are a helpful assistant")
>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> # With the following configuration we set a 100-token limit on the `llm` model
>>> token_limit_interrupt = SoftTokenLimitExecutionInterrupt(tokens_per_model={llm: 100})
>>> status = conversation.execute(execution_interrupts=[token_limit_interrupt])

The following configuration is equivalent to the above:

>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> token_limit_interrupt = SoftTokenLimitExecutionInterrupt(all_models=[llm], total_tokens=100)
>>> status = conversation.execute(execution_interrupts=[token_limit_interrupt])
>>> isinstance(status, ExecutionStatus) or isinstance(status, InterruptedExecutionStatus)
True
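
The two limit styles can also be combined in a single interrupt. This is a sketch based on the independent parameter descriptions above, with illustrative limit values:

>>> conversation = assistant.start_conversation()
>>> conversation.append_user_message("Tell me something interesting")
>>> combined_interrupt = SoftTokenLimitExecutionInterrupt(
...     tokens_per_model={llm: 100},  # per-model cap on `llm`
...     all_models=[llm],             # models counted toward the global cap
...     total_tokens=150,             # global cap across all listed models
... )
>>> status = conversation.execute(execution_interrupts=[combined_interrupt])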