engine.runtime

class agentopera.engine.runtime.LocalAgentEngine(*, tracer_provider: TracerProvider | None = None, ignore_unhandled_exceptions: bool = True)[source]

Bases: AgentEngine

A single-threaded agent engine that processes all messages using a single asyncio queue. Messages are delivered in the order they are received, and the engine processes each message in a separate asyncio task concurrently.

Note

This engine is suitable for development and standalone applications. It is not suitable for high-throughput or high-concurrency scenarios.

Parameters:
  • tracer_provider (TracerProvider, optional) – The tracer provider to use for tracing. Defaults to None.

  • ignore_unhandled_exceptions (bool, optional) – Whether to ignore unhandled exceptions that occur in agent event handlers. If they are not ignored, any background exception will be raised on the next call to process_next or from an awaited stop, stop_when_idle or stop_when. Note, this does not apply to RPC handlers. Defaults to True.

Examples

A simple example of creating an engine, registering an agent, sending a message and stopping the engine:

import asyncio
from dataclasses import dataclass

from agentopera.core import AgentId, MessageContext, RoutedAgent, LocalAgentEngine, message_handler


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    @message_handler
    async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


async def main() -> None:
    # Create a engine and register the agent
    engine = LocalAgentEngine()
    await MyAgent.register(engine, "my_agent", lambda: MyAgent("My agent"))

    # Start the engine, send a message and stop the engine
    engine.start()
    await engine.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default"))
    await engine.stop()


asyncio.run(main())

An example of creating an engine, registering an agent, publishing a message and stopping the engine:

import asyncio
from dataclasses import dataclass

from agentopera.core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    LocalAgentEngine,
    default_subscription,
    message_handler,
)


@dataclass
class MyMessage:
    content: str


# The agent is subscribed to the default topic.
@default_subscription
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


async def main() -> None:
    # Create a engine and register the agent
    engine = LocalAgentEngine()
    await MyAgent.register(engine, "my_agent", lambda: MyAgent("My agent"))

    # Start the engine.
    engine.start()
    # Publish a message to the default topic that the agent is subscribed to.
    await engine.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
    # Wait for the message to be processed and then stop the engine.
    await engine.stop_when_idle()


asyncio.run(main())
property unprocessed_messages_count: int
async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the engine externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

Raises:
  • CantHandleException – If the recipient cannot handle the message.

  • UndeliverableException – If the message cannot be delivered.

  • Other – Any other exception raised by the recipient.

Returns:

The response from the agent.

Return type:

Any

async publish_message(message: Any, message_channel: MessageChannel, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • message_channel (MessageChannel) – The message channel to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

  • message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique and is recommended to be a UUID.

Raises:

UndeliverableException – If the message cannot be delivered.

async save_state() Mapping[str, Any][source]

Save the state of the entire engine, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async load_state(state: Mapping[str, Any]) None[source]

Load the state of the entire engine, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

async process_next() None[source]

Process the next message in the queue.

If there is an unhandled exception in the background task, it will be raised here. process_next cannot be called again after an unhandled exception is raised.

start() None[source]

Start the engine message processing loop. This runs in a background task.

Example:

import asyncio
from agentopera.core import LocalAgentEngine


async def main() -> None:
    engine = LocalAgentEngine()
    engine.start()

    # ... do other things ...

    await engine.stop()


asyncio.run(main())
async close() None[source]

Calls stop() if applicable and the Agent.close() method on all instantiated agents

async stop() None[source]

Immediately stop the engine message processing loop. The currently processing message will be completed, but all others following it will be discarded.

async stop_when_idle() None[source]

Stop the engine message processing loop when there is no outstanding message being processed or queued. This is the most common way to stop the engine.

async stop_when(condition: Callable[[], bool]) None[source]

Stop the engine message processing loop when the condition is met.

Caution

This method is not recommended to be used, and is here for legacy reasons. It will spawn a busy loop to continually check the condition. It is much more efficient to call stop_when_idle or stop instead. If you need to stop the engine based on a condition, consider using a background task and asyncio.Event to signal when the condition is met and the background task should call stop.

async agent_metadata(agent: AgentId) AgentMetadata[source]

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

The agent metadata.

Return type:

AgentMetadata

async agent_save_state(agent: AgentId) Mapping[str, Any][source]

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async register_agent_builder(type: str | AgentType, agent_builder: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]

Register an agent factory with the engine associated with a specific type. The type must be unique. This API does not add any subscriptions.

Note

This is a low level API and usually the agent class’s register method should be used instead, as this also handles subscriptions automatically.

Example:

from dataclasses import dataclass

from agentopera.core import AgentEngine, MessageContext, RoutedAgent, event
from agentopera.core.types.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    engine: AgentEngine = ...  # type: ignore
    await engine.register_agent_builder("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
Parameters:
  • type (str | AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_builder (Callable[[], T | Awaitable[T]]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use agentopera.core.AgentInstantiationContext to access variables like the current engine and agent ID.

  • expected_class (type[T] | None, optional) – The expected class of the agent, used for engine validation of the factory. Defaults to None. If None, no validation is performed.

async subscribe(subscription: Subscription) None[source]

Add a new subscription that the engine should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async unsubscribe(id: str) None[source]

Remove a subscription from the engine

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]
register_msg_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]

Add a new message serializer to the engine.

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

class agentopera.engine.runtime.DistAgentEngine(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None, payload_serialization_format: str = 'application/json')[source]

Bases: AgentEngine

An agent engine for running remote or cross-language agents.

Agent messaging uses protobufs from agent_worker.proto and CloudEvent from cloudevent.proto.

Cross-language agents additionally require that all agents use shared protobuf schemas for any message types that are sent between agents.

async start() None[source]

Start the engine in a background task.

async stop() None[source]

Stop the engine immediately.

async stop_when_signal(signals: Sequence[Signals] = (Signals.SIGTERM, Signals.SIGINT)) None[source]

Stop the engine when a signal is received.

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the engine externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

Raises:
  • CantHandleException – If the recipient cannot handle the message.

  • UndeliverableException – If the message cannot be delivered.

  • Other – Any other exception raised by the recipient.

Returns:

The response from the agent.

Return type:

Any

async publish_message(message: Any, message_channel: MessageChannel, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • message_channel (MessageChannel) – The message channel to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

  • message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique and is recommended to be a UUID.

Raises:

UndeliverableException – If the message cannot be delivered.

async save_state() Mapping[str, Any][source]

Save the state of the entire engine, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async load_state(state: Mapping[str, Any]) None[source]

Load the state of the entire engine, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

async agent_metadata(agent: AgentId) AgentMetadata[source]

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

The agent metadata.

Return type:

AgentMetadata

async agent_save_state(agent: AgentId) Mapping[str, Any][source]

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async register_agent_builder(type: str | AgentType, agent_builder: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]

Register an agent factory with the engine associated with a specific type. The type must be unique. This API does not add any subscriptions.

Note

This is a low level API and usually the agent class’s register method should be used instead, as this also handles subscriptions automatically.

Example:

from dataclasses import dataclass

from agentopera.core import AgentEngine, MessageContext, RoutedAgent, event
from agentopera.core.types.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    engine: AgentEngine = ...  # type: ignore
    await engine.register_agent_builder("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
Parameters:
  • type (str | AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_builder (Callable[[], T | Awaitable[T]]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use agentopera.core.AgentInstantiationContext to access variables like the current engine and agent ID.

  • expected_class (type[T] | None, optional) – The expected class of the agent, used for engine validation of the factory. Defaults to None. If None, no validation is performed.

async remove_agent(agent_id: AgentId) None[source]
async remove_agents_with_type(agent_type: str) None[source]

Remove all instantiated agents matching the given type.

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]
async subscribe(subscription: Subscription) None[source]

Add a new subscription that the engine should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async unsubscribe(id: str) None[source]

Remove a subscription from the engine

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]
register_msg_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]

Add a new message serializer to the engine.

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

class agentopera.engine.runtime.DistAgentEngineSrv(address: str, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)[source]

Bases: object

start() None[source]

Start the server in a background task.

async stop(grace: int = 5) None[source]

Stop the server.

async stop_when_signal(grace: int = 5, signals: Sequence[Signals] = (Signals.SIGTERM, Signals.SIGINT)) None[source]

Stop the server when a signal is received.

class agentopera.engine.runtime.SubscriptionManager[source]

Bases: object

property subscriptions: Sequence[Subscription]
async subscribe(subscription: Subscription) None[source]
async unsubscribe(id: str) None[source]
async get_subscribed_recipients(topic: MessageChannel) List[AgentId][source]
class agentopera.engine.runtime.AgentEngine(*args, **kwargs)[source]

Bases: Protocol

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]

Send a message to an agent and get a response.

Parameters:
  • message (Any) – The message to send.

  • recipient (AgentId) – The agent to send the message to.

  • sender (AgentId | None, optional) – Agent which sent the message. Should only be None if this was sent from no agent, such as directly to the engine externally. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

Raises:
  • CantHandleException – If the recipient cannot handle the message.

  • UndeliverableException – If the message cannot be delivered.

  • Other – Any other exception raised by the recipient.

Returns:

The response from the agent.

Return type:

Any

async publish_message(message: Any, message_channel: MessageChannel, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]

Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.

No responses are expected from publishing.

Parameters:
  • message (Any) – The message to publish.

  • message_channel (MessageChannel) – The message channel to publish the message to.

  • sender (AgentId | None, optional) – The agent which sent the message. Defaults to None.

  • cancellation_token (CancellationToken | None, optional) – Token used to cancel an in-progress operation. Defaults to None.

  • message_id (str | None, optional) – The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique and is recommended to be a UUID.

Raises:

UndeliverableException – If the message cannot be delivered.

async register_agent_builder(type: str | AgentType, agent_builder: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]

Register an agent factory with the engine associated with a specific type. The type must be unique. This API does not add any subscriptions.

Note

This is a low level API and usually the agent class’s register method should be used instead, as this also handles subscriptions automatically.

Example:

from dataclasses import dataclass

from agentopera.core import AgentEngine, MessageContext, RoutedAgent, event
from agentopera.core.types.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    engine: AgentEngine = ...  # type: ignore
    await engine.register_agent_builder("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
Parameters:
  • type (str | AgentType) – The type of agent this factory creates. It is not the same as agent class name. The type parameter is used to differentiate between different factory functions rather than agent classes.

  • agent_builder (Callable[[], T | Awaitable[T]]) – The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use agentopera.core.AgentInstantiationContext to access variables like the current engine and agent ID.

  • expected_class (type[T] | None, optional) – The expected class of the agent, used for engine validation of the factory. Defaults to None. If None, no validation is performed.

async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]
async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
async save_state() Mapping[str, Any][source]

Save the state of the entire engine, including all hosted agents. The only way to restore the state is to pass it to load_state().

The structure of the state is implementation defined and can be any JSON serializable object.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async load_state(state: Mapping[str, Any]) None[source]

Load the state of the entire engine, including all hosted agents. The state should be the same as the one returned by save_state().

Parameters:

state (Mapping[str, Any]) – The saved state.

async agent_metadata(agent: AgentId) AgentMetadata[source]

Get the metadata for an agent.

Parameters:

agent (AgentId) – The agent id.

Returns:

The agent metadata.

Return type:

AgentMetadata

async agent_save_state(agent: AgentId) Mapping[str, Any][source]

Save the state of a single agent.

The structure of the state is implementation defined and can be any JSON serializable object.

Parameters:

agent (AgentId) – The agent id.

Returns:

The saved state.

Return type:

Mapping[str, Any]

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]

Load the state of a single agent.

Parameters:
  • agent (AgentId) – The agent id.

  • state (Mapping[str, Any]) – The saved state.

async subscribe(subscription: Subscription) None[source]

Add a new subscription that the engine should fulfill when processing published messages

Parameters:

subscription (Subscription) – The subscription to add

async unsubscribe(id: str) None[source]

Remove a subscription from the engine

Parameters:

id (str) – id of the subscription to remove

Raises:

LookupError – If the subscription does not exist

register_msg_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]

Add a new message serializer to the engine.

Note: This will deduplicate serializers based on the type_name and data_content_type properties

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – The serializer/s to add

class agentopera.engine.runtime.MessageHandlerContext[source]

Bases: object

classmethod agent_id() AgentId[source]
class agentopera.engine.runtime.AgentInstantiationContext[source]

Bases: object

A static class that provides context for agent instantiation.

This static class can be used to access the current engine and agent ID during agent instantiation – inside the factory function or the agent’s class constructor.

Example

Get the current engine and agent ID inside the factory function and the agent’s constructor:

import asyncio
from dataclasses import dataclass

from agentopera.core import (
    AgentId,
    AgentInstantiationContext,
    MessageContext,
    RoutedAgent,
    LocalAgentEngine,
    message_handler,
)


@dataclass
class TestMessage:
    content: str


class TestAgent(RoutedAgent):
    def __init__(self, description: str):
        super().__init__(description)
        # Get the current engine -- we don't use it here, but it's available.
        _ = AgentInstantiationContext.current_engine()
        # Get the current agent ID.
        agent_id = AgentInstantiationContext.current_agent_id()
        print(f"Current AgentID from constructor: {agent_id}")

    @message_handler
    async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


def test_agent_factory() -> TestAgent:
    # Get the current engine -- we don't use it here, but it's available.
    _ = AgentInstantiationContext.current_engine()
    # Get the current agent ID.
    agent_id = AgentInstantiationContext.current_agent_id()
    print(f"Current AgentID from factory: {agent_id}")
    return TestAgent(description="Test agent")


async def main() -> None:
    # Create a LocalAgentEngine instance.
    agent_engine = LocalAgentEngine()

    # Start the engine.
    agent_engine.start()

    # Register the agent type with a factory function.
    await agent_engine.register_agent_builder("test_agent", test_agent_factory)

    # Send a message to the agent. The engine will instantiate the agent and call the message handler.
    await agent_engine.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default"))

    # Stop the engine.
    await agent_engine.stop()


asyncio.run(main())
classmethod current_engine() AgentEngine[source]
classmethod current_agent_id() AgentId[source]