import time
from typing import List, Sequence
from pydantic import BaseModel
from typing_extensions import Self
from ..base import TerminatedException, TerminationCondition
from ..messages import (
AgentEvent,
BaseChatMessage,
ChatMessage,
HandoffMessage,
MultiModalMessage,
StopMessage,
TextMessage,
ToolCallExecutionEvent,
)
[docs]
class StopMessageTermination(TerminationCondition):
"""Terminate the conversation if a StopMessage is received."""
def __init__(self) -> None:
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, StopMessage):
self._terminated = True
return StopMessage(content="Stop message received", source="StopMessageTermination")
return None
[docs]
async def reset(self) -> None:
self._terminated = False
class MaxMessageTerminationConfig(BaseModel):
max_messages: int
include_agent_event: bool = False
[docs]
class MaxMessageTermination(TerminationCondition):
"""Terminate the conversation after a maximum number of messages have been exchanged.
Args:
max_messages: The maximum number of messages allowed in the conversation.
include_agent_event: If True, include :class:`~agentopera.chatflow.messages.AgentEvent` in the message count.
Otherwise, only include :class:`~agentopera.chatflow.messages.ChatMessage`. Defaults to False.
"""
def __init__(self, max_messages: int, include_agent_event: bool = False) -> None:
self._max_messages = max_messages
self._message_count = 0
self._include_agent_event = include_agent_event
@property
def terminated(self) -> bool:
return self._message_count >= self._max_messages
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self.terminated:
raise TerminatedException("Termination condition has already been reached")
self._message_count += len([m for m in messages if self._include_agent_event or isinstance(m, BaseChatMessage)])
if self._message_count >= self._max_messages:
return StopMessage(
content=f"Maximum number of messages {self._max_messages} reached, current message count: {self._message_count}",
source="MaxMessageTermination",
)
return None
[docs]
async def reset(self) -> None:
self._message_count = 0
class TextMentionTerminationConfig(BaseModel):
text: str
[docs]
class TextMentionTermination(TerminationCondition):
"""Terminate the conversation if a specific text is mentioned.
Args:
text: The text to look for in the messages.
sources: Check only messages of the specified agents for the text to look for.
"""
def __init__(self, text: str, sources: Sequence[str] | None = None) -> None:
self._termination_text = text
self._terminated = False
self._sources = sources
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if self._sources is not None and message.source not in self._sources:
continue
if isinstance(message.content, str) and self._termination_text in message.content:
self._terminated = True
return StopMessage(
content=f"Text '{self._termination_text}' mentioned", source="TextMentionTermination"
)
elif isinstance(message, MultiModalMessage):
for item in message.content:
if isinstance(item, str) and self._termination_text in item:
self._terminated = True
return StopMessage(
content=f"Text '{self._termination_text}' mentioned", source="TextMentionTermination"
)
return None
[docs]
async def reset(self) -> None:
self._terminated = False
class TokenUsageTerminationConfig(BaseModel):
max_total_token: int | None
max_prompt_token: int | None
max_completion_token: int | None
[docs]
class TokenUsageTermination(TerminationCondition):
"""Terminate the conversation if a token usage limit is reached.
Args:
max_total_token: The maximum total number of tokens allowed in the conversation.
max_prompt_token: The maximum number of prompt tokens allowed in the conversation.
max_completion_token: The maximum number of completion tokens allowed in the conversation.
Raises:
ValueError: If none of max_total_token, max_prompt_token, or max_completion_token is provided.
"""
def __init__(
self,
max_total_token: int | None = None,
max_prompt_token: int | None = None,
max_completion_token: int | None = None,
) -> None:
if max_total_token is None and max_prompt_token is None and max_completion_token is None:
raise ValueError(
"At least one of max_total_token, max_prompt_token, or max_completion_token must be provided"
)
self._max_total_token = max_total_token
self._max_prompt_token = max_prompt_token
self._max_completion_token = max_completion_token
self._total_token_count = 0
self._prompt_token_count = 0
self._completion_token_count = 0
@property
def terminated(self) -> bool:
return (
(self._max_total_token is not None and self._total_token_count >= self._max_total_token)
or (self._max_prompt_token is not None and self._prompt_token_count >= self._max_prompt_token)
or (self._max_completion_token is not None and self._completion_token_count >= self._max_completion_token)
)
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self.terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if message.models_usage is not None:
self._prompt_token_count += message.models_usage.prompt_tokens
self._completion_token_count += message.models_usage.completion_tokens
self._total_token_count += message.models_usage.prompt_tokens + message.models_usage.completion_tokens
if self.terminated:
content = f"Token usage limit reached, total token count: {self._total_token_count}, prompt token count: {self._prompt_token_count}, completion token count: {self._completion_token_count}."
return StopMessage(content=content, source="TokenUsageTermination")
return None
[docs]
async def reset(self) -> None:
self._total_token_count = 0
self._prompt_token_count = 0
self._completion_token_count = 0
[docs]
class HandoffTermination(TerminationCondition):
"""Terminate the conversation if a :class:`~agentopera.chatflow.messages.HandoffMessage`
with the given target is received.
Args:
target (str): The target of the handoff message.
"""
def __init__(self, target: str) -> None:
self._terminated = False
self._target = target
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, HandoffMessage) and message.target == self._target:
self._terminated = True
return StopMessage(
content=f"Handoff to {self._target} from {message.source} detected.", source="HandoffTermination"
)
return None
[docs]
async def reset(self) -> None:
self._terminated = False
class TimeoutTerminationConfig(BaseModel):
timeout_seconds: float
[docs]
class TimeoutTermination(TerminationCondition):
"""Terminate the conversation after a specified duration has passed.
Args:
timeout_seconds: The maximum duration in seconds before terminating the conversation.
"""
def __init__(self, timeout_seconds: float) -> None:
self._timeout_seconds = timeout_seconds
self._start_time = time.monotonic()
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if (time.monotonic() - self._start_time) >= self._timeout_seconds:
self._terminated = True
return StopMessage(
content=f"Timeout of {self._timeout_seconds} seconds reached", source="TimeoutTermination"
)
return None
[docs]
async def reset(self) -> None:
self._start_time = time.monotonic()
self._terminated = False
class ExternalTerminationConfig(BaseModel):
pass
[docs]
class ExternalTermination(TerminationCondition):
"""A termination condition that is externally controlled
by calling the :meth:`set` method.
Example:
.. code-block:: python
from agentopera.chatflow.conditions import ExternalTermination
termination = ExternalTermination()
# Run the team in an asyncio task.
...
# Set the termination condition externally
termination.set()
"""
def __init__(self) -> None:
self._terminated = False
self._setted = False
@property
def terminated(self) -> bool:
return self._terminated
[docs]
def set(self) -> None:
"""Set the termination condition to terminated."""
self._setted = True
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if self._setted:
self._terminated = True
return StopMessage(content="External termination requested", source="ExternalTermination")
return None
[docs]
async def reset(self) -> None:
self._terminated = False
self._setted = False
class SourceMatchTerminationConfig(BaseModel):
sources: List[str]
[docs]
class SourceMatchTermination(TerminationCondition):
"""Terminate the conversation after a specific source responds.
Args:
sources (List[str]): List of source names to terminate the conversation.
Raises:
TerminatedException: If the termination condition has already been reached.
"""
def __init__(self, sources: List[str]) -> None:
self._sources = sources
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if not messages:
return None
for message in messages:
if message.source in self._sources:
self._terminated = True
return StopMessage(content=f"'{message.source}' answered", source="SourceMatchTermination")
return None
[docs]
async def reset(self) -> None:
self._terminated = False
class TextMessageTerminationConfig(BaseModel):
"""Configuration for the TextMessageTermination termination condition."""
source: str | None = None
"""The source of the text message to terminate the conversation."""
[docs]
class TextMessageTermination(TerminationCondition):
"""Terminate the conversation if a :class:`~agentopera.chatflow.messages.TextMessage` is received.
This termination condition checks for TextMessage instances in the message sequence. When a TextMessage is found,
it terminates the conversation if either:
- No source was specified (terminates on any TextMessage)
- The message source matches the specified source
Args:
source (str | None, optional): The source name to match against incoming messages. If None, matches any source.
Defaults to None.
"""
def __init__(self, source: str | None = None) -> None:
self._terminated = False
self._source = source
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, TextMessage) and (self._source is None or message.source == self._source):
self._terminated = True
return StopMessage(
content=f"Text message received from '{message.source}'", source="TextMessageTermination"
)
return None
[docs]
async def reset(self) -> None:
self._terminated = False
[docs]
class FunctionCallTermination(TerminationCondition):
"""Terminate the conversation if a :class:`~agentopera.core.models.FunctionExecutionResult`
with a specific name was received.
Args:
function_name (str): The name of the function to look for in the messages.
Raises:
TerminatedException: If the termination condition has already been reached.
"""
def __init__(self, function_name: str) -> None:
self._terminated = False
self._function_name = function_name
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[AgentEvent | ChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, ToolCallExecutionEvent):
for execution in message.content:
if execution.name == self._function_name:
self._terminated = True
return StopMessage(
content=f"Function '{self._function_name}' was executed.",
source="FunctionCallTermination",
)
return None
[docs]
async def reset(self) -> None:
self._terminated = False