Source code for agentopera.engine.runtime.agent_engine

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Awaitable, Callable, Mapping, Protocol, Type, TypeVar, overload, runtime_checkable

from ..types.agent.agent import Agent
from ..types.agent import AgentId, AgentType, AgentMetadata
from ..types.agent.cancellation_token import CancellationToken
from ..serialization import MessageSerializer
from ..subscription.subscription import Subscription
from ..types.msg_channel import MessageChannel

# Undeliverable - error

T = TypeVar("T", bound=Agent)


[docs] @runtime_checkable class AgentEngine(Protocol):
[docs] async def send_message( self, message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None, ) -> Any: """Send a message to an agent and get a response. Args: message (Any): The message to send. recipient (AgentId): The agent to send the message to. sender (AgentId | None, optional): Agent which sent the message. Should **only** be None if this was sent from no agent, such as directly to the engine externally. Defaults to None. cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None. Raises: CantHandleException: If the recipient cannot handle the message. UndeliverableException: If the message cannot be delivered. Other: Any other exception raised by the recipient. Returns: Any: The response from the agent. """ ...
[docs] async def publish_message( self, message: Any, message_channel: MessageChannel, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None, ) -> None: """Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender. No responses are expected from publishing. Args: message (Any): The message to publish. message_channel (MessageChannel): The message channel to publish the message to. sender (AgentId | None, optional): The agent which sent the message. Defaults to None. cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None. message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID. Raises: UndeliverableException: If the message cannot be delivered. """ ...
[docs] async def register_agent_builder( self, type: str | AgentType, agent_builder: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None, ) -> AgentType: """Register an agent factory with the engine associated with a specific type. The type must be unique. This API does not add any subscriptions. .. note:: This is a low level API and usually the agent class's `register` method should be used instead, as this also handles subscriptions automatically. Example: .. code-block:: python from dataclasses import dataclass from agentopera.core import AgentEngine, MessageContext, RoutedAgent, event from agentopera.core.types.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: engine: AgentEngine = ... # type: ignore await engine.register_agent_builder("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main()) Args: type (str): The type of agent this factory creates. It is not the same as agent class name. The `type` parameter is used to differentiate between different factory functions rather than agent classes. agent_factory (Callable[[], T]): The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use `agentopera.core.AgentInstantiationContext` to access variables like the current engine and agent ID. expected_class (type[T] | None, optional): The expected class of the agent, used for engine validation of the factory. Defaults to None. If None, no validation is performed. """ ...
@overload async def get(self, id: AgentId, /, *, lazy: bool = ...) -> AgentId: ... @overload async def get(self, type: AgentType | str, /, key: str = ..., *, lazy: bool = ...) -> AgentId: ...
[docs] async def get(
self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True ) -> AgentId: ...
[docs] async def save_state(self) -> Mapping[str, Any]: """Save the state of the entire engine, including all hosted agents. The only way to restore the state is to pass it to :meth:`load_state`. The structure of the state is implementation defined and can be any JSON serializable object. Returns: Mapping[str, Any]: The saved state. """ ...
[docs] async def load_state(self, state: Mapping[str, Any]) -> None: """Load the state of the entire engine, including all hosted agents. The state should be the same as the one returned by :meth:`save_state`. Args: state (Mapping[str, Any]): The saved state. """ ...
[docs] async def agent_metadata(self, agent: AgentId) -> AgentMetadata: """Get the metadata for an agent. Args: agent (AgentId): The agent id. Returns: AgentMetadata: The agent metadata. """ ...
[docs] async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]: """Save the state of a single agent. The structure of the state is implementation defined and can be any JSON serializable object. Args: agent (AgentId): The agent id. Returns: Mapping[str, Any]: The saved state. """ ...
[docs] async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None: """Load the state of a single agent. Args: agent (AgentId): The agent id. state (Mapping[str, Any]): The saved state. """ ...
[docs] async def subscribe(self, subscription: Subscription) -> None: """Add a new subscription that the engine should fulfill when processing published messages Args: subscription (Subscription): The subscription to add """ ...
[docs] async def unsubscribe(self, id: str) -> None: """Remove a subscription from the engine Args: id (str): id of the subscription to remove Raises: LookupError: If the subscription does not exist """ ...
[docs] def register_msg_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None: """Add a new message serialization serializer to the engine Note: This will deduplicate serializers based on the type_name and data_content_type properties Args: serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]): The serializer/s to add """ ...