Source code for agentopera.engine.subscription.topic_prefix_subscription

import uuid

from ..types.agent import AgentId, AgentType
from .subscription import Subscription
from ..types.msg_channel import MessageChannel
from ..types.exceptions import CantHandleException


class TopicPrefixSubscription(Subscription):
    """Subscription that selects message channels by a topic prefix.

    A message channel matches when its topic starts with ``topic_prefix``.
    A matching channel is mapped to an agent of ``agent_type`` whose key is
    the channel's source, so every distinct source gets its own agent
    instance.

    Example:

    .. code-block:: python

        from agentopera.core import TopicPrefixSubscription

        subscription = TopicPrefixSubscription(topic_prefix="t1", agent_type="a1")

    In this case:

    - A message_channel with topic `t1` and source `s1` will be handled by
      an agent of type `a1` with key `s1`.
    - A message_channel with topic `t1` and source `s2` will be handled by
      an agent of type `a1` with key `s2`.
    - A message_channel with topic `t1SUFFIX` and source `s2` will be
      handled by an agent of type `a1` with key `s2`.

    Args:
        topic_prefix (str): Topic prefix to match against.
        agent_type (str | AgentType): Agent type that handles this
            subscription. An ``AgentType`` is reduced to its ``type`` string.
        id (str | None): Identifier for this subscription. When omitted (or
            otherwise falsy) a random UUID4 string is generated.
    """

    # NOTE: ``__eq__`` is overridden below without a matching ``__hash__``,
    # so Python leaves instances of this class unhashable.

    def __init__(self, topic_prefix: str, agent_type: str | AgentType, id: str | None = None):
        self._topic_prefix = topic_prefix
        # Only the type name is kept, whether it arrived wrapped or as a string.
        self._agent_type = agent_type.type if isinstance(agent_type, AgentType) else agent_type
        # Any falsy id (None or "") falls back to a freshly generated UUID.
        self._id = id if id else str(uuid.uuid4())

    @property
    def id(self) -> str:
        """Identifier of this subscription."""
        return self._id

    @property
    def topic_prefix(self) -> str:
        """Prefix a topic must start with to match this subscription."""
        return self._topic_prefix

    @property
    def agent_type(self) -> str:
        """Name of the agent type that handles matching message channels."""
        return self._agent_type

    def is_match(self, message_channel: MessageChannel) -> bool:
        """Return whether the channel's topic starts with the configured prefix."""
        topic = message_channel.topic
        return topic.startswith(self._topic_prefix)

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
        """Return the agent id that should handle ``message_channel``.

        The id is built from this subscription's agent type and the
        channel's source as the key.

        Raises:
            CantHandleException: If the channel does not match this
                subscription.
        """
        if self.is_match(message_channel):
            return AgentId(type=self._agent_type, key=message_channel.source)
        raise CantHandleException("MessageChannel does not match the subscription")

    def __eq__(self, other: object) -> bool:
        """Compare with another subscription.

        Two ``TopicPrefixSubscription`` objects are equal when they share an
        id, or when both their agent type and topic prefix coincide. Any
        other kind of object compares unequal.
        """
        if isinstance(other, TopicPrefixSubscription):
            return self.id == other.id or (
                self.topic_prefix == other.topic_prefix
                and self.agent_type == other.agent_type
            )
        return False