chatflow.base
- class agentopera.chatflow.base.ChatAgent(*args, **kwargs)[source]
Bases: ABC, TaskRunner
Protocol for a chat agent.
- abstract property name: str
The name of the agent. The team uses it to identify the agent, so it must be unique within the team.
- abstract property description: str
The description of the agent. The team uses it to decide which agents to use, so it should describe the agent’s capabilities and how to interact with it.
- abstract property produced_message_types: Sequence[type[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]]
The types of messages that the agent produces in the Response.chat_message field. They must be ChatMessage types.
- abstract async on_messages(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) Response [source]
Handles incoming messages and returns a response.
- abstract on_messages_stream(messages: Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], cancellation_token: CancellationToken) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Response, None] [source]
Handles incoming messages and returns a stream of inner messages; the final item in the stream is the response.
- abstract async on_reset(cancellation_token: CancellationToken) None [source]
Resets the agent to its initialization state.
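The members listed above can be illustrated with a minimal sketch of an agent that echoes the last message it receives. To keep the example runnable without agentopera installed, TextMessage, Response, and CancellationToken below are hypothetical stand-ins rather than the library's classes, and EchoAgent is duck-typed instead of subclassing the real ChatAgent. Implementing on_messages by draining on_messages_stream is a design choice of this sketch, not something the reference prescribes.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Optional, Sequence


# Hypothetical stand-ins so the sketch runs without agentopera installed.
@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class Response:
    chat_message: TextMessage
    inner_messages: Optional[Sequence[TextMessage]] = None


class CancellationToken:
    """Placeholder; the real token's interface is not described in this section."""


class EchoAgent:
    """Duck-typed agent exposing the same members as ChatAgent."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._turns = 0

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> str:
        return "Echoes the most recent message back to the sender."

    @property
    def produced_message_types(self) -> Sequence[type]:
        return (TextMessage,)

    async def on_messages(
        self, messages: Sequence[TextMessage], cancellation_token: CancellationToken
    ) -> Response:
        # Drain the stream; by contract its final item is the Response.
        final = None
        async for item in self.on_messages_stream(messages, cancellation_token):
            final = item
        if not isinstance(final, Response):
            raise RuntimeError("stream ended without a Response")
        return final

    async def on_messages_stream(
        self, messages: Sequence[TextMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[Response, None]:
        self._turns += 1
        text = messages[-1].content if messages else ""
        yield Response(chat_message=TextMessage(content=text, source=self._name))

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        # Return to the state the agent had right after construction.
        self._turns = 0


if __name__ == "__main__":
    agent = EchoAgent("echo")
    reply = asyncio.run(
        agent.on_messages([TextMessage("hello", "user")], CancellationToken())
    )
    print(reply.chat_message.content)
```

A real agent would typically yield events such as tool calls before the final Response and would honor the cancellation token; both are omitted here.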
- class agentopera.chatflow.base.Response(*, chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage, inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None)[source]
Bases: object
A response from calling ChatAgent.on_messages().
- chat_message: TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage
A chat message produced by the agent as the response.
- inner_messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None
Inner messages produced by the agent; they can be AgentEvent or ChatMessage types.
- class agentopera.chatflow.base.Team(*args, **kwargs)[source]
Bases: ABC, TaskRunner
- class agentopera.chatflow.base.TerminationCondition[source]
Bases: ABC
A stateful condition that determines when a conversation should be terminated.
A termination condition is a callable that takes a sequence of ChatMessage objects since the last time the condition was called, and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset before it can be used again.
Termination conditions can be combined using the AND and OR operators.
Example
    import asyncio

    from agentopera.chatflow.conditions import MaxMessageTermination, TextMentionTermination


    async def main() -> None:
        # Terminate the conversation after 10 turns or if the text "TERMINATE" is mentioned.
        cond1 = MaxMessageTermination(10) | TextMentionTermination("TERMINATE")

        # Terminate the conversation after 10 turns and if the text "TERMINATE" is mentioned.
        cond2 = MaxMessageTermination(10) & TextMentionTermination("TERMINATE")

        # ...

        # Reset the termination condition.
        await cond1.reset()
        await cond2.reset()


    asyncio.run(main())
- abstract property terminated: bool
Whether the termination condition has been reached.
- class agentopera.chatflow.base.AndTerminationCondition(*conditions: TerminationCondition)[source]
Bases: TerminationCondition
- property terminated: bool
Whether the termination condition has been reached.
- class agentopera.chatflow.base.OrTerminationCondition(*conditions: TerminationCondition)[source]
Bases: TerminationCondition
- property terminated: bool
Whether the termination condition has been reached.
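The contract described for TerminationCondition (called with only the new messages, returns a stop signal or None, must be reset once reached, combinable with | and &) can be sketched without the library as follows. Everything here is a hypothetical stand-in: messages are plain strings, a reason string takes the place of StopMessage, and the class names Condition, MaxMessages, TextMention, and Combined are invented for this illustration. How the real AndTerminationCondition and OrTerminationCondition track their children is an assumption.

```python
import asyncio
from typing import List, Optional, Sequence


class Condition:
    """Stand-in base: messages are strings, a reason string replaces StopMessage."""

    def __init__(self) -> None:
        self._terminated = False

    @property
    def terminated(self) -> bool:
        return self._terminated

    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        if self._terminated:
            raise RuntimeError("condition already reached; call reset() first")
        reason = self._check(messages)
        self._terminated = reason is not None
        return reason

    async def reset(self) -> None:
        self._terminated = False

    def __or__(self, other: "Condition") -> "Condition":
        return Combined(any, self, other)

    def __and__(self, other: "Condition") -> "Condition":
        return Combined(all, self, other)


class MaxMessages(Condition):
    def __init__(self, limit: int) -> None:
        super().__init__()
        self._limit, self._count = limit, 0

    def _check(self, messages: Sequence[str]) -> Optional[str]:
        self._count += len(messages)  # each call sees only the new messages
        return f"max {self._limit} messages" if self._count >= self._limit else None

    async def reset(self) -> None:
        await super().reset()
        self._count = 0


class TextMention(Condition):
    def __init__(self, text: str) -> None:
        super().__init__()
        self._text = text

    def _check(self, messages: Sequence[str]) -> Optional[str]:
        hit = any(self._text in m for m in messages)
        return f"'{self._text}' mentioned" if hit else None


class Combined(Condition):
    def __init__(self, mode, *conditions: Condition) -> None:
        super().__init__()
        self._mode = mode  # the builtin any (OR) or all (AND)
        self._conditions = conditions
        self._reasons: List[str] = []

    async def __call__(self, messages: Sequence[str]) -> Optional[str]:
        if self._terminated:
            raise RuntimeError("condition already reached; call reset() first")
        for cond in self._conditions:
            if not cond.terminated:  # children that already fired keep their state
                reason = await cond(messages)
                if reason is not None:
                    self._reasons.append(reason)
        if not self._mode(c.terminated for c in self._conditions):
            return None
        self._terminated = True
        return ", ".join(self._reasons)

    async def reset(self) -> None:
        await super().reset()
        self._reasons.clear()
        for cond in self._conditions:
            await cond.reset()


async def demo():
    either = MaxMessages(3) | TextMention("TERMINATE")
    both = MaxMessages(3) & TextMention("TERMINATE")
    first = await either(["hello", "TERMINATE"])  # the text mention alone suffices
    second = await both(["hello", "TERMINATE"])   # only two messages so far
    third = await both(["one more"])              # the third message arrives
    return first, second, third
```

In this sketch the AND combination remembers that the text mention already fired, so it terminates as soon as the message count catches up on a later call.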
- class agentopera.chatflow.base.TaskResult(messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage], stop_reason: str | None = None)[source]
Bases: object
Result of running a task.
- messages: Sequence[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage]
Messages produced by the task.
- stop_reason: str | None = None
The reason the task stopped.
- class agentopera.chatflow.base.TaskRunner(*args, **kwargs)[source]
Bases: Protocol
A task runner.
- async run(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) TaskResult [source]
Run the task and return the result.
The task can be a string, a single message, or a sequence of messages.
The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.
- run_stream(*, task: str | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | Sequence[TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage] | None = None, cancellation_token: CancellationToken | None = None) AsyncGenerator[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent | TextMessage | MultiModalMessage | StopMessage | ToolCallSummaryMessage | HandoffMessage | VercelMessage | TaskResult, None] [source]
Run the task and produce a stream of messages, with the final TaskResult as the last item in the stream.
The task can be a string, a single message, or a sequence of messages.
The runner is stateful and a subsequent call to this method will continue from where the previous call left off. If the task is not specified, the runner will continue with the current task.
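The two behaviors described above, a stream whose last item is the TaskResult and a runner that continues when no task is given, can be sketched as follows. CountingRunner is a hypothetical runner invented for this illustration, messages are plain strings, and the TaskResult dataclass is a stand-in that only copies the two documented fields. The cancellation_token parameter is left out for brevity.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional, Sequence, Union


@dataclass
class TaskResult:
    """Stand-in with the two documented fields."""

    messages: Sequence[str]
    stop_reason: Optional[str] = None


class CountingRunner:
    """Hypothetical runner that produces two step messages per run."""

    def __init__(self) -> None:
        self._task: Optional[str] = None
        self._step = 0  # survives between calls, which makes the runner stateful

    async def run_stream(
        self, *, task: Optional[str] = None
    ) -> AsyncGenerator[Union[str, TaskResult], None]:
        if task is not None:
            self._task = task  # remember the most recent task
        produced: List[str] = []
        for _ in range(2):
            self._step += 1
            message = f"{self._task}: step {self._step}"
            produced.append(message)
            yield message
        # The final item of the stream is always the TaskResult.
        yield TaskResult(messages=produced, stop_reason="two steps completed")

    async def run(self, *, task: Optional[str] = None) -> TaskResult:
        result = None
        async for item in self.run_stream(task=task):
            if isinstance(item, TaskResult):
                result = item
        if result is None:
            raise RuntimeError("stream ended without a TaskResult")
        return result


async def demo():
    runner = CountingRunner()
    first = await runner.run(task="count")
    # No task given: the runner continues from where the first call stopped.
    streamed = [item async for item in runner.run_stream()]
    return first, streamed
```

A caller of run_stream usually checks each item with isinstance to separate intermediate messages from the final TaskResult, as run does here.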
- class agentopera.chatflow.base.Handoff(*, target: str, description: str = '', name: str = '', message: str = '')[source]
Bases: BaseModel
Handoff configuration.
- target: str
The name of the target agent to hand off to.
- description: str
The description of the handoff such as the condition under which it should happen and the target agent’s ability. If not provided, it is generated from the target agent’s name.
- name: str
The name of this handoff configuration. If not provided, it is generated from the target agent’s name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- message: str
The message to the target agent. If not provided, it is generated from the target agent’s name.
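The pattern of deriving description, name, and message from the target when they are left empty can be sketched with a plain dataclass. HandoffSketch is a hypothetical stand-in, not the library's pydantic model, and the generated strings below are placeholders: this reference only states that the defaults come from the target agent's name, not what they look like.

```python
from dataclasses import dataclass


@dataclass
class HandoffSketch:
    """Stand-in for Handoff; empty fields are filled in from the target name."""

    target: str
    description: str = ""
    name: str = ""
    message: str = ""

    def __post_init__(self) -> None:
        # Placeholder formats (assumptions); the real generated text may differ.
        if not self.description:
            self.description = f"Handoff to {self.target}"
        if not self.name:
            self.name = f"transfer_to_{self.target}".lower()
        if not self.message:
            self.message = f"Transferred to {self.target}."


if __name__ == "__main__":
    print(HandoffSketch(target="Billing"))
```

Explicitly supplied values are kept as given, so a caller can override any one field while still relying on the generated defaults for the others.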