engine.agent

class agentopera.engine.agent.Agent(*args, **kwargs)[source]

Bases: Protocol

property metadata: AgentMetadata

Metadata of the agent.

property id: AgentId

ID of the agent.

async on_message(message: Any, ctx: MessageContext) Any[source]

Message handler for the agent. This should only be called by the engine, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Response to the message. Can be None.

Return type:

Any

Raises:
  • asyncio.CancelledError – If the message was cancelled.

  • CantHandleException – If the agent cannot handle the message.

async save_state() Mapping[str, Any][source]

Save the state of the agent. The result must be JSON serializable.

async load_state(state: Mapping[str, Any]) None[source]

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.
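
The save_state / load_state contract can be sketched with a stand-in class. CounterAgent below is hypothetical and does not import agentopera; it only mirrors the two state methods and keeps its state in plain built-in types, so the saved mapping survives a JSON round trip.

```python
import asyncio
import json
from typing import Any, Mapping


class CounterAgent:
    """Hypothetical stand-in that mirrors only the two state methods of Agent."""

    def __init__(self) -> None:
        self.count = 0
        self.history: list[str] = []

    async def save_state(self) -> Mapping[str, Any]:
        # Use only JSON-friendly built-ins (dict, list, str, int, float, bool, None).
        return {"count": self.count, "history": list(self.history)}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.count = int(state["count"])
        self.history = list(state["history"])


async def round_trip() -> CounterAgent:
    original = CounterAgent()
    original.count = 2
    original.history = ["hello", "world"]

    # json.dumps raises TypeError if the state is not JSON serializable.
    payload = json.dumps(await original.save_state())

    restored = CounterAgent()
    await restored.load_state(json.loads(payload))
    return restored
```

Running the state through json.dumps in a test is a cheap way to catch values such as datetime or set objects before they reach whatever storage holds the saved state.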

async close() None[source]

Called when the engine is closed.

class agentopera.engine.agent.BaseAgent(description: str)[source]

Bases: ABC, Agent

property metadata: AgentMetadata

Metadata of the agent.

property type: str
property id: AgentId

ID of the agent.

property engine: AgentEngine
final async on_message(message: Any, ctx: MessageContext) Any[source]

Message handler for the agent. This should only be called by the engine, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Response to the message. Can be None.

Return type:

Any

Raises:
  • asyncio.CancelledError – If the message was cancelled.

  • CantHandleException – If the agent cannot handle the message.

abstract async on_message_impl(message: Any, ctx: MessageContext) Any[source]
async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]

See agentopera.core.AgentEngine.send_message() for more information.

async publish_message(message: Any, message_channel: MessageChannel, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]
async save_state() Mapping[str, Any][source]

Save the state of the agent. The result must be JSON serializable.

async load_state(state: Mapping[str, Any]) None[source]

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

async close() None[source]

Called when the engine is closed.

async classmethod register(engine: AgentEngine, type: str, agent_builder: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False) AgentType[source]

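Register this agent class with the engine under the given agent type. The agent_builder callable constructs the agent instance and may return it directly or as an awaitable.

Returns the AgentType that was registered.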

class agentopera.engine.agent.ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn')[source]

Bases: BaseAgent, ClosureContext

property metadata: AgentMetadata

Metadata of the agent.

property id: AgentId

ID of the agent.

property engine: AgentEngine
async on_message_impl(message: Any, ctx: MessageContext) Any[source]
async save_state() Mapping[str, Any][source]

Closure agents do not have state, so this method always returns an empty dictionary.

async load_state(state: Mapping[str, Any]) None[source]

Closure agents do not have state, so this method does nothing.

async classmethod register_closure(engine: AgentEngine, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn', skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType[source]

A closure agent lets you define an agent with a closure (or plain function) instead of a class. It also allows values to be extracted out of the engine, for example by having the closure put received messages on a queue.

The type hint on the closure's message parameter determines which message type is expected. Use Any to accept any type of message.

Example:

import asyncio
from agentopera.engine import LocalAgentEngine, MessageContext, ClosureAgent, ClosureContext
from dataclasses import dataclass

from agentopera.engine.protocol.subscription.default_subscription import DefaultSubscription
from agentopera.engine.protocol.subscription.default_topic import DefaultTopicId


@dataclass
class MyMessage:
    content: str


async def main():
    queue = asyncio.Queue[MyMessage]()

    async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:
        await queue.put(message)

    engine = LocalAgentEngine()
    await ClosureAgent.register_closure(
        engine, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
    )

    engine.start()
    await engine.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
    await engine.stop_when_idle()

    result = await queue.get()
    print(result)


asyncio.run(main())

Parameters:
  • engine (AgentEngine) – Engine to register the agent to

  • type (str) – Agent type of registered agent

  • closure (Callable[[ClosureContext, T, MessageContext], Awaitable[Any]]) – Closure to handle messages

  • unknown_type_policy (Literal["error", "warn", "ignore"], optional) – What to do if a type is encountered that does not match the closure type. Defaults to “warn”.

  • skip_direct_message_subscription (bool, optional) – Do not add direct message subscription for this agent. Defaults to False.

  • description (str, optional) – Description of what agent does. Defaults to “”.

  • subscriptions (Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None, optional) – List of subscriptions for this closure agent. Defaults to None.

Returns:

Type of the agent that was registered

Return type:

AgentType
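
The three unknown_type_policy values can be pictured with a small stand-alone check. This is only a sketch of the documented behaviour, not the library's code: check_message_type is a hypothetical helper, and the exception type and the use of the warnings module are assumptions.

```python
import warnings
from typing import Any, Literal

Policy = Literal["error", "warn", "ignore"]


def check_message_type(message: Any, expected: type, policy: Policy = "warn") -> bool:
    """Return True if the closure should be called with this message."""
    if isinstance(message, expected):
        return True
    if policy == "error":
        # Assumption: the real agent may raise a different exception type.
        raise TypeError(f"Unexpected message type: {type(message).__name__}")
    if policy == "warn":
        # Assumption: the real agent may log instead of using warnings.
        warnings.warn(f"Skipping message of type {type(message).__name__}")
    # Both "warn" and "ignore" skip the closure for this message.
    return False
```

With the default "warn", a mismatched message is reported and skipped. "ignore" skips it silently, and "error" stops processing with an exception.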

class agentopera.engine.agent.ClosureContext(*args, **kwargs)[source]

Bases: Protocol

property id: AgentId
async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]
async publish_message(message: Any, message_channel: MessageChannel, *, cancellation_token: CancellationToken | None = None) None[source]
class agentopera.engine.agent.RoutedAgent(description: str)[source]

Bases: BaseAgent

A base class for agents that route messages to handlers based on the type of the message and optional matching functions.

To create a routed agent, subclass this class and add message handlers as methods decorated with either event() or rpc() decorator.

Example:

from dataclasses import dataclass
from agentopera.engine import MessageContext
from agentopera.engine.agent import RoutedAgent, event, rpc


@dataclass
class Message:
    pass


@dataclass
class MessageWithContent:
    content: str


@dataclass
class Response:
    pass


class MyAgent(RoutedAgent):
    def __init__(self):
        super().__init__("MyAgent")

    @event
    async def handle_event_message(self, message: Message, ctx: MessageContext) -> None:
        assert ctx.message_channel is not None
        await self.publish_message(MessageWithContent("event handled"), ctx.message_channel)

    @rpc(match=lambda message, ctx: message.content == "special")  # type: ignore
    async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response:
        return Response()

property cancel_token: CancellationToken
trigger_transient_cancel(delay: float = 1.5) None[source]

Triggers the cancel, then resets the token after delay seconds.
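
The cancel-then-reset behaviour can be sketched with stand-in classes. StubToken and TransientCancel are hypothetical and do not reflect how RoutedAgent or CancellationToken are implemented. The sketch assumes that "reset" means swapping in a fresh token, and that the call happens while an asyncio event loop is running.

```python
import asyncio


class StubToken:
    """Hypothetical stand-in for a cancellation token."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


class TransientCancel:
    """Holds a token that can be cancelled briefly and then replaced."""

    def __init__(self) -> None:
        self.cancel_token = StubToken()

    def trigger_transient_cancel(self, delay: float = 1.5) -> None:
        # Cancel now, then schedule a fresh token after `delay` seconds.
        self.cancel_token.cancel()
        asyncio.get_running_loop().call_later(delay, self._reset)

    def _reset(self) -> None:
        self.cancel_token = StubToken()


async def demo() -> tuple[bool, bool]:
    holder = TransientCancel()
    holder.trigger_transient_cancel(delay=0.05)
    cancelled_during = holder.cancel_token.is_cancelled()
    await asyncio.sleep(0.2)  # wait past the delay so the reset has run
    cancelled_after = holder.cancel_token.is_cancelled()
    return cancelled_during, cancelled_after
```

In this sketch, work that checks the token during the delay window sees it as cancelled, while work started after the reset sees a fresh token.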

async on_message_impl(message: Any, ctx: MessageContext) Any | None[source]

Handle a message by routing it to the appropriate message handler. Do not override this method in subclasses. Instead, add message handlers as methods decorated with either the event() or rpc() decorator.

async on_unhandled_message(message: Any, ctx: MessageContext) None[source]

Called when a message is received that does not have a matching message handler. The default implementation logs an info message.

agentopera.engine.agent.event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None][source]

Decorator for event message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle event messages. To be valid, these methods must follow a specific signature:

  • The method must be an async method.

  • The method must be decorated with the @event decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The event message to be handled. It must be type-hinted with the message type that it is intended to handle.

    3. ctx: An agentopera.core.MessageContext object.

  • The method must return None.

Handlers can handle more than one message type by accepting a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.
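
The secondary routing described for match can be sketched without the library. Handler and pick_handler are hypothetical names, and exact type matching on the message is an assumption. The sketch shows only the documented ordering rule: handlers for the same message type are tried in alphabetical order of their names, and the first one that matches wins.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Handler:
    name: str
    target_type: type
    # None means the handler accepts every message of its target type.
    match: Optional[Callable[[Any, Any], bool]] = None


def pick_handler(handlers: list[Handler], message: Any, ctx: Any = None) -> Optional[str]:
    # Primary routing: keep only handlers registered for this message type.
    candidates = [h for h in handlers if type(message) is h.target_type]
    # Secondary routing: apply match functions in alphabetical order of handler name.
    for handler in sorted(candidates, key=lambda h: h.name):
        if handler.match is None or handler.match(message, ctx):
            return handler.name
    return None  # no handler matched; the message is unhandled


handlers = [
    Handler("handle_b_default", str),
    Handler("handle_a_special", str, lambda message, ctx: message == "special"),
    Handler("handle_number", int),
]
```

Because ordering depends on method names, a handler without a match function acts as a catch-all only if its name sorts after the more specific handlers for the same type.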

agentopera.engine.agent.message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]

Decorator for generic message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle both event and RPC messages. To be valid, these methods must follow a specific signature:

  • The method must be an async method.

  • The method must be decorated with the @message_handler decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The message to be handled. It must be type-hinted with the message type that it is intended to handle.

    3. ctx: An agentopera.core.MessageContext object.

  • The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.

Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.
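
One way a decorator could discover which message types a handler accepts is to read the type hint on its message parameter and unpack any Union. The sketch below uses only the standard typing module. accepted_message_types and the two handlers are hypothetical, and the library may resolve types differently.

```python
import types
from typing import Any, Callable, Union, get_args, get_origin, get_type_hints

# typing.Union[...] and the PEP 604 form (A | B) can report different origins.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def accepted_message_types(handler: Callable[..., Any]) -> tuple[type, ...]:
    """Return the concrete types named in the handler's `message` annotation."""
    hint = get_type_hints(handler)["message"]
    if get_origin(hint) in _UNION_ORIGINS:
        return get_args(hint)
    return (hint,)


async def handle_text_or_number(self: Any, message: Union[str, int], ctx: Any) -> None:
    """Handler that accepts two message types through a Union hint."""


async def handle_text_only(self: Any, message: str, ctx: Any) -> None:
    """Handler that accepts a single message type."""
```

This also illustrates why the message parameter must carry a type hint: without one there is nothing for the decorator to route on.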

agentopera.engine.agent.rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]

Decorator for RPC message handlers.

Add this decorator to methods in a RoutedAgent class that are intended to handle RPC messages. To be valid, these methods must follow a specific signature:

  • The method must be an async method.

  • The method must be decorated with the @rpc decorator.

  • The method must have exactly 3 arguments:
    1. self

    2. message: The message to be handled. It must be type-hinted with the message type that it is intended to handle.

    3. ctx: An agentopera.core.MessageContext object.

  • The method must be type hinted with what message types it can return as a response, or it can return None if it does not return anything.

Handlers can handle more than one message type by accepting a Union of the message types. It can also return more than one message type by returning a Union of the message types.

Parameters:
  • func – The function to be decorated.

  • strict – If True, the handler will raise an exception if the message type or return type is not in the target types. If False, it will log a warning instead.

  • match – A function that takes the message and the context as arguments and returns a boolean. This is used for secondary routing after the message type. For handlers addressing the same message type, the match function is applied in alphabetical order of the handlers and the first matching handler will be called while the rest are skipped. If None, the first handler in alphabetical order matching the same message type will be called.

class agentopera.engine.agent.ToolAgent(description: str, tools: List[Tool])[source]

Bases: RoutedAgent

A tool agent accepts direct messages of the type FunctionCall, executes the requested tool with the provided arguments, and returns the result as FunctionExecutionResult messages.

Parameters:
  • description (str) – The description of the agent.

  • tools (List[Tool]) – The list of tools that the agent can execute.

property tools: List[Tool]
async handle_function_call(message: FunctionCall, ctx: MessageContext) FunctionExecutionResult[source]

Handles a FunctionCall message by executing the requested tool with the provided arguments.

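Parameters:
  • message (FunctionCall) – The function call message.

  • ctx (MessageContext) – Context of the message.
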
Returns:

The result of the function execution.

Return type:

FunctionExecutionResult

Raises:
  • ToolNotFoundException – If the tool is not found.

  • InvalidToolArgumentsException – If the tool arguments are invalid.

  • ToolExecutionException – If the tool execution fails.
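
The three failure modes listed above map onto three steps of a dispatch routine: look up the tool, decode the arguments, run the tool. The sketch below is self-contained and does not import agentopera. The stand-in classes reuse the documented names, but their fields (id, name, arguments as a JSON string, call_id, content) and the dict-of-callables tool registry are assumptions.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict


class ToolNotFoundException(Exception): ...
class InvalidToolArgumentsException(Exception): ...
class ToolExecutionException(Exception): ...


@dataclass
class FunctionCall:  # stand-in; field names are assumptions
    id: str
    name: str
    arguments: str  # JSON-encoded keyword arguments


@dataclass
class FunctionExecutionResult:  # stand-in; field names are assumptions
    call_id: str
    content: str


ToolFn = Callable[..., Awaitable[Any]]


async def handle_function_call(tools: Dict[str, ToolFn], call: FunctionCall) -> FunctionExecutionResult:
    # Step 1: find the requested tool by name.
    tool = tools.get(call.name)
    if tool is None:
        raise ToolNotFoundException(f"Tool not found: {call.name}")
    # Step 2: decode the arguments, which must be a JSON object.
    try:
        arguments = json.loads(call.arguments)
    except json.JSONDecodeError as exc:
        raise InvalidToolArgumentsException(f"Invalid JSON: {call.arguments}") from exc
    if not isinstance(arguments, dict):
        raise InvalidToolArgumentsException("Arguments must be a JSON object")
    # Step 3: run the tool and wrap any failure.
    try:
        result = await tool(**arguments)
    except Exception as exc:
        raise ToolExecutionException(str(exc)) from exc
    return FunctionExecutionResult(call_id=call.id, content=str(result))


async def add(a: int, b: int) -> int:
    return a + b
```

Keeping the three exception types distinct lets the caller tell a wrong tool name apart from malformed arguments or a failure inside the tool itself.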