engine

class agentopera.engine.MessageSerializer(*args, **kwargs)

Bases: Protocol[T]

property data_content_type: str

property type_name: str

deserialize(payload: bytes) → T

serialize(message: T) → bytes
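As a rough illustration, a serializer that conforms structurally to this protocol for JSON payloads might look like the sketch below. `Greeting` and `GreetingJsonSerializer` are hypothetical names, and the sketch assumes the protocol requires only the four members listed here. It deliberately does not import agentopera so that it stays self-contained.

```python
import json
from dataclasses import asdict, dataclass

# Same value as agentopera.engine.JSON_DATA_CONTENT_TYPE documented on this page.
JSON_DATA_CONTENT_TYPE = "application/json"


@dataclass
class Greeting:
    """Hypothetical message type used only for this illustration."""
    text: str
    sender: str


class GreetingJsonSerializer:
    """Structurally matches MessageSerializer[Greeting] (assumed member set)."""

    @property
    def data_content_type(self) -> str:
        return JSON_DATA_CONTENT_TYPE

    @property
    def type_name(self) -> str:
        # Using the class name as the type name is an assumption of this sketch.
        return Greeting.__name__

    def serialize(self, message: Greeting) -> bytes:
        return json.dumps(asdict(message)).encode("utf-8")

    def deserialize(self, payload: bytes) -> Greeting:
        return Greeting(**json.loads(payload.decode("utf-8")))


serializer = GreetingJsonSerializer()
payload = serializer.serialize(Greeting(text="hello", sender="s1"))
restored = serializer.deserialize(payload)
```

A round trip through `serialize` and `deserialize` should give back an equal message, which is a convenient property to check when writing a custom serializer.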
class agentopera.engine.UnknownPayload(type_name: str, data_content_type: str, payload: bytes)

Bases: object

type_name: str
data_content_type: str
payload: bytes
agentopera.engine.JSON_DATA_CONTENT_TYPE = 'application/json'

The content type for JSON data.

agentopera.engine.PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'

The content type for Protobuf data.

agentopera.engine.ROOT_LOGGER_NAME = 'agentopera.core'

The name of the root logger.

agentopera.engine.EVENT_LOGGER_NAME = 'agentopera.core.events'

The name of the logger used for structured events.

agentopera.engine.TRACE_LOGGER_NAME = 'agentopera.core.trace'

The name of the logger used for developer-oriented trace logging. Do not depend on the content or format of this log.
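The logger names above can be used with the standard `logging` module. The sketch below copies the documented string values rather than importing them, and the chosen levels and format string are only illustrative. Which records the library emits at which level is not stated here.

```python
import logging

# Logger names copied from the constants documented above.
ROOT_LOGGER_NAME = "agentopera.core"
EVENT_LOGGER_NAME = "agentopera.core.events"
TRACE_LOGGER_NAME = "agentopera.core.trace"

# Attach one handler at the root of the library's logger hierarchy.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

root_logger = logging.getLogger(ROOT_LOGGER_NAME)
root_logger.addHandler(handler)
root_logger.setLevel(logging.WARNING)

# Child loggers propagate to the root handler; levels can be tuned per logger.
event_logger = logging.getLogger(EVENT_LOGGER_NAME)
event_logger.setLevel(logging.INFO)

trace_logger = logging.getLogger(TRACE_LOGGER_NAME)
trace_logger.setLevel(logging.DEBUG)
```

Because the event and trace names are dotted children of the root name, a handler attached to the root logger also receives their records.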

class agentopera.engine.DefaultSubscription(topic: str = 'default', agent_type: str | AgentType | None = None)

Bases: TopicSubscription

The default subscription is designed to be a sensible default for applications that only need global scope for agents.

By default, this subscription uses the “default” topic and attempts to detect the agent type from the instantiation context.

Parameters:
  • topic (str, optional) – The topic to subscribe to. Defaults to “default”.

  • agent_type (str | AgentType, optional) – The agent type to use for the subscription. Defaults to None, in which case the agent type is detected from the instantiation context.

class agentopera.engine.Subscription(*args, **kwargs)

Bases: Protocol

Subscriptions define the topics that an agent is interested in.

property id: str

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

ID of the subscription.

Return type:

str

is_match(message_channel: MessageChannel) → bool

Check if a given message_channel matches the subscription.

Parameters:

message_channel (MessageChannel) – MessageChannel to check.

Returns:

True if the message_channel matches the subscription, False otherwise.

Return type:

bool

map_to_agent(message_channel: MessageChannel) → AgentId

Map a message_channel to an agent. Should only be called if is_match returns True for the given message_channel.

Parameters:

message_channel (MessageChannel) – MessageChannel to map.

Returns:

ID of the agent that should handle the message_channel.

Return type:

AgentId

Raises:

CantHandleException – If the subscription cannot handle the message_channel.
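To make the protocol concrete, here is a minimal sketch of a custom subscription. `MessageChannel`, `AgentId`, and `CantHandleException` are local stand-ins so that the example runs on its own, and their field names (`topic`, `source`, `type`, `key`) are assumptions. `TopicSetSubscription` is a hypothetical class, not part of the library.

```python
import uuid
from dataclasses import dataclass
from typing import FrozenSet, Iterable


# Stand-ins for the library types; the field names are assumptions.
@dataclass(frozen=True)
class MessageChannel:
    topic: str
    source: str


@dataclass(frozen=True)
class AgentId:
    type: str
    key: str


class CantHandleException(Exception):
    pass


class TopicSetSubscription:
    """Hypothetical subscription that matches any topic in a fixed set."""

    def __init__(self, topics: Iterable[str], agent_type: str) -> None:
        self._topics: FrozenSet[str] = frozenset(topics)
        self._agent_type = agent_type
        self._id = str(uuid.uuid4())  # unique ID, as the protocol recommends

    @property
    def id(self) -> str:
        return self._id

    def is_match(self, message_channel: MessageChannel) -> bool:
        return message_channel.topic in self._topics

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
        if not self.is_match(message_channel):
            raise CantHandleException("message_channel does not match")
        # Use the source as the agent key, mirroring the built-in subscriptions.
        return AgentId(type=self._agent_type, key=message_channel.source)


sub = TopicSetSubscription({"orders", "refunds"}, agent_type="billing")
agent_id = sub.map_to_agent(MessageChannel(topic="orders", source="s1"))
```

Callers are expected to check `is_match` first; the guard inside `map_to_agent` only covers the case where that contract is broken.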

class agentopera.engine.SubscriptionInstantiationContext

Bases: object

classmethod agent_type() → AgentType

class agentopera.engine.TopicPrefixSubscription(topic_prefix: str, agent_type: str | AgentType, id: str | None = None)

Bases: Subscription

This subscription matches on topics based on a prefix of the topic and maps to agents using the source of the topic as the agent key.

This subscription causes each source to have its own agent instance.

Example

from agentopera.engine import TopicPrefixSubscription

subscription = TopicPrefixSubscription(topic_prefix="t1", agent_type="a1")

In this case:

  • A message_channel with topic t1 and source s1 will be handled by an agent of type a1 with key s1.

  • A message_channel with topic t1 and source s2 will be handled by an agent of type a1 with key s2.

  • A message_channel with topic t1SUFFIX and source s2 will be handled by an agent of type a1 with key s2.

Parameters:
  • topic_prefix (str) – Topic prefix to match against.

  • agent_type (str | AgentType) – Agent type to handle this subscription.
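The routing behavior described in the example above can be sketched without the library. The `MessageChannel` dataclass is a stand-in whose field names are assumptions, `route_by_prefix` is a hypothetical helper, and the plain string-prefix test is an assumed matching rule inferred from the `t1SUFFIX` case.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class MessageChannel:
    """Stand-in for the library type; the field names are assumptions."""
    topic: str
    source: str


def route_by_prefix(
    topic_prefix: str, agent_type: str, channel: MessageChannel
) -> Optional[Tuple[str, str]]:
    """Return (agent type, agent key), or None if the topic does not match."""
    # Assumed matching rule: a plain string-prefix test on the topic.
    if not channel.topic.startswith(topic_prefix):
        return None
    # The source of the message_channel becomes the agent key.
    return (agent_type, channel.source)


first = route_by_prefix("t1", "a1", MessageChannel("t1", "s1"))
second = route_by_prefix("t1", "a1", MessageChannel("t1", "s2"))
suffixed = route_by_prefix("t1", "a1", MessageChannel("t1SUFFIX", "s2"))
unrelated = route_by_prefix("t1", "a1", MessageChannel("other", "s2"))
```

Under these assumptions the first three calls reproduce the three cases listed above, and the last call returns None because `other` does not start with `t1`.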

property id: str

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

ID of the subscription.

Return type:

str

property topic_prefix: str

property agent_type: str

is_match(message_channel: MessageChannel) → bool

Check if a given message_channel matches the subscription.

Parameters:

message_channel (MessageChannel) – MessageChannel to check.

Returns:

True if the message_channel matches the subscription, False otherwise.

Return type:

bool

map_to_agent(message_channel: MessageChannel) → AgentId

Map a message_channel to an agent. Should only be called if is_match returns True for the given message_channel.

Parameters:

message_channel (MessageChannel) – MessageChannel to map.

Returns:

ID of the agent that should handle the message_channel.

Return type:

AgentId

Raises:

CantHandleException – If the subscription cannot handle the message_channel.

class agentopera.engine.TopicSubscription(topic: str, agent_type: str | AgentType, id: str | None = None)

Bases: Subscription

This subscription matches on topics based on the topic and maps to agents using the source of the topic as the agent key.

This subscription causes each source to have its own agent instance.

Example

from agentopera.engine import TopicSubscription

subscription = TopicSubscription(topic="t1", agent_type="a1")

In this case:

  • A message_channel with topic t1 and source s1 will be handled by an agent of type a1 with key s1.

  • A message_channel with topic t1 and source s2 will be handled by an agent of type a1 with key s2.

Parameters:
  • topic (str) – Topic to match against.

  • agent_type (str | AgentType) – Agent type to handle this subscription.
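To illustrate what "each source has its own agent instance" means in practice, the sketch below keeps one agent object per (agent type, agent key) pair. `MessageChannel`, `CountingAgent`, and `deliver` are hypothetical stand-ins, and creating agents lazily on first delivery is an assumption of this sketch rather than a statement about the runtime.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class MessageChannel:
    """Stand-in for the library type; the field names are assumptions."""
    topic: str
    source: str


class CountingAgent:
    """Hypothetical agent that only counts the messages it receives."""

    def __init__(self) -> None:
        self.received = 0


# One agent instance per (agent type, agent key) pair.
agents: Dict[Tuple[str, str], CountingAgent] = {}


def deliver(
    topic: str, agent_type: str, channel: MessageChannel
) -> Optional[CountingAgent]:
    # Exact topic comparison, in contrast to prefix matching.
    if channel.topic != topic:
        return None
    agent_id = (agent_type, channel.source)
    if agent_id not in agents:
        agents[agent_id] = CountingAgent()  # created lazily (assumption)
    agents[agent_id].received += 1
    return agents[agent_id]


deliver("t1", "a1", MessageChannel("t1", "s1"))
deliver("t1", "a1", MessageChannel("t1", "s1"))
deliver("t1", "a1", MessageChannel("t1", "s2"))
ignored = deliver("t1", "a1", MessageChannel("t1SUFFIX", "s2"))
```

Two messages from source s1 reach the same instance, the message from s2 gets a separate instance, and the `t1SUFFIX` message is not matched because the comparison is exact.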

property id: str

Get the ID of the subscription.

Implementations should return a unique ID for the subscription. Usually this is a UUID.

Returns:

ID of the subscription.

Return type:

str

property topic: str

property agent_type: str

is_match(message_channel: MessageChannel) → bool

Check if a given message_channel matches the subscription.

Parameters:

message_channel (MessageChannel) – MessageChannel to check.

Returns:

True if the message_channel matches the subscription, False otherwise.

Return type:

bool

map_to_agent(message_channel: MessageChannel) → AgentId

Map a message_channel to an agent. Should only be called if is_match returns True for the given message_channel.

Parameters:

message_channel (MessageChannel) – MessageChannel to map.

Returns:

ID of the agent that should handle the message_channel.

Return type:

AgentId

Raises:

CantHandleException – If the subscription cannot handle the message_channel.

agentopera.engine.default_subscription(cls: Type[BaseAgentType] | None = None) → Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType]

agentopera.engine.topic_subscription(topic: str) → Callable[[Type[BaseAgentType]], Type[BaseAgentType]]
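The signatures suggest that `default_subscription` can be applied either bare or with parentheses, while `topic_subscription` is always called with a topic. The sketch below shows that general decorator pattern only. The `*_sketch` functions and the `_sketch_topics` attribute are hypothetical, and recording topics on the class is an assumption, not a description of what the library does internally.

```python
from typing import Callable, List, Optional, Type, TypeVar, Union

T = TypeVar("T")


def _attach(cls: Type[T], topic: str) -> Type[T]:
    # Copy the list so a subclass never mutates one inherited from a base class.
    topics: List[str] = list(getattr(cls, "_sketch_topics", []))
    topics.append(topic)
    setattr(cls, "_sketch_topics", topics)
    return cls


def topic_subscription_sketch(topic: str) -> Callable[[Type[T]], Type[T]]:
    def decorator(cls: Type[T]) -> Type[T]:
        return _attach(cls, topic)

    return decorator


def default_subscription_sketch(
    cls: Optional[Type[T]] = None,
) -> Union[Callable[[Type[T]], Type[T]], Type[T]]:
    decorator = topic_subscription_sketch("default")
    if cls is None:
        return decorator  # used as @default_subscription_sketch()
    return decorator(cls)  # used as @default_subscription_sketch


@default_subscription_sketch
class BareAgent:
    pass


@default_subscription_sketch()
class CalledAgent:
    pass


@topic_subscription_sketch("t1")
class TopicAgent:
    pass
```

Accepting an optional class as the first argument is what lets a single function serve both the bare and the called form, which matches the union return type in the documented signature.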