Source code for agentopera.router.semantic_router_agent

import os
import uuid
from typing import Optional
from agentopera.chatflow.messages import TextMessage, MultiModalMessage

from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.utils.logger import logger

from .semantic_router_components import AgentRegistryBase, IntentClassifierBase
from .constants import AUTO_MODE_MODEL_INDENTIFIER, VERCEL_PROVIDED_GROUPS
from ..ui.vercel_ai_request import UserRequestMetadata

from .user.context import USER_CONTEXT
from agentopera.utils.logger import logger

# Passed as the second argument to the classifier's ``classify_intent`` call
# whenever the router identifies the intent of a message. The name indicates
# the value is a timeout expressed in seconds.
ROUTER_TIMEOUT_IN_SECONDS = 50.0

from posthog import Posthog

class SemanticRouterAgent(RoutedAgent):
    """Route incoming user messages to the agent that should handle them.

    For every message the router resolves a target agent id -- from a
    frontend hint, from an explicitly requested model, or from intent
    classification -- and republishes the message on a channel whose topic
    is that agent id and whose source is the session id.
    """

    def __init__(self, name: str) -> None:
        """Create the router and its PostHog client.

        The per-user classifier and registry are not loaded here; that
        happens in ``initialize``.

        Args:
            name: Stored on the instance as ``_name``.
        """
        super().__init__("Semantic Router Agent")
        self._name = name
        self._initialized = False
        self.posthog = Posthog(
            project_api_key=os.getenv('NEXT_PUBLIC_POSTHOG_KEY'),
            host='https://us.i.posthog.com',
            enable_exception_autocapture=True,
        )

    async def _load_user_context(self):
        """Fetch this user's context and install its classifier and registry.

        Returns:
            The first element of the tuple returned by
            ``USER_CONTEXT.get_or_build`` (the object whose
            ``intent_descriptions`` is reported in the log lines).
        """
        intent, classifier, registry = await USER_CONTEXT.get_or_build(self.user_id)
        self._classifier = classifier
        self._registry = registry
        return intent

    async def initialize(self):
        """Bind the router to its user and load that user's routing context.

        The user id is the ``key`` of this agent's id.
        """
        agent_id = self.id
        self.user_id = agent_id.key
        intent = await self._load_user_context()
        self._initialized = True
        logger.info(f"[Router Init] user={self.user_id}, intents={len(intent.intent_descriptions)}")

    # The User has sent a message that needs to be routed
    @message_handler
    async def route_to_agent(self, message: TextMessage | MultiModalMessage, ctx: MessageContext) -> None:
        """Pick the target agent for ``message`` and forward the message to it.

        Args:
            message: The incoming user message; its ``metadata`` is read for
                ``session_id`` and ``group`` and may be updated while the
                agent is being determined.
            ctx: Context of the incoming message; its ``message_id`` is
                reused when the message is republished.
        """
        if not self._initialized:
            await self.initialize()
        assert ctx.message_channel is not None

        # Rebuild context from user-specific config on every message.
        # NOTE: on the very first message this repeats the lookup that
        # initialize() has just performed.
        intent = await self._load_user_context()
        logger.info(f"[Router Config Reloaded] user={self.user_id}, intents={len(intent.intent_descriptions)}")
        logger.info(f"[Intent Mapping] {self._classifier.intent_registry.agent_intent_mapping}")

        # Use the session id supplied in the metadata, or mint a fresh one.
        session_id = message.metadata.get("session_id") or str(uuid.uuid4())

        # classify which agent to route the message
        agent_id = await self._determine_agent_id(message)

        # Log the selected agent id with session_id. Telemetry is
        # best-effort: a failure here must not stop the message from
        # being routed.
        try:
            self.event_log(agent_id=agent_id, session_id=session_id)
        except Exception as exc:
            logger.warning(f"Failed to log routing event to PostHog: {exc}")

        group = message.metadata.get("group")
        logger.info(f"Router {self.id} ===> agent_id = {agent_id}, session_id = {session_id}, group = {group}")
        await self.contact_agent(agent_id, message, ctx, session_id)

    async def _determine_agent_id(self, message: TextMessage | MultiModalMessage) -> str:
        """Determine the correct agent based on message metadata or intent.

        Resolution order:

        1. A frontend hint (see ``_check_user_frontend_specification``).
        2. An explicitly requested model other than the auto-mode identifier.
        3. Intent classification followed by a registry lookup.

        When the result is ``"vercel"`` via step 2, or via step 3 in auto
        mode, ``message.metadata["group"]`` is set from the classified intent.

        Args:
            message: The user message; its ``metadata`` may be mutated.

        Returns:
            The id of the agent that should receive the message.
        """
        model = message.metadata.get("model")

        # Step 1: Check frontend specification
        frontend_agent = self._check_user_frontend_specification(message)
        if frontend_agent:
            return frontend_agent

        # Step 2: Check if a specific model is provided
        if model and model != AUTO_MODE_MODEL_INDENTIFIER:
            # If a model is explicitly provided and is not the auto-mode one:
            # 1. If the model name is registered in the agent registry, use
            #    it as the agent_id.
            # 2. If the model is not registered, assume it is managed by the
            #    Vercel backend and assign "vercel" as the agent_id.
            agents = await self._registry.get_agents()
            agent_id = model if model in agents else "vercel"

            # classify intent for group validation
            if agent_id == "vercel":
                intent = await self._identify_intent(message)
                message.metadata["group"] = self._determine_vercel_group(intent)
            return agent_id

        # Step 3: No specific model -> classify intent
        intent = await self._identify_intent(message)
        agent_id = await self._find_agent(intent)

        # Step 4: Update "group" for Vercel AI Agent
        if model == AUTO_MODE_MODEL_INDENTIFIER and agent_id == "vercel":
            message.metadata["group"] = self._determine_vercel_group(intent)

        return agent_id

    ## Identify the intent of the user message
    async def _identify_intent(self, message: TextMessage | MultiModalMessage) -> str:
        """Return the classifier's intent for ``message``.

        ``ROUTER_TIMEOUT_IN_SECONDS`` is passed as the second argument of
        ``classify_intent``.
        """
        return await self._classifier.classify_intent(message, ROUTER_TIMEOUT_IN_SECONDS)

    ## Use a lookup, search, or LLM to identify the most relevant agent for the intent
    async def _find_agent(self, intent: str) -> str:
        """Look up the agent registered for ``intent``.

        Args:
            intent: The classified intent.

        Returns:
            The registry's agent for the intent, or ``"termination"`` when
            the registry raises ``KeyError``.
        """
        logger.info(f"Identified intent: {intent}")
        try:
            return await self._registry.get_agent(intent)
        except KeyError:
            # f-string rather than "+" so that a non-string intent cannot
            # raise TypeError here and mask the fallback.
            logger.info(f"No relevant agent found for intent: {intent}")
            return "termination"

    def _check_user_frontend_specification(self, message: TextMessage | MultiModalMessage) -> Optional[str]:
        """Apply routing hints that the frontend attached to the message.

        Two hints are recognised, checked in this order:

        - The decoded request headers contain ``x-chat-mode`` equal to
          ``"reasoning"``: ``message.metadata["model"]`` is overwritten with
          ``"chainopera-think"`` and the request is routed to ``"vercel"``.
        - ``message.metadata["group"]`` equals ``"extreme"``: the request is
          routed to ``"vercel"``.

        Args:
            message: The user message; its ``metadata`` may be mutated.

        Returns:
            ``"vercel"`` if either hint matched, otherwise ``None``.
        """
        headers = UserRequestMetadata.decode_headers(message.metadata.get("headers", {}))
        if "x-chat-mode" in headers and headers["x-chat-mode"] == "reasoning":
            message.metadata['model'] = "chainopera-think"
            return "vercel"

        if message.metadata.get("group") == "extreme":
            return "vercel"

        return None

    def _determine_vercel_group(self, intent: str) -> str:
        """Determine the group for Vercel-provided agents based on intent.

        - Special case: ``deep_research_intent`` -> ``"extreme"``.
        - Other intents must be in ``<group>_intent`` format with ``<group>``
          listed in ``VERCEL_PROVIDED_GROUPS``; then ``<group>`` is returned.
        - Anything else (including a non-string intent) logs a warning and
          defaults to ``"chat"``.

        Args:
            intent: The classified intent.

        Returns:
            The determined group.
        """
        if intent == "deep_research_intent":
            return "extreme"

        # The isinstance guard lets a missing/non-string intent reach the
        # default below instead of raising AttributeError.
        if isinstance(intent, str) and intent.endswith("_intent"):
            group = intent.rsplit("_", 1)[0]  # Extract group before "_intent"
            if group in VERCEL_PROVIDED_GROUPS:
                return group

        logger.warning(f"Unexpected intent format or invalid group: {intent}. Defaulting to 'chat'.")
        return "chat"  # Default fallback

    def event_log(self, session_id: str, agent_id: str) -> None:
        """Log the agent selection event to PostHog.

        Captures a ``"chat"`` event whose distinct id is
        ``"/api/chat: <session_id>"`` and whose ``$set`` properties carry
        the agent id.

        Args:
            session_id: The ID of the current customer session.
            agent_id: The ID of the agent handling the message.
        """
        properties = {
            "$set": {
                "agent_id": agent_id,
            }
        }
        self.posthog.capture(
            distinct_id=f"/api/chat: {session_id}",
            event="chat",
            properties=properties
        )

    ## Forward user message to the appropriate agent, or end the thread.
    async def contact_agent(self, agent_name: str, message: TextMessage | MultiModalMessage, ctx: MessageContext, session_id: str) -> None:
        """Republish ``message`` on the channel of the selected agent.

        A new message of the same kind is built from the incoming one and
        published with topic ``agent_name``, source ``session_id`` and the
        incoming ``ctx.message_id``. A message that is neither a
        ``TextMessage`` nor a ``MultiModalMessage`` is not published; a
        warning is logged instead.

        Args:
            agent_name: Topic to publish on (the selected agent id).
            message: The user message to forward.
            ctx: Context of the incoming message.
            session_id: Used as the source of the outgoing channel.
        """
        # Only the first 100 elements of the content are logged.
        preview = message.content if len(message.content) < 100 else message.content[:100]
        logger.info("Routing to agent_type {}, content = {}".format(agent_name, preview))

        if isinstance(message, TextMessage):
            logger.info(f"routing text message with message id {ctx.message_id}")
            outgoing = TextMessage(
                content=message.content,
                context=message.context,
                source=message.source,
                metadata=message.metadata,
            )
        elif isinstance(message, MultiModalMessage):
            logger.info("routing to multi modal message")
            outgoing = MultiModalMessage(
                content=message.content,
                source=message.source,
                metadata=message.metadata,
            )
        else:
            # Nothing is published for other types, so do not fall through
            # to the "publish_message is done" log line.
            logger.warning(
                f"Unsupported message type {type(message).__name__}; "
                f"message_id = {ctx.message_id} was not routed"
            )
            return

        await self.publish_message(
            outgoing,
            DefaultMessageChannel(topic=agent_name, source=session_id),
            message_id=ctx.message_id,
        )
        logger.info("publish_message is done for agent_type {}, message_id = {}".format(agent_name, ctx.message_id))