import uuid
from ..types.agent import AgentId, AgentType
from .subscription import Subscription
from ..types.msg_channel import MessageChannel
from ..types.exceptions import CantHandleException
[docs]
class TopicPrefixSubscription(Subscription):
"""This subscription matches on topics based on a prefix of the topic and maps to agents using the source of the topic as the agent key.
This subscription causes each source to have its own agent instance.
Example:
.. code-block:: python
from agentopera.core import TopicPrefixSubscription
subscription = TopicPrefixSubscription(topic_prefix="t1", agent_type="a1")
In this case:
- A message_channel with topic `t1` and source `s1` will be handled by an agent of type `a1` with key `s1`
- A message_channel with topic `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`.
- A message_channel with topic `t1SUFFIX` and source `s2` will be handled by an agent of type `a1` with key `s2`.
Args:
topic_prefix (str): Topic prefix to match against
agent_type (str): Agent type to handle this subscription
"""
def __init__(self, topic_prefix: str, agent_type: str | AgentType, id: str | None = None):
self._topic_prefix = topic_prefix
if isinstance(agent_type, AgentType):
self._agent_type = agent_type.type
else:
self._agent_type = agent_type
self._id = id or str(uuid.uuid4())
@property
def id(self) -> str:
return self._id
@property
def topic_prefix(self) -> str:
return self._topic_prefix
@property
def agent_type(self) -> str:
return self._agent_type
[docs]
def is_match(self, message_channel: MessageChannel) -> bool:
return message_channel.topic.startswith(self._topic_prefix)
[docs]
def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
if not self.is_match(message_channel):
raise CantHandleException("MessageChannel does not match the subscription")
return AgentId(type=self._agent_type, key=message_channel.source)
def __eq__(self, other: object) -> bool:
if not isinstance(other, TopicPrefixSubscription):
return False
return self.id == other.id or (
self.agent_type == other.agent_type and self.topic_prefix == other.topic_prefix
)