engine.types

engine.types.agent

class agentopera.engine.types.agent.Agent(*args, **kwargs)[source]

Bases: Protocol

property metadata: AgentMetadata

Metadata of the agent.

property id: AgentId

ID of the agent.

async on_message(message: Any, ctx: MessageContext) Any[source]

Message handler for the agent. This should only be called by the engine, not by other agents.

Parameters:
  • message (Any) – Received message. Type is one of the types in subscriptions.

  • ctx (MessageContext) – Context of the message.

Returns:

Response to the message. Can be None.

Return type:

Any

Raises:
  • asyncio.CancelledError – If the message was cancelled.

  • CantHandleException – If the agent cannot handle the message.

async save_state() Mapping[str, Any][source]

Save the state of the agent. The result must be JSON serializable.

async load_state(state: Mapping[str, Any]) None[source]

Load in the state of the agent obtained from save_state.

Parameters:

state (Mapping[str, Any]) – State of the agent. Must be JSON serializable.

async close() None[source]

Called when the engine is closed

class agentopera.engine.types.agent.AgentId(type: str | AgentType, key: str)[source]

Bases: object

Agent ID uniquely identifies an agent instance within an agent engine - including distributed engine. It is the ‘address’ of the agent instance for receiving messages.

classmethod from_str(agent_id: str) Self[source]

Convert a string of the format type/key into an AgentId

property type: str

An identifier that associates an agent with a specific factory function.

Strings may only be composed of alphanumeric letters (a-z) and (0-9), or underscores (_).

property key: str

Agent instance identifier.

Strings may only be composed of alphanumeric letters (a-z) and (0-9), or underscores (_).

class agentopera.engine.types.agent.AgentType(type: str)[source]

Bases: object

type: str

String representation of this agent type.

class agentopera.engine.types.agent.AgentMetadata[source]

Bases: TypedDict

type: str
key: str
description: str
class agentopera.engine.types.agent.CancellationToken[source]

Bases: object

A token used to cancel pending async calls

cancel() None[source]

Cancel pending async calls linked to this cancellation token.

is_cancelled() bool[source]

Check if the CancellationToken has been used

reset() bool[source]

reset the token state to be re-used

add_callback(callback: Callable[[], None]) None[source]

Attach a callback that will be called when cancel is invoked

Link a pending async call to a token to allow its cancellation

class agentopera.engine.types.agent.AgentInstantiationContext[source]

Bases: object

A static class that provides context for agent instantiation.

This static class can be used to access the current engine and agent ID during agent instantiation – inside the factory function or the agent’s class constructor.

Example

Get the current engine and agent ID inside the factory function and the agent’s constructor:

import asyncio
from dataclasses import dataclass

from agentopera.core import (
    AgentId,
    AgentInstantiationContext,
    MessageContext,
    RoutedAgent,
    LocalAgentEngine,
    message_handler,
)


@dataclass
class TestMessage:
    content: str


class TestAgent(RoutedAgent):
    def __init__(self, description: str):
        super().__init__(description)
        # Get the current engine -- we don't use it here, but it's available.
        _ = AgentInstantiationContext.current_engine()
        # Get the current agent ID.
        agent_id = AgentInstantiationContext.current_agent_id()
        print(f"Current AgentID from constructor: {agent_id}")

    @message_handler
    async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


def test_agent_factory() -> TestAgent:
    # Get the current engine -- we don't use it here, but it's available.
    _ = AgentInstantiationContext.current_engine()
    # Get the current agent ID.
    agent_id = AgentInstantiationContext.current_agent_id()
    print(f"Current AgentID from factory: {agent_id}")
    return TestAgent(description="Test agent")


async def main() -> None:
    # Create a LocalAgentEngine instance.
    agent_engine = LocalAgentEngine()

    # Start the engine.
    agent_engine.start()

    # Register the agent type with a factory function.
    await agent_engine.register_agent_builder("test_agent", test_agent_factory)

    # Send a message to the agent. The engine will instantiate the agent and call the message handler.
    await agent_engine.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default"))

    # Stop the engine.
    await agent_engine.stop()


asyncio.run(main())
classmethod current_engine() AgentEngine[source]
classmethod current_agent_id() AgentId[source]

engine.types.model_context

class agentopera.engine.types.model_context.ChatCompletionContext(initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]

Bases: ABC

An abstract base class for defining the interface of a chat completion context. A chat completion context lets agents store and retrieve LLM messages. It can be implemented with different recall strategies.

Parameters:

initial_messages (List[LLMMessage] | None) – The initial messages.

Example

To create a custom model context that filters out the thought field from AssistantMessage. This is useful for reasoning models like DeepSeek R1, which produces very long thought that is not needed for subsequent completions.

from typing import List

from agentopera.core.model_context import UnboundedChatCompletionContext
from agentopera.core.types.models import AssistantMessage, LLMMessage


class ReasoningModelContext(UnboundedChatCompletionContext):
    """A model context for reasoning models."""

    async def get_messages(self) -> List[LLMMessage]:
        messages = await super().get_messages()
        # Filter out thought field from AssistantMessage.
        messages_out: List[LLMMessage] = []
        for message in messages:
            if isinstance(message, AssistantMessage):
                message.thought = None
            messages_out.append(message)
        return messages_out
async add_message(message: SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage) None[source]

Add a message to the context.

abstract async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage][source]
async clear() None[source]

Clear the context.

async save_state() Mapping[str, Any][source]
async load_state(state: Mapping[str, Any]) None[source]
class agentopera.engine.types.model_context.ChatCompletionContextState(*, messages: ~typing.List[~agentopera.engine.types.models.types.SystemMessage | ~agentopera.engine.types.models.types.UserMessage | ~agentopera.engine.types.models.types.AssistantMessage | ~agentopera.engine.types.models.types.FunctionExecutionResultMessage | ~agentopera.engine.types.models.types.VercelMessage] = <factory>)[source]

Bases: BaseModel

messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agentopera.engine.types.model_context.BufferedChatCompletionContext(buffer_size: int, initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]

Bases: ChatCompletionContext

A buffered chat completion context that keeps a view of the last n messages, where n is the buffer size. The buffer size is set at initialization.

Parameters:
  • buffer_size (int) – The size of the buffer.

  • initial_messages (List[LLMMessage] | None) – The initial messages.

async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage][source]

Get at most buffer_size recent messages.

class agentopera.engine.types.model_context.HeadAndTailChatCompletionContext(head_size: int, tail_size: int, initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]

Bases: ChatCompletionContext

A chat completion context that keeps a view of the first n and last m messages, where n is the head size and m is the tail size. The head and tail sizes are set at initialization.

Parameters:
  • head_size (int) – The size of the head.

  • tail_size (int) – The size of the tail.

  • initial_messages (List[LLMMessage] | None) – The initial messages.

async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage][source]

Get at most head_size recent messages and tail_size oldest messages.

class agentopera.engine.types.model_context.UnboundedChatCompletionContext(initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]

Bases: ChatCompletionContext

An unbounded chat completion context that keeps a view of the all the messages.

async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage][source]

Get at most buffer_size recent messages.

engine.types.models

class agentopera.engine.types.models.AssistantMessage(*, content: str | List[FunctionCall], thought: str | None = None, source: str, type: Literal['AssistantMessage'] = 'AssistantMessage')[source]

Bases: BaseModel

Assistant message are sampled from the language model.

content: str | List[FunctionCall]

The content of the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

source: str

The name of the agent that sent this message.

thought: str | None

The reasoning text for the completion if available. Used for reasoning model and additional text content besides function calls.

type: Literal['AssistantMessage']
class agentopera.engine.types.models.ChatCompletionClient[source]

Bases: ABC

abstract actual_usage() RequestUsage[source]
abstract property capabilities: ModelCapabilities
abstract count_tokens(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = []) int[source]
abstract async create(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = [], json_output: bool | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) CreateResult[source]
abstract create_stream(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = [], json_output: bool | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) AsyncGenerator[str | CreateResult, None][source]
abstract property model_info: ModelInfo
abstract remaining_tokens(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = []) int[source]
abstract total_usage() RequestUsage[source]
class agentopera.engine.types.models.ChatCompletionTokenLogprob(*, token: str, logprob: float, top_logprobs: List[TopLogprob] | None = None, bytes: List[int] | None = None)[source]

Bases: BaseModel

bytes: List[int] | None
logprob: float
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

token: str
top_logprobs: List[TopLogprob] | None
class agentopera.engine.types.models.CreateResult(*, finish_reason: Literal['stop', 'length', 'function_calls', 'content_filter', 'unknown'], content: str | List[FunctionCall], usage: RequestUsage, cached: bool, logprobs: List[ChatCompletionTokenLogprob] | None = None, thought: str | None = None)[source]

Bases: BaseModel

Create result contains the output of a model completion.

cached: bool

Whether the completion was generated from a cached response.

content: str | List[FunctionCall]

The output of the model completion.

finish_reason: Literal['stop', 'length', 'function_calls', 'content_filter', 'unknown']

The reason the model finished generating the completion.

logprobs: List[ChatCompletionTokenLogprob] | None

The logprobs of the tokens in the completion.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

thought: str | None

The reasoning text for the completion if available. Used for reasoning models and additional text content besides function calls.

usage: RequestUsage

The usage of tokens in the prompt and completion.

class agentopera.engine.types.models.FunctionExecutionResult(*, content: str, name: str, call_id: str, is_error: bool | None = None)[source]

Bases: BaseModel

Function execution result contains the output of a function call.

call_id: str

The ID of the function call. Note this ID may be empty for some models.

content: str

The output of the function call.

is_error: bool | None

Whether the function call resulted in an error.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str

(New in v0.4.8) The name of the function that was called.

class agentopera.engine.types.models.FunctionExecutionResultMessage(*, content: List[FunctionExecutionResult], type: Literal['FunctionExecutionResultMessage'] = 'FunctionExecutionResultMessage')[source]

Bases: BaseModel

Function execution result message contains the output of multiple function calls.

content: List[FunctionExecutionResult]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['FunctionExecutionResultMessage']
class agentopera.engine.types.models.ModelCapabilities(**kwargs)[source]

Bases: TypedDict

function_calling: Required[bool]
json_output: Required[bool]
vision: Required[bool]
class agentopera.engine.types.models.ModelFamily(*args: Any, **kwargs: Any)[source]

Bases: object

A model family is a group of models that share similar characteristics from a capabilities perspective. This is different to discrete supported features such as vision, function calling, and JSON output.

This namespace class holds constants for the model families that agentopera understands. Other families definitely exist and can be represented by a string, however, agentopera will treat them as unknown.

ANY

alias of Literal[‘gpt-4o’, ‘o1’, ‘o3’, ‘gpt-4’, ‘gpt-35’, ‘r1’, ‘gemini-1.5-flash’, ‘gemini-1.5-pro’, ‘gemini-2.0-flash’, ‘claude-3-haiku’, ‘claude-3-sonnet’, ‘claude-3-opus’, ‘claude-3.5-haiku’, ‘claude-3.5-sonnet’, ‘unknown’]

CLAUDE_3_5_HAIKU = 'claude-3.5-haiku'
CLAUDE_3_5_SONNET = 'claude-3.5-sonnet'
CLAUDE_3_7_SONNET = 'claude-3.7-sonnet'
CLAUDE_3_HAIKU = 'claude-3-haiku'
CLAUDE_3_OPUS = 'claude-3-opus'
CLAUDE_3_SONNET = 'claude-3-sonnet'
GEMINI_1_5_FLASH = 'gemini-1.5-flash'
GEMINI_1_5_PRO = 'gemini-1.5-pro'
GEMINI_2_0_FLASH = 'gemini-2.0-flash'
GPT_35 = 'gpt-35'
GPT_4 = 'gpt-4'
GPT_4O = 'gpt-4o'
O1 = 'o1'
O3 = 'o3'
R1 = 'r1'
UNKNOWN = 'unknown'
static is_claude(family: str) bool[source]
static is_gemini(family: str) bool[source]
static is_openai(family: str) bool[source]
class agentopera.engine.types.models.ModelInfo[source]

Bases: TypedDict

ModelInfo is a dictionary that contains information about a model’s properties. It is expected to be used in the model_info property of a model client.

We are expecting this to grow over time as we add more features.

family: Required[Literal['gpt-4o', 'o1', 'o3', 'gpt-4', 'gpt-35', 'r1', 'gemini-1.5-flash', 'gemini-1.5-pro', 'gemini-2.0-flash', 'claude-3-haiku', 'claude-3-sonnet', 'claude-3-opus', 'claude-3.5-haiku', 'claude-3.5-sonnet', 'unknown'] | str]

Model family should be one of the constants from ModelFamily or a string representing an unknown model family.

function_calling: Required[bool]

True if the model supports function calling, otherwise False.

json_output: Required[bool]

this is different to structured json.

Type:

True if the model supports json output, otherwise False. Note

vision: Required[bool]

True if the model supports vision, aka image input, otherwise False.

class agentopera.engine.types.models.RequestUsage(prompt_tokens: int, completion_tokens: int)[source]

Bases: object

completion_tokens: int
prompt_tokens: int
class agentopera.engine.types.models.SystemMessage(*, content: str, type: Literal['SystemMessage'] = 'SystemMessage')[source]

Bases: BaseModel

System message contains instructions for the model coming from the developer.

Note

Open AI is moving away from using ‘system’ role in favor of ‘developer’ role. See Model Spec for more details. However, the ‘system’ role is still allowed in their API and will be automatically converted to ‘developer’ role on the server side. So, you can use SystemMessage for developer messages.

content: str

The content of the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

type: Literal['SystemMessage']
class agentopera.engine.types.models.TopLogprob(logprob: float, bytes: List[int] | None = None)[source]

Bases: object

bytes: List[int] | None = None
logprob: float
class agentopera.engine.types.models.UserMessage(*, content: str | List[str | Image], source: str, type: Literal['UserMessage'] = 'UserMessage')[source]

Bases: BaseModel

User message contains input from end users, or a catch-all for data provided to the model.

content: str | List[str | Image]

The content of the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

source: str

The name of the agent that sent this message.

type: Literal['UserMessage']
class agentopera.engine.types.models.VercelMessage(*, role: str, content: str, parts: List[VercelMessagePart], type: Literal['VercelMessage'] = 'VercelMessage')[source]

Bases: BaseModel

Vercel message is a message supported by vercel ai sdk.

content: str

The content of the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

parts: List[VercelMessagePart]

The parts of the message.

role: str

The role of the message.

type: Literal['VercelMessage']
agentopera.engine.types.models.validate_model_info(model_info: ModelInfo) None[source]

Validates the model info dictionary.

Raises:

ValueError – If the model info dictionary is missing required fields.

engine.types.msg_channel

class agentopera.engine.types.msg_channel.DefaultMessageChannel(topic: str = 'default', source: str | None = None)[source]

Bases: MessageChannel

DefaultMessageChannel provides a sensible default for the topic and source fields of a MessageChannel.

If created in the context of a message handler, the source will be set to the agent_id of the message handler, otherwise it will be set to “default”.

Parameters:
  • topic (str, optional) – Topic to publish message to. Defaults to “default”.

  • source (str | None, optional) – Topic source to publish message to. If None, the source will be set to the agent_id of the message handler if in the context of a message handler, otherwise it will be set to “default”. Defaults to None.

class agentopera.engine.types.msg_channel.MessageChannel(topic: str, source: str)[source]

Bases: object

MessageChannel defines the scope of a broadcast message. In essence, agent engine implements a publish-subscribe model through its broadcast API: when publishing a message, the topic must be specified.

See here for more information: topic_and_subscription_topic

classmethod from_str(message_channel: str) Self[source]

Convert a string of the format topic/source into a MessageChannel

source: str

Identifies the context in which an event happened. Adhere’s to the cloud event spec.

Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#source-1

topic: str

Topic of the event that this message channel contains. Adhere’s to the cloud event spec.

Must match the pattern: ^[w-.:=]+Z

Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type

engine.types.msg_context

class agentopera.engine.types.msg_context.MessageContext(sender: agentopera.engine.types.agent.agent_id.AgentId | None, message_channel: agentopera.engine.types.msg_channel.message_channel.MessageChannel | None, is_rpc: bool, cancellation_token: agentopera.engine.types.agent.cancellation_token.CancellationToken, message_id: str)[source]

Bases: object

cancellation_token: CancellationToken
is_rpc: bool
message_channel: MessageChannel | None
message_id: str
sender: AgentId | None

others

class agentopera.engine.types.CacheStore[source]

Bases: ABC, Generic[T]

This protocol defines the basic interface for store/cache operations.

Sub-classes should handle the lifecycle of underlying storage.

abstract get(key: str, default: T | None = None) T | None[source]

Retrieve an item from the store.

Parameters:
  • key – The key identifying the item in the store.

  • default (optional) – The default value to return if the key is not found. Defaults to None.

Returns:

The value associated with the key if found, else the default value.

abstract set(key: str, value: T) None[source]

Set an item in the store.

Parameters:
  • key – The key under which the item is to be stored.

  • value – The value to be stored in the store.

class agentopera.engine.types.Image[source]

Bases: object

abstract base class, defines the interface of image object.

all image implementations must implement this protocol to ensure compatibility with the message system.

classmethod from_uri(uri: str) Image[source]

create image object from image uri.

Parameters:

uri (str) – image uri, usually in data uri format

Returns:

image object

Return type:

BaseImage

Raises:

ValueError – if the uri format is invalid

classmethod from_base64(base64_str: str) Image[source]

create image object from base64 encoded string.

Parameters:

base64_str (str) – base64 encoded string of image

Returns:

image object

Return type:

BaseImage

classmethod from_file(file_path: Path) Image[source]

create image object from file path.

Parameters:

file_path (Path) – path of image file

Returns:

image object

Return type:

BaseImage

to_base64() str[source]

convert image to base64 encoded string.

Returns:

base64 encoded string of image

Return type:

str

property data_uri: str

get data uri of image.

Returns:

data uri of image

Return type:

str

to_openai_format(detail: Literal['auto', 'low', 'high'] = 'auto') Dict[str, Any][source]

convert image to openai api compatible format.

Parameters:

detail (Literal["auto", "low", "high"], optional) – detail level of image. default to “auto”.

Returns:

openai api compatible image format

Return type:

Dict[str, Any]

class agentopera.engine.types.Queue(maxsize: int = 0)[source]

Bases: _LoopBoundMixin, Generic[T]

qsize() int[source]

Number of items in the queue.

property maxsize: int

Number of items allowed in the queue.

empty() bool[source]

Return True if the queue is empty, False otherwise.

full() bool[source]

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async put(item: T) None[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Raises QueueShutDown if the queue has been shut down.

put_nowait(item: T) None[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Raises QueueShutDown if the queue has been shut down.

async get() T[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait() T[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

task_done() None[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

Raises ValueError if called more times than there were items placed in the queue.

async join() None[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

shutdown(immediate: bool = False) None[source]

Shut-down the queue, making queue gets and puts raise QueueShutDown.

By default, gets will only raise once the queue is empty. Set ‘immediate’ to True to make gets raise immediately instead.

All blocked callers of put() and get() will be unblocked. If ‘immediate’, a task is marked as done for each item remaining in the queue, which may unblock callers of join().

exception agentopera.engine.types.QueueShutDown[source]

Bases: Exception

Raised when putting on to or getting from a shut-down Queue.

class agentopera.engine.types.InMemoryStore[source]

Bases: CacheStore[T]

get(key: str, default: T | None = None) T | None[source]

Retrieve an item from the store.

Parameters:
  • key – The key identifying the item in the store.

  • default (optional) – The default value to return if the key is not found. Defaults to None.

Returns:

The value associated with the key if found, else the default value.

set(key: str, value: T) None[source]

Set an item in the store.

Parameters:
  • key – The key under which the item is to be stored.

  • value – The value to be stored in the store.