chatflow.base

class agentopera.chatflow.base.ChatAgent(*args, **kwargs)[source]

Bases: ABC, TaskRunner

Protocol for a chat agent.

abstract property name: str

The name of the agent. The team uses it to uniquely identify the agent, so it must be unique within the team.

abstract property description: str

The description of the agent. The team uses it to decide which agents to use, so it should describe the agent’s capabilities and how to interact with it.

abstract property produced_message_types: Sequence[type[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]]

The types of messages that the agent produces in the Response.chat_message field. They must be ChatMessage types.

abstract async on_messages(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) -> Response[source]

Handles incoming messages and returns a response.

abstract on_messages_stream(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) -> AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Response, None][source]

Handles incoming messages and returns a stream of inner messages. The final item in the stream is the response.

abstract async on_reset(cancellation_token: CancellationToken) -> None[source]

Resets the agent to its initialization state.

abstract async save_state() -> Mapping[str, Any][source]

Saves the agent state for later restoration.

abstract async load_state(state: Mapping[str, Any]) -> None[source]

Restores the agent from a saved state.

abstract async close() -> None[source]

Called when the engine is stopped or any stop method is called.
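The relationship between on_messages() and on_messages_stream() can be sketched as follows. This is a minimal illustration, not the library's implementation: TextMessage and Response here are hypothetical stand-in dataclasses, EchoAgent is an invented toy agent, and the cancellation_token parameter is omitted for brevity.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Sequence, Union


@dataclass
class TextMessage:  # hypothetical stand-in, not the library class
    content: str
    source: str


@dataclass
class Response:  # hypothetical stand-in with the two documented fields
    chat_message: TextMessage
    inner_messages: List[TextMessage] = field(default_factory=list)


class EchoAgent:
    """Toy agent (assumption) that echoes the last message it receives."""

    name = "echo"

    async def on_messages_stream(
        self, messages: Sequence[TextMessage]
    ) -> AsyncGenerator[Union[TextMessage, Response], None]:
        # Yield inner messages first, then the Response as the final item.
        note = TextMessage(content="echoing input", source=self.name)
        yield note
        reply = TextMessage(content=messages[-1].content, source=self.name)
        yield Response(chat_message=reply, inner_messages=[note])

    async def on_messages(self, messages: Sequence[TextMessage]) -> Response:
        # Drain the stream and keep only the final Response.
        async for item in self.on_messages_stream(messages):
            if isinstance(item, Response):
                return item
        raise RuntimeError("stream ended without a Response")


async def demo() -> Response:
    return await EchoAgent().on_messages([TextMessage("hello", "user")])


if __name__ == "__main__":
    print(asyncio.run(demo()).chat_message.content)
```

Implementing on_messages() on top of the stream keeps the two entry points consistent; a real agent could equally implement them independently.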

class agentopera.chatflow.base.Response(*, chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None)[source]

Bases: object

A response from calling ChatAgent.on_messages().

chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage

A chat message produced by the agent as the response.

inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None

Inner messages produced by the agent. Each one can be an AgentEvent or a ChatMessage.

class agentopera.chatflow.base.Team(*args, **kwargs)[source]

Bases: ABC, TaskRunner

abstract async reset() -> None[source]

Reset the team and all its participants to their initial state.

abstract async save_state() -> Mapping[str, Any][source]

Save the current state of the team.

abstract async load_state(state: Mapping[str, Any]) -> None[source]

Load the state of the team.
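A save/reset/load round trip might look like the sketch below. CounterTeam and its step() method are hypothetical, and the shape of the state mapping (a single "turns" key) is an assumption made for illustration; the source does not specify what a real team stores.

```python
import asyncio
from typing import Any, Mapping, Tuple


class CounterTeam:
    """Toy team (assumption) whose only state is a turn counter."""

    def __init__(self) -> None:
        self._turns = 0

    async def step(self) -> None:
        # Hypothetical helper that advances the team's state.
        self._turns += 1

    async def reset(self) -> None:
        self._turns = 0

    async def save_state(self) -> Mapping[str, Any]:
        # Return a plain mapping so the snapshot can be serialized.
        return {"turns": self._turns}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self._turns = int(state["turns"])


async def demo() -> Tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
    team = CounterTeam()
    await team.step()
    await team.step()
    snapshot = await team.save_state()
    await team.reset()
    after_reset = await team.save_state()
    await team.load_state(snapshot)
    restored = await team.save_state()
    return snapshot, after_reset, restored


if __name__ == "__main__":
    print(asyncio.run(demo()))
```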

exception agentopera.chatflow.base.TerminatedException[source]

Bases: BaseException

class agentopera.chatflow.base.TerminationCondition[source]

Bases: ABC

A stateful condition that determines when a conversation should be terminated.

A termination condition is a callable that takes a sequence of ChatMessage objects since the last time the condition was called, and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset before it can be used again.

Termination conditions can be combined using the AND and OR operators.

Example

import asyncio
from agentopera.chatflow.conditions import MaxMessageTermination, TextMentionTermination


async def main() -> None:
    # Terminate the conversation after 10 turns or if the text "TERMINATE" is mentioned.
    cond1 = MaxMessageTermination(10) | TextMentionTermination("TERMINATE")

    # Terminate the conversation after 10 turns and if the text "TERMINATE" is mentioned.
    cond2 = MaxMessageTermination(10) & TextMentionTermination("TERMINATE")

    # ...

    # Reset the termination condition.
    await cond1.reset()
    await cond2.reset()


asyncio.run(main())

abstract property terminated: bool

Check if the termination condition has been reached.

abstract async reset() -> None[source]

Reset the termination condition.
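The stateful contract described above (called with only the new messages, returns a StopMessage or None, must be reset before reuse) can be sketched with stand-in types. StopMessage, TerminatedException, and MaxMessages below are hypothetical stand-ins written for this illustration, and raising on reuse before reset() is an assumption about how "must be reset" is enforced.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class StopMessage:  # hypothetical stand-in, not the library class
    content: str
    source: str


class TerminatedException(BaseException):  # stand-in with the documented base
    pass


class MaxMessages:
    """Toy condition: stop once at least `limit` messages have been seen."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._count = 0
        self._terminated = False

    @property
    def terminated(self) -> bool:
        return self._terminated

    async def __call__(self, messages: Sequence[object]) -> Optional[StopMessage]:
        if self._terminated:
            # Assumption: reuse without reset() is treated as an error.
            raise TerminatedException("condition already reached")
        # `messages` holds only what arrived since the previous call.
        self._count += len(messages)
        if self._count >= self._limit:
            self._terminated = True
            return StopMessage(f"reached {self._limit} messages", "MaxMessages")
        return None

    async def reset(self) -> None:
        self._count = 0
        self._terminated = False


async def demo() -> List[Optional[StopMessage]]:
    cond = MaxMessages(3)
    first = await cond(["a", "b"])  # 2 messages seen so far
    second = await cond(["c"])      # 3 messages seen, limit reached
    await cond.reset()
    third = await cond(["d"])       # counting starts over after reset
    return [first, second, third]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```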

class agentopera.chatflow.base.AndTerminationCondition(*conditions: TerminationCondition)[source]

Bases: TerminationCondition

property terminated: bool

Check if the termination condition has been reached.

async reset() -> None[source]

Reset the termination condition.

class agentopera.chatflow.base.OrTerminationCondition(*conditions: TerminationCondition)[source]

Bases: TerminationCondition

property terminated: bool

Check if the termination condition has been reached.

async reset() -> None[source]

Reset the termination condition.
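One plausible reading of how the & and | operators build these two classes is sketched below. Everything here is a hypothetical stand-in (Condition, Mentions, AllOf, AnyOf), and conditions return a plain reason string instead of a StopMessage. The AND semantics shown, where each child may terminate on a different call and the combination fires once all have, are an assumption rather than something the source states.

```python
import asyncio
from typing import Optional, Sequence, Tuple


class Condition:
    """Stand-in base: calling it returns a stop reason or None."""

    def __init__(self) -> None:
        self.terminated = False

    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        raise NotImplementedError

    async def reset(self) -> None:
        self.terminated = False

    def __and__(self, other: "Condition") -> "Condition":
        return AllOf(self, other)

    def __or__(self, other: "Condition") -> "Condition":
        return AnyOf(self, other)


class Mentions(Condition):
    """Toy condition that fires when `text` appears in any new message."""

    def __init__(self, text: str) -> None:
        super().__init__()
        self._text = text

    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        if any(self._text in m for m in messages):
            self.terminated = True
            return f"saw {self._text!r}"
        return None


class AllOf(Condition):
    def __init__(self, *conditions: Condition) -> None:
        super().__init__()
        self._conditions = conditions

    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        # Only poll children that have not fired yet; they keep their state.
        for cond in self._conditions:
            if not cond.terminated:
                await cond(messages)
        if all(cond.terminated for cond in self._conditions):
            self.terminated = True
            return "all conditions met"
        return None

    async def reset(self) -> None:
        for cond in self._conditions:
            await cond.reset()
        self.terminated = False


class AnyOf(AllOf):
    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        results = [await cond(messages) for cond in self._conditions]
        hits = [r for r in results if r is not None]
        if hits:
            self.terminated = True
            return ", ".join(hits)
        return None


async def demo() -> Tuple[Optional[str], Optional[str], Optional[str]]:
    both = Mentions("DONE") & Mentions("APPROVE")
    either = Mentions("DONE") | Mentions("APPROVE")
    and_first = await both(["work is DONE"])  # only one child has fired
    or_first = await either(["work is DONE"])
    and_second = await both(["APPROVE"])      # second child fires now
    return and_first, or_first, and_second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```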

class agentopera.chatflow.base.TaskResult(messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], stop_reason: str | None = None)[source]

Bases: object

Result of running a task.

messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]

Messages produced by the task.

stop_reason: str | None = None

The reason the task stopped.

class agentopera.chatflow.base.TaskRunner(*args, **kwargs)[source]

Bases: Protocol

A task runner.

async run(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) -> TaskResult[source]

Run the task and return the result.

The task can be a string, a single message, or a sequence of messages.

The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.

run_stream(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) -> AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | TaskResult, None][source]

Runs the task and produces a stream of messages, with the final TaskResult as the last item in the stream.

The task can be a string, a single message, or a sequence of messages.

The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.
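Consuming such a stream usually means treating the last item differently from the rest. The sketch below uses a hypothetical CountingRunner and a stand-in TaskResult with plain strings as messages; it also illustrates, under those assumptions, how a second call without a task continues from the earlier state.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional, Tuple, Union


@dataclass
class TaskResult:  # hypothetical stand-in with the two documented fields
    messages: List[str]
    stop_reason: Optional[str] = None


class CountingRunner:
    """Toy stateful runner (assumption) that remembers how many turns it ran."""

    def __init__(self) -> None:
        self._turn = 0

    async def run_stream(
        self, *, task: Optional[str] = None
    ) -> AsyncGenerator[Union[str, TaskResult], None]:
        self._turn += 1
        messages: List[str] = []
        if task is not None:
            messages.append(f"user: {task}")
        messages.append(f"runner: turn {self._turn}")
        for message in messages:
            yield message
        # The TaskResult is always the last item in the stream.
        yield TaskResult(messages=messages, stop_reason="demo finished")

    async def run(self, *, task: Optional[str] = None) -> TaskResult:
        result: Optional[TaskResult] = None
        async for item in self.run_stream(task=task):
            if isinstance(item, TaskResult):
                result = item
        if result is None:
            raise RuntimeError("stream ended without a TaskResult")
        return result


async def demo() -> Tuple[List[Union[str, TaskResult]], TaskResult]:
    runner = CountingRunner()
    streamed = [item async for item in runner.run_stream(task="hi")]
    follow_up = await runner.run()  # no task: continue from prior state
    return streamed, follow_up


if __name__ == "__main__":
    print(asyncio.run(demo()))
```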

class agentopera.chatflow.base.Handoff(*, target: str, description: str = '', name: str = '', message: str = '')[source]

Bases: BaseModel

Handoff configuration.

target: str

The name of the target agent to hand off to.

description: str

The description of the handoff, such as the condition under which it should happen and the target agent’s ability. If not provided, it is generated from the target agent’s name.

name: str

The name of this handoff configuration. If not provided, it is generated from the target agent’s name.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

message: str

The message to the target agent. If not provided, it is generated from the target agent’s name.

classmethod set_defaults(values: Dict[str, Any]) -> Dict[str, Any][source]

property handoff_tool: BaseTool[BaseModel, BaseModel]

Create a handoff tool from this handoff configuration.
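The "generated from the target agent's name" behaviour for description, name, and message can be illustrated with a small stand-in. HandoffSketch below is a plain dataclass, not the library's pydantic model, and the exact default strings are assumptions chosen for illustration; the source only says that the defaults derive from the target name.

```python
from dataclasses import dataclass


@dataclass
class HandoffSketch:
    """Stand-in for a handoff config; default strings are assumptions."""

    target: str
    description: str = ""
    name: str = ""
    message: str = ""

    def __post_init__(self) -> None:
        # Fill each empty field from the target agent's name.
        if not self.description:
            self.description = f"Handoff to {self.target}."
        if not self.name:
            self.name = f"transfer_to_{self.target}".lower()
        if not self.message:
            self.message = f"Transferred to {self.target}."


if __name__ == "__main__":
    print(HandoffSketch(target="Billing"))
    print(HandoffSketch(target="Billing", name="escalate"))
```

Explicitly supplied values are left untouched, so a caller can override any one field while still relying on the generated defaults for the others.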