engine.types
engine.types.agent
- class agentopera.engine.types.agent.Agent(*args, **kwargs)[source]
Bases:
Protocol
- property metadata: AgentMetadata
Metadata of the agent.
- async on_message(message: Any, ctx: MessageContext) Any [source]
Message handler for the agent. This should only be called by the engine, not by other agents.
- Parameters:
message (Any) – Received message. Type is one of the types in subscriptions.
ctx (MessageContext) – Context of the message.
- Returns:
Response to the message. Can be None.
- Return type:
Any
- Raises:
asyncio.CancelledError – If the message was cancelled.
CantHandleException – If the agent cannot handle the message.
- async save_state() Mapping[str, Any] [source]
Save the state of the agent. The result must be JSON serializable.
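The protocol above can be illustrated with a minimal sketch. The class below is hypothetical (`CounterAgent` is not part of agentopera) and uses only the standard library; `ctx` is passed as `None` purely for illustration, whereas in practice the engine calls `on_message` with a real MessageContext. It shows a `save_state` result that survives `json.dumps`, which is one way to check the JSON-serializable requirement.

```python
import asyncio
import json
from typing import Any, Dict, Mapping


class CounterAgent:
    """Hypothetical agent that counts the messages it has handled."""

    def __init__(self) -> None:
        self._count = 0

    @property
    def metadata(self) -> Dict[str, str]:
        # Mirrors the AgentMetadata fields: type, key, description.
        return {"type": "counter", "key": "default", "description": "Counts messages"}

    async def on_message(self, message: Any, ctx: Any) -> Any:
        # ctx would be a MessageContext when called by the engine; unused here.
        self._count += 1
        return self._count

    async def save_state(self) -> Mapping[str, Any]:
        # Return only JSON-friendly values (str, int, float, bool, None, list, dict).
        return {"count": self._count}


async def demo() -> str:
    agent = CounterAgent()
    await agent.on_message("hello", ctx=None)
    await agent.on_message("again", ctx=None)
    state = await agent.save_state()
    # json.dumps raises TypeError if the state is not JSON serializable.
    return json.dumps(state)


serialized = asyncio.run(demo())
```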
- class agentopera.engine.types.agent.AgentId(type: str | AgentType, key: str)[source]
Bases:
object
Agent ID uniquely identifies an agent instance within an agent engine, including a distributed engine. It is the ‘address’ of the agent instance for receiving messages.
- classmethod from_str(agent_id: str) Self [source]
Convert a string of the format type/key into an AgentId.
- property type: str
An identifier that associates an agent with a specific factory function.
Strings may only be composed of alphanumeric letters (a-z) and (0-9), or underscores (_).
- property key: str
Agent instance identifier.
Strings may only be composed of alphanumeric letters (a-z) and (0-9), or underscores (_).
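As a rough illustration of the type/key format, the sketch below parses and validates an ID string with the standard library only. `SketchAgentId` is a hypothetical stand-in, not the library class, and the validation rule (letters, digits, underscores, with uppercase letters accepted) is an assumption based on the property descriptions above.

```python
import re
from dataclasses import dataclass

# Assumed rule from the descriptions above: letters, digits, underscores.
_VALID_PART = re.compile(r"[A-Za-z0-9_]+")


@dataclass(frozen=True)
class SketchAgentId:
    """Hypothetical stand-in for AgentId; not the library class."""

    type: str
    key: str

    def __post_init__(self) -> None:
        for name, value in (("type", self.type), ("key", self.key)):
            if _VALID_PART.fullmatch(value) is None:
                raise ValueError(f"Invalid agent {name}: {value!r}")

    @classmethod
    def from_str(cls, agent_id: str) -> "SketchAgentId":
        # Split on the first slash only, so exactly two parts are expected.
        parts = agent_id.split("/", maxsplit=1)
        if len(parts) != 2:
            raise ValueError(f"Expected 'type/key', got {agent_id!r}")
        return cls(type=parts[0], key=parts[1])

    def __str__(self) -> str:
        return f"{self.type}/{self.key}"


parsed = SketchAgentId.from_str("test_agent/default")
```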
- class agentopera.engine.types.agent.AgentType(type: str)[source]
Bases:
object
- type: str
String representation of this agent type.
- class agentopera.engine.types.agent.AgentMetadata[source]
Bases:
TypedDict
- type: str
- key: str
- description: str
- class agentopera.engine.types.agent.CancellationToken[source]
Bases:
object
A token used to cancel pending async calls.
- class agentopera.engine.types.agent.AgentInstantiationContext[source]
Bases:
object
A static class that provides context for agent instantiation.
This static class can be used to access the current engine and agent ID during agent instantiation – inside the factory function or the agent’s class constructor.
Example
Get the current engine and agent ID inside the factory function and the agent’s constructor:
    import asyncio
    from dataclasses import dataclass

    from agentopera.core import (
        AgentId,
        AgentInstantiationContext,
        MessageContext,
        RoutedAgent,
        LocalAgentEngine,
        message_handler,
    )


    @dataclass
    class TestMessage:
        content: str


    class TestAgent(RoutedAgent):
        def __init__(self, description: str):
            super().__init__(description)
            # Get the current engine -- we don't use it here, but it's available.
            _ = AgentInstantiationContext.current_engine()
            # Get the current agent ID.
            agent_id = AgentInstantiationContext.current_agent_id()
            print(f"Current AgentID from constructor: {agent_id}")

        @message_handler
        async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None:
            print(f"Received message: {message.content}")


    def test_agent_factory() -> TestAgent:
        # Get the current engine -- we don't use it here, but it's available.
        _ = AgentInstantiationContext.current_engine()
        # Get the current agent ID.
        agent_id = AgentInstantiationContext.current_agent_id()
        print(f"Current AgentID from factory: {agent_id}")
        return TestAgent(description="Test agent")


    async def main() -> None:
        # Create a LocalAgentEngine instance.
        agent_engine = LocalAgentEngine()

        # Start the engine.
        agent_engine.start()

        # Register the agent type with a factory function.
        await agent_engine.register_agent_builder("test_agent", test_agent_factory)

        # Send a message to the agent. The engine will instantiate the agent and call the message handler.
        await agent_engine.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default"))

        # Stop the engine.
        await agent_engine.stop()


    asyncio.run(main())
- classmethod current_engine() AgentEngine [source]
engine.types.model_context
- class agentopera.engine.types.model_context.ChatCompletionContext(initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]
Bases:
ABC
An abstract base class for defining the interface of a chat completion context. A chat completion context lets agents store and retrieve LLM messages. It can be implemented with different recall strategies.
- Parameters:
initial_messages (List[LLMMessage] | None) – The initial messages.
Example
Create a custom model context that filters out the thought field from AssistantMessage. This is useful for reasoning models such as DeepSeek R1, which produce very long thoughts that are not needed for subsequent completions.
    from typing import List

    from agentopera.core.model_context import UnboundedChatCompletionContext
    from agentopera.core.types.models import AssistantMessage, LLMMessage


    class ReasoningModelContext(UnboundedChatCompletionContext):
        """A model context for reasoning models."""

        async def get_messages(self) -> List[LLMMessage]:
            messages = await super().get_messages()
            # Filter out thought field from AssistantMessage.
            messages_out: List[LLMMessage] = []
            for message in messages:
                if isinstance(message, AssistantMessage):
                    message.thought = None
                messages_out.append(message)
            return messages_out
- async add_message(message: SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage) None [source]
Add a message to the context.
- abstract async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] [source]
- class agentopera.engine.types.model_context.ChatCompletionContextState(*, messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] = <factory>)[source]
Bases:
BaseModel
- messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class agentopera.engine.types.model_context.BufferedChatCompletionContext(buffer_size: int, initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]
Bases:
ChatCompletionContext
A buffered chat completion context that keeps a view of the last n messages, where n is the buffer size. The buffer size is set at initialization.
- Parameters:
buffer_size (int) – The size of the buffer.
initial_messages (List[LLMMessage] | None) – The initial messages.
- async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] [source]
Get at most buffer_size recent messages.
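The "last n messages" view described above can be sketched with a plain list slice. This is a simplified illustration on strings rather than LLM message objects; `buffered_view` is a hypothetical helper, and the real class may apply additional handling that is not shown here.

```python
from typing import List


def buffered_view(messages: List[str], buffer_size: int) -> List[str]:
    """Return at most the last `buffer_size` messages (hypothetical helper)."""
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    # A negative slice start keeps the most recent entries and
    # returns the whole list when it is shorter than buffer_size.
    return messages[-buffer_size:]


history = ["m1", "m2", "m3", "m4", "m5"]
recent = buffered_view(history, buffer_size=3)
```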
- class agentopera.engine.types.model_context.HeadAndTailChatCompletionContext(head_size: int, tail_size: int, initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]
Bases:
ChatCompletionContext
A chat completion context that keeps a view of the first n and last m messages, where n is the head size and m is the tail size. The head and tail sizes are set at initialization.
- Parameters:
head_size (int) – The size of the head.
tail_size (int) – The size of the tail.
initial_messages (List[LLMMessage] | None) – The initial messages.
- async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] [source]
Get at most head_size oldest messages and tail_size most recent messages.
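A minimal sketch of the head-and-tail idea, assuming the head is the first (oldest) messages and the tail is the last (most recent) ones, as the class description states. `head_and_tail_view` is a hypothetical helper operating on strings; whether the real class marks the skipped middle section in some way is not covered here.

```python
from typing import List


def head_and_tail_view(messages: List[str], head_size: int, tail_size: int) -> List[str]:
    """Keep the first `head_size` and last `tail_size` messages (hypothetical helper)."""
    if head_size < 0 or tail_size < 0:
        raise ValueError("head_size and tail_size must be non-negative")
    # Nothing to drop when head and tail together cover the whole history.
    if len(messages) <= head_size + tail_size:
        return list(messages)
    head = messages[:head_size]
    # Guard tail_size == 0, because messages[-0:] would return everything.
    tail = messages[-tail_size:] if tail_size > 0 else []
    return head + tail


history = ["m1", "m2", "m3", "m4", "m5", "m6"]
view = head_and_tail_view(history, head_size=2, tail_size=2)
```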
- class agentopera.engine.types.model_context.UnboundedChatCompletionContext(initial_messages: List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] | None = None)[source]
Bases:
ChatCompletionContext
An unbounded chat completion context that keeps a view of all the messages.
- async get_messages() List[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage] [source]
Get all the messages in the context.
engine.types.models
- class agentopera.engine.types.models.AssistantMessage(*, content: str | List[FunctionCall], thought: str | None = None, source: str, type: Literal['AssistantMessage'] = 'AssistantMessage')[source]
Bases:
BaseModel
Assistant messages are sampled from the language model.
- content: str | List[FunctionCall]
The content of the message.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- source: str
The name of the agent that sent this message.
- thought: str | None
The reasoning text for the completion if available. Used for reasoning models and for additional text content besides function calls.
- type: Literal['AssistantMessage']
- class agentopera.engine.types.models.ChatCompletionClient[source]
Bases:
ABC
- abstract actual_usage() RequestUsage [source]
- abstract property capabilities: ModelCapabilities
- abstract count_tokens(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = []) int [source]
- abstract async create(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = [], json_output: bool | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) CreateResult [source]
- abstract create_stream(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = [], json_output: bool | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) AsyncGenerator[str | CreateResult, None] [source]
- abstract remaining_tokens(messages: Sequence[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage | VercelMessage], *, tools: Sequence[Tool | ToolSchema] = []) int [source]
- abstract total_usage() RequestUsage [source]
- class agentopera.engine.types.models.ChatCompletionTokenLogprob(*, token: str, logprob: float, top_logprobs: List[TopLogprob] | None = None, bytes: List[int] | None = None)[source]
Bases:
BaseModel
- bytes: List[int] | None
- logprob: float
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- token: str
- top_logprobs: List[TopLogprob] | None
- class agentopera.engine.types.models.CreateResult(*, finish_reason: Literal['stop', 'length', 'function_calls', 'content_filter', 'unknown'], content: str | List[FunctionCall], usage: RequestUsage, cached: bool, logprobs: List[ChatCompletionTokenLogprob] | None = None, thought: str | None = None)[source]
Bases:
BaseModel
Create result contains the output of a model completion.
- cached: bool
Whether the completion was generated from a cached response.
- content: str | List[FunctionCall]
The output of the model completion.
- finish_reason: Literal['stop', 'length', 'function_calls', 'content_filter', 'unknown']
The reason the model finished generating the completion.
- logprobs: List[ChatCompletionTokenLogprob] | None
The logprobs of the tokens in the completion.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- thought: str | None
The reasoning text for the completion if available. Used for reasoning models and additional text content besides function calls.
- usage: RequestUsage
The usage of tokens in the prompt and completion.
- class agentopera.engine.types.models.FunctionExecutionResult(*, content: str, name: str, call_id: str, is_error: bool | None = None)[source]
Bases:
BaseModel
Function execution result contains the output of a function call.
- call_id: str
The ID of the function call. Note this ID may be empty for some models.
- content: str
The output of the function call.
- is_error: bool | None
Whether the function call resulted in an error.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str
(New in v0.4.8) The name of the function that was called.
- class agentopera.engine.types.models.FunctionExecutionResultMessage(*, content: List[FunctionExecutionResult], type: Literal['FunctionExecutionResultMessage'] = 'FunctionExecutionResultMessage')[source]
Bases:
BaseModel
Function execution result message contains the output of multiple function calls.
- content: List[FunctionExecutionResult]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- type: Literal['FunctionExecutionResultMessage']
- class agentopera.engine.types.models.ModelCapabilities(**kwargs)[source]
Bases:
TypedDict
- function_calling: Required[bool]
- json_output: Required[bool]
- vision: Required[bool]
- class agentopera.engine.types.models.ModelFamily(*args: Any, **kwargs: Any)[source]
Bases:
object
A model family is a group of models that share similar characteristics from a capabilities perspective. This is different to discrete supported features such as vision, function calling, and JSON output.
This namespace class holds constants for the model families that agentopera understands. Other families exist and can be represented by a string; however, agentopera will treat them as unknown.
- ANY
alias of Literal['gpt-4o', 'o1', 'o3', 'gpt-4', 'gpt-35', 'r1', 'gemini-1.5-flash', 'gemini-1.5-pro', 'gemini-2.0-flash', 'claude-3-haiku', 'claude-3-sonnet', 'claude-3-opus', 'claude-3.5-haiku', 'claude-3.5-sonnet', 'unknown']
- CLAUDE_3_5_HAIKU = 'claude-3.5-haiku'
- CLAUDE_3_5_SONNET = 'claude-3.5-sonnet'
- CLAUDE_3_7_SONNET = 'claude-3.7-sonnet'
- CLAUDE_3_HAIKU = 'claude-3-haiku'
- CLAUDE_3_OPUS = 'claude-3-opus'
- CLAUDE_3_SONNET = 'claude-3-sonnet'
- GEMINI_1_5_FLASH = 'gemini-1.5-flash'
- GEMINI_1_5_PRO = 'gemini-1.5-pro'
- GEMINI_2_0_FLASH = 'gemini-2.0-flash'
- GPT_35 = 'gpt-35'
- GPT_4 = 'gpt-4'
- GPT_4O = 'gpt-4o'
- O1 = 'o1'
- O3 = 'o3'
- R1 = 'r1'
- UNKNOWN = 'unknown'
- class agentopera.engine.types.models.ModelInfo[source]
Bases:
TypedDict
ModelInfo is a dictionary that contains information about a model’s properties. It is expected to be used in the model_info property of a model client.
We are expecting this to grow over time as we add more features.
- family: Required[Literal['gpt-4o', 'o1', 'o3', 'gpt-4', 'gpt-35', 'r1', 'gemini-1.5-flash', 'gemini-1.5-pro', 'gemini-2.0-flash', 'claude-3-haiku', 'claude-3-sonnet', 'claude-3-opus', 'claude-3.5-haiku', 'claude-3.5-sonnet', 'unknown'] | str]
Model family should be one of the constants from ModelFamily or a string representing an unknown model family.
- function_calling: Required[bool]
True if the model supports function calling, otherwise False.
- json_output: Required[bool]
True if the model supports JSON output, otherwise False. Note: this is different from structured JSON.
- vision: Required[bool]
True if the model supports vision, aka image input, otherwise False.
- class agentopera.engine.types.models.RequestUsage(prompt_tokens: int, completion_tokens: int)[source]
Bases:
object
- completion_tokens: int
- prompt_tokens: int
- class agentopera.engine.types.models.SystemMessage(*, content: str, type: Literal['SystemMessage'] = 'SystemMessage')[source]
Bases:
BaseModel
System message contains instructions for the model coming from the developer.
Note
OpenAI is moving away from the ‘system’ role in favor of the ‘developer’ role. See the Model Spec for more details. However, the ‘system’ role is still allowed in their API and is automatically converted to the ‘developer’ role on the server side, so you can use SystemMessage for developer messages.
- content: str
The content of the message.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- type: Literal['SystemMessage']
- class agentopera.engine.types.models.TopLogprob(logprob: float, bytes: List[int] | None = None)[source]
Bases:
object
- bytes: List[int] | None = None
- logprob: float
- class agentopera.engine.types.models.UserMessage(*, content: str | List[str | Image], source: str, type: Literal['UserMessage'] = 'UserMessage')[source]
Bases:
BaseModel
User message contains input from end users, or a catch-all for data provided to the model.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- source: str
The name of the agent that sent this message.
- type: Literal['UserMessage']
- class agentopera.engine.types.models.VercelMessage(*, role: str, content: str, parts: List[VercelMessagePart], type: Literal['VercelMessage'] = 'VercelMessage')[source]
Bases:
BaseModel
A Vercel message is a message format supported by the Vercel AI SDK.
- content: str
The content of the message.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- parts: List[VercelMessagePart]
The parts of the message.
- role: str
The role of the message.
- type: Literal['VercelMessage']
engine.types.msg_channel
- class agentopera.engine.types.msg_channel.DefaultMessageChannel(topic: str = 'default', source: str | None = None)[source]
Bases:
MessageChannel
DefaultMessageChannel provides a sensible default for the topic and source fields of a MessageChannel.
If created in the context of a message handler, the source will be set to the agent_id of the message handler, otherwise it will be set to “default”.
- Parameters:
topic (str, optional) – Topic to publish message to. Defaults to “default”.
source (str | None, optional) – Topic source to publish message to. If None, the source will be set to the agent_id of the message handler if in the context of a message handler, otherwise it will be set to “default”. Defaults to None.
- class agentopera.engine.types.msg_channel.MessageChannel(topic: str, source: str)[source]
Bases:
object
MessageChannel defines the scope of a broadcast message. In essence, the agent engine implements a publish-subscribe model through its broadcast API: when publishing a message, the topic must be specified.
See here for more information: topic_and_subscription_topic
- classmethod from_str(message_channel: str) Self [source]
Convert a string of the format topic/source into a MessageChannel.
- source: str
Identifies the context in which an event happened. Adheres to the CloudEvents spec.
Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#source-1
- topic: str
Topic of the event that this message channel contains. Adheres to the CloudEvents spec.
Must match the pattern: ^[\w\-\.\:\=]+\Z
Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type
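The topic/source format and the topic constraint can be sketched as follows. `SketchMessageChannel` is a hypothetical stand-in written with the standard library only, and the pattern `^[\w\-\.\:\=]+\Z` is assumed to be the intended topic rule (word characters plus `-`, `.`, `:` and `=`).

```python
import re
from dataclasses import dataclass

# Assumed topic rule: word characters plus '-', '.', ':' and '='.
_TOPIC_PATTERN = re.compile(r"^[\w\-\.\:\=]+\Z")


@dataclass(frozen=True)
class SketchMessageChannel:
    """Hypothetical stand-in for MessageChannel; not the library class."""

    topic: str
    source: str

    def __post_init__(self) -> None:
        if _TOPIC_PATTERN.match(self.topic) is None:
            raise ValueError(f"Invalid topic: {self.topic!r}")

    @classmethod
    def from_str(cls, message_channel: str) -> "SketchMessageChannel":
        # Split on the first slash: everything after it is the source.
        parts = message_channel.split("/", maxsplit=1)
        if len(parts) != 2:
            raise ValueError(f"Expected 'topic/source', got {message_channel!r}")
        return cls(topic=parts[0], source=parts[1])

    def __str__(self) -> str:
        return f"{self.topic}/{self.source}"


channel = SketchMessageChannel.from_str("default/agent_1")
```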
engine.types.msg_context
- class agentopera.engine.types.msg_context.MessageContext(sender: agentopera.engine.types.agent.agent_id.AgentId | None, message_channel: agentopera.engine.types.msg_channel.message_channel.MessageChannel | None, is_rpc: bool, cancellation_token: agentopera.engine.types.agent.cancellation_token.CancellationToken, message_id: str)[source]
Bases:
object
- cancellation_token: CancellationToken
- is_rpc: bool
- message_channel: MessageChannel | None
- message_id: str
others
- class agentopera.engine.types.CacheStore[source]
Bases:
ABC, Generic[T]
This protocol defines the basic interface for store/cache operations.
Sub-classes should handle the lifecycle of underlying storage.
- abstract get(key: str, default: T | None = None) T | None [source]
Retrieve an item from the store.
- Parameters:
key – The key identifying the item in the store.
default (optional) – The default value to return if the key is not found. Defaults to None.
- Returns:
The value associated with the key if found, else the default value.
- class agentopera.engine.types.Image[source]
Bases:
object
Abstract base class that defines the interface of an image object.
All image implementations must implement this interface to ensure compatibility with the message system.
- classmethod from_uri(uri: str) Image [source]
Create an image object from an image URI.
- Parameters:
uri (str) – Image URI, usually in data URI format.
- Returns:
The image object.
- Return type:
Image
- Raises:
ValueError – If the URI format is invalid.
- classmethod from_base64(base64_str: str) Image [source]
Create an image object from a base64-encoded string.
- Parameters:
base64_str (str) – Base64-encoded string of the image.
- Returns:
The image object.
- Return type:
Image
- classmethod from_file(file_path: Path) Image [source]
Create an image object from a file path.
- Parameters:
file_path (Path) – Path of the image file.
- Returns:
The image object.
- Return type:
Image
- to_base64() str [source]
Convert the image to a base64-encoded string.
- Returns:
Base64-encoded string of the image.
- Return type:
str
- property data_uri: str
Get the data URI of the image.
- Returns:
Data URI of the image.
- Return type:
str
- to_openai_format(detail: Literal['auto', 'low', 'high'] = 'auto') Dict[str, Any] [source]
Convert the image to an OpenAI API compatible format.
- Parameters:
detail (Literal["auto", "low", "high"], optional) – Detail level of the image. Defaults to “auto”.
- Returns:
OpenAI API compatible image format.
- Return type:
Dict[str, Any]
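To make the relationship between base64 strings and data URIs concrete, here is a small standard-library sketch. The helpers `to_data_uri` and `from_data_uri` are hypothetical and only illustrate the encoding; they are not the methods of the Image class, and the default MIME type is an assumption.

```python
import base64


def to_data_uri(raw: bytes, mime_type: str = "image/png") -> str:
    """Encode raw image bytes as a base64 data URI (hypothetical helper)."""
    encoded = base64.b64encode(raw).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


def from_data_uri(uri: str) -> bytes:
    """Decode a base64 data URI back into raw bytes (hypothetical helper)."""
    header, sep, payload = uri.partition(",")
    if not sep or not header.startswith("data:") or not header.endswith(";base64"):
        raise ValueError(f"Not a base64 data URI: {uri[:30]!r}")
    return base64.b64decode(payload)


# The 8-byte PNG file signature is used here as stand-in image data.
png_signature = b"\x89PNG\r\n\x1a\n"
uri = to_data_uri(png_signature)
```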
- class agentopera.engine.types.Queue(maxsize: int = 0)[source]
Bases:
_LoopBoundMixin, Generic[T]
- property maxsize: int
Number of items allowed in the queue.
- full() bool [source]
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async put(item: T) None [source]
Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.
Raises QueueShutDown if the queue has been shut down.
- put_nowait(item: T) None [source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
Raises QueueShutDown if the queue has been shut down.
- async get() T [source]
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait() T [source]
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- task_done() None [source]
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
shutdown(immediate=True) calls task_done() for each remaining item in the queue.
Raises ValueError if called more times than there were items placed in the queue.
- async join() None [source]
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- shutdown(immediate: bool = False) None [source]
Shut down the queue, making queue gets and puts raise QueueShutDown.
By default, gets will only raise once the queue is empty. Set ‘immediate’ to True to make gets raise immediately instead.
All blocked callers of put() and get() will be unblocked. If ‘immediate’, a task is marked as done for each item remaining in the queue, which may unblock callers of join().
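The interplay of put(), get(), task_done() and join() can be sketched with a single producer and consumer. This example uses the standard library's `asyncio.Queue` as a stand-in, on the assumption that the Queue documented here follows the same interface; shutdown() is left out because the standard library only provides it from Python 3.13.

```python
import asyncio
from typing import List


async def consumer(queue: "asyncio.Queue[int]", results: List[int]) -> None:
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)
        finally:
            # Exactly one task_done() per get(), so join() can unblock.
            queue.task_done()


async def run_pipeline() -> List[int]:
    queue: "asyncio.Queue[int]" = asyncio.Queue(maxsize=2)
    results: List[int] = []
    worker = asyncio.create_task(consumer(queue, results))
    for item in range(5):
        # put() waits while the queue already holds maxsize items.
        await queue.put(item)
    # join() returns once task_done() has been called for every item.
    await queue.join()
    # The consumer loops forever, so cancel it and wait for it to finish.
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return results


doubled = asyncio.run(run_pipeline())
```

With a bounded queue, the producer is throttled to the consumer's pace, which is the main reason to set maxsize above zero.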
- exception agentopera.engine.types.QueueShutDown[source]
Bases:
Exception
Raised when putting on to or getting from a shut-down Queue.
- class agentopera.engine.types.InMemoryStore[source]
Bases:
CacheStore[T]
- get(key: str, default: T | None = None) T | None [source]
Retrieve an item from the store.
- Parameters:
key – The key identifying the item in the store.
default (optional) – The default value to return if the key is not found. Defaults to None.
- Returns:
The value associated with the key if found, else the default value.
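A dictionary-backed store with the documented get() semantics might look like the sketch below. `SketchInMemoryStore` is a hypothetical stand-in, and its `set` method is an assumption added so the example has something to read back; only get() appears in the reference above.

```python
from typing import Dict, Generic, Optional, TypeVar

T = TypeVar("T")


class SketchInMemoryStore(Generic[T]):
    """Hypothetical dict-backed store; not the library class."""

    def __init__(self) -> None:
        self._data: Dict[str, T] = {}

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        # Return the stored value, or the default when the key is missing.
        return self._data.get(key, default)

    def set(self, key: str, value: T) -> None:
        # Assumed write method; not shown in the reference above.
        self._data[key] = value


store: SketchInMemoryStore[int] = SketchInMemoryStore()
store.set("answer", 42)
hit = store.get("answer")
miss = store.get("missing", default=-1)
```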