Source code for agentopera.engine.subscription.topic_subscription

import uuid

from ..types.agent import AgentId, AgentType
from .subscription import Subscription
from ..types.msg_channel import MessageChannel
from ..types.exceptions import CantHandleException


class TopicSubscription(Subscription):
    """Subscription that selects message channels by exact topic equality.

    A matching message channel is routed to an agent whose type is the
    configured agent type and whose key is the channel's source, so every
    distinct source ends up with its own agent instance.

    Example:

        .. code-block:: python

            from agentopera.core import TopicSubscription

            subscription = TopicSubscription(topic="t1", agent_type="a1")

        In this case:

        - A message_channel with topic `t1` and source `s1` will be handled
          by an agent of type `a1` with key `s1`.
        - A message_channel with topic `t1` and source `s2` will be handled
          by an agent of type `a1` with key `s2`.

    Args:
        topic (str): Topic to match against.
        agent_type (str | AgentType): Agent type that handles this
            subscription. An ``AgentType`` is reduced to its ``type`` string.
        id (str | None): Optional subscription identifier. When omitted (or
            otherwise falsy, e.g. an empty string) a random UUID4 string is
            generated.
    """

    def __init__(self, topic: str, agent_type: str | AgentType, id: str | None = None):
        self._topic = topic
        # Only the type string is stored, whichever form the caller passed in.
        self._agent_type = agent_type.type if isinstance(agent_type, AgentType) else agent_type
        # Truthiness check: both None and "" fall back to a fresh UUID.
        self._id = id if id else str(uuid.uuid4())

    @property
    def id(self) -> str:
        """Identifier of this subscription."""
        return self._id

    @property
    def topic(self) -> str:
        """Topic this subscription matches against."""
        return self._topic

    @property
    def agent_type(self) -> str:
        """Type string of the agent that handles matching message channels."""
        return self._agent_type

    def is_match(self, message_channel: MessageChannel) -> bool:
        """Return True when the channel's topic equals this subscription's topic."""
        return message_channel.topic == self._topic

    def map_to_agent(self, message_channel: MessageChannel) -> AgentId:
        """Build the id of the agent that should handle ``message_channel``.

        Returns:
            AgentId: Agent of this subscription's type, keyed by the
            channel's source.

        Raises:
            CantHandleException: If ``message_channel`` does not match this
                subscription.
        """
        if self.is_match(message_channel):
            return AgentId(type=self._agent_type, key=message_channel.source)
        raise CantHandleException("MessageChannel does not match the subscription")

    # NOTE: defining __eq__ without __hash__ makes instances unhashable, so
    # they cannot be used as set members or dict keys.
    def __eq__(self, other: object) -> bool:
        """Compare subscriptions by id, or by (agent_type, topic) pair.

        Two ``TopicSubscription`` objects are equal when they share the same
        id, or when both their agent type and topic are the same. Any other
        kind of object compares unequal.
        """
        if not isinstance(other, TopicSubscription):
            return False
        if self.id == other.id:
            return True
        return self.agent_type == other.agent_type and self.topic == other.topic