chatflow.base
- class agentopera.chatflow.base.ChatAgent(*args, **kwargs)[source]
Bases:
ABC,TaskRunnerProtocol for a chat agent.
- abstract property name: str
The name of the agent. This is used by team to uniquely identify the agent. It should be unique within the team.
- abstract property description: str
The description of the agent. This is used by team to make decisions about which agents to use. The description should describe the agent’s capabilities and how to interact with it.
- abstract property produced_message_types: Sequence[type[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]]
The types of messages that the agent produces in the
Response.chat_messagefield. They must beChatMessagetypes.
- abstract async on_messages(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) Response[source]
Handles incoming messages and returns a response.
- abstract on_messages_stream(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Response, None][source]
Handles incoming messages and returns a stream of inner messages and and the final item is the response.
- abstract async on_reset(cancellation_token: CancellationToken) None[source]
Resets the agent to its initialization state.
- class agentopera.chatflow.base.Response(*, chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None)[source]
Bases:
objectA response from calling
ChatAgent.on_messages().- chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage
A chat message produced by the agent as the response.
- inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None
Inner messages produced by the agent, they can be
AgentEventorChatMessage.
- class agentopera.chatflow.base.Team(*args, **kwargs)[source]
Bases:
ABC,TaskRunner
- class agentopera.chatflow.base.TerminationCondition[source]
Bases:
ABCA stateful condition that determines when a conversation should be terminated.
A termination condition is a callable that takes a sequence of ChatMessage objects since the last time the condition was called, and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset before it can be used again.
Termination conditions can be combined using the AND and OR operators.
Example
import asyncio from agentopera.chatflow.conditions import MaxMessageTermination, TextMentionTermination async def main() -> None: # Terminate the conversation after 10 turns or if the text "TERMINATE" is mentioned. cond1 = MaxMessageTermination(10) | TextMentionTermination("TERMINATE") # Terminate the conversation after 10 turns and if the text "TERMINATE" is mentioned. cond2 = MaxMessageTermination(10) & TextMentionTermination("TERMINATE") # ... # Reset the termination condition. await cond1.reset() await cond2.reset() asyncio.run(main())
- abstract property terminated: bool
Check if the termination condition has been reached
- class agentopera.chatflow.base.AndTerminationCondition(*conditions: TerminationCondition)[source]
Bases:
TerminationCondition- property terminated: bool
Check if the termination condition has been reached
- class agentopera.chatflow.base.OrTerminationCondition(*conditions: TerminationCondition)[source]
Bases:
TerminationCondition- property terminated: bool
Check if the termination condition has been reached
- class agentopera.chatflow.base.TaskResult(messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], stop_reason: str | None = None)[source]
Bases:
objectResult of running a task.
- messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]
Messages produced by the task.
- stop_reason: str | None = None
The reason the task stopped.
- class agentopera.chatflow.base.TaskRunner(*args, **kwargs)[source]
Bases:
ProtocolA task runner.
- async run(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) TaskResult[source]
Run the task and return the result.
The task can be a string, a single message, or a sequence of messages.
The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.
- run_stream(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | TaskResult, None][source]
Run the task and produces a stream of messages and the final result
TaskResultas the last item in the stream.The task can be a string, a single message, or a sequence of messages.
The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.
- class agentopera.chatflow.base.Handoff(*, target: str, description: str = '', name: str = '', message: str = '')[source]
Bases:
BaseModelHandoff configuration.
- target: str
The name of the target agent to handoff to.
- description: str
The description of the handoff such as the condition under which it should happen and the target agent’s ability. If not provided, it is generated from the target agent’s name.
- name: str
The name of this handoff configuration. If not provided, it is generated from the target agent’s name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- message: str
The message to the target agent. If not provided, it is generated from the target agent’s name.