import os
import uuid
from typing import Optional
from agentopera.chatflow.messages import TextMessage, MultiModalMessage
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.utils.logger import logger
from .semantic_router_components import AgentRegistryBase, IntentClassifierBase
from .constants import AUTO_MODE_MODEL_INDENTIFIER, VERCEL_PROVIDED_GROUPS
from ..ui.vercel_ai_request import UserRequestMetadata
from .user.context import USER_CONTEXT
from agentopera.utils.logger import logger
# Passed as the second argument to the classifier's `classify_intent` call in
# `_identify_intent`; by its name, the per-classification timeout in seconds.
ROUTER_TIMEOUT_IN_SECONDS = 50.0
from posthog import Posthog
[docs]
class SemanticRouterAgent(RoutedAgent):
def __init__(self, name: str) -> None:
    """Set up the router with its name and a PostHog client.

    Args:
        name: Stored on the instance as ``_name``. The base-class
            constructor always receives the fixed string
            "Semantic Router Agent", independent of this value.
    """
    super().__init__("Semantic Router Agent")
    self._name = name
    # Flipped to True by ``initialize`` once the per-user context is loaded.
    self._initialized = False
    # The PostHog project key is read from the environment at construction time.
    posthog_key = os.getenv('NEXT_PUBLIC_POSTHOG_KEY')
    self.posthog = Posthog(
        project_api_key=posthog_key,
        host='https://us.i.posthog.com',
        enable_exception_autocapture=True,
    )
[docs]
async def initialize(self):
    """Load the per-user routing context and mark the router as ready.

    The user id is taken from the ``key`` of this agent's id. The classifier
    and registry returned by ``USER_CONTEXT.get_or_build`` for that user are
    stored on the instance, and the number of intent descriptions is logged.
    """
    self.user_id = self.id.key
    intent, self._classifier, self._registry = await USER_CONTEXT.get_or_build(self.user_id)
    self._initialized = True
    logger.info(f"[Router Init] user={self.user_id}, intents={len(intent.intent_descriptions)}")
# The User has sent a message that needs to be routed
[docs]
@message_handler
async def route_to_agent(self, message: TextMessage | MultiModalMessage, ctx: MessageContext) -> None:
    """Choose the target agent for an incoming user message and forward it.

    Steps, in order: lazily initialize the router, reload the user's
    classifier and registry, resolve a session id, determine the agent id,
    record the selection via ``event_log``, and hand the message to
    ``contact_agent``.

    Args:
        message: The incoming user message; its ``metadata`` is read for
            ``session_id`` and ``group``.
        ctx: The message context; its ``message_channel`` must not be None.
    """
    if not self._initialized:
        await self.initialize()
    assert ctx.message_channel is not None

    # Reload the user-specific configuration on every message.
    intent, self._classifier, self._registry = await USER_CONTEXT.get_or_build(self.user_id)
    logger.info(f"[Router Config Reloaded] user={self.user_id}, intents={len(intent.intent_descriptions)}")
    logger.info(f"[Intent Mapping] {self._classifier.intent_registry.agent_intent_mapping}")

    # Reuse the session id supplied in the metadata; otherwise mint a new one.
    supplied_session = message.metadata.get("session_id")
    session_id = supplied_session or str(uuid.uuid4())

    # Decide where the message goes, then record that decision.
    agent_id = await self._determine_agent_id(message)
    self.event_log(session_id=session_id, agent_id=agent_id)

    # Read the group only after agent selection, which may have updated it.
    group = message.metadata.get("group")
    logger.info(f"Router {self.id} ===> agent_id = {agent_id}, session_id = {session_id}, group = {group}")
    await self.contact_agent(agent_id, message, ctx, session_id)
async def _determine_agent_id(self, message: TextMessage) -> str:
    """Resolve the agent id for a message from its metadata or its intent.

    Resolution order:
      1. A frontend override from ``_check_user_frontend_specification``
         is returned as-is.
      2. An explicit ``model`` other than the auto-mode identifier is used
         as the agent id when the registry lists it, otherwise "vercel".
         Whenever the result is "vercel", the intent is classified and the
         message's ``group`` metadata is set from it.
      3. Otherwise the intent is classified and mapped to an agent. In auto
         mode, a "vercel" result also gets ``group`` set from the intent.
    """
    # Captured before the frontend check, which may overwrite metadata["model"].
    requested_model = message.metadata.get("model")

    override = self._check_user_frontend_specification(message)
    if override:
        return override

    if requested_model and requested_model != AUTO_MODE_MODEL_INDENTIFIER:
        known_agents = await self._registry.get_agents()
        agent_id = requested_model if requested_model in known_agents else "vercel"
        if agent_id == "vercel":
            # The intent is classified only to derive the group here.
            intent = await self._identify_intent(message)
            message.metadata["group"] = self._determine_vercel_group(intent)
        return agent_id

    # No model given, or auto mode: let the classified intent pick the agent.
    intent = await self._identify_intent(message)
    agent_id = await self._find_agent(intent)

    auto_mode = requested_model == AUTO_MODE_MODEL_INDENTIFIER
    if auto_mode and agent_id == "vercel":
        message.metadata["group"] = self._determine_vercel_group(intent)
    return agent_id
## Identify the intent of the user message
async def _identify_intent(self, message: TextMessage | MultiModalMessage) -> str:
    """Classify the message's intent with the currently loaded classifier.

    ``ROUTER_TIMEOUT_IN_SECONDS`` is passed as the classifier's second argument.
    """
    classifier = self._classifier
    intent = await classifier.classify_intent(message, ROUTER_TIMEOUT_IN_SECONDS)
    return intent
## Use a lookup, search, or LLM to identify the most relevant agent for the intent
async def _find_agent(self, intent: str) -> str:
    """Look up the agent registered for an intent.

    Args:
        intent: The classified intent, used as the registry lookup key.

    Returns:
        Whatever the registry returns for ``intent``, or "termination" when
        the registry raises ``KeyError`` for it.
    """
    logger.info(f"Identified intent: {intent}")
    try:
        return await self._registry.get_agent(intent)
    except KeyError:
        # Formatted with an f-string (as in the log line above) rather than
        # "+": concatenation raises TypeError for a non-str intent such as
        # None, which would mask the "termination" fallback.
        logger.info(f"No relevant agent found for intent: {intent}")
        return "termination"
def _check_user_frontend_specification(self, message: TextMessage) -> Optional[str]:
    """
    Applies routing overrides requested by the frontend.

    Two signals route straight to "vercel":
    - the decoded request headers carry ``x-chat-mode`` equal to "reasoning";
      in that case the message's ``model`` metadata is also overwritten
      with "chainopera-think";
    - the message's ``group`` metadata is "extreme".

    Args:
        message (TextMessage): The user message; its metadata may be mutated.
    Returns:
        Optional[str]: "vercel" when either override applies, otherwise None.
    """
    raw_headers = message.metadata.get("headers", {})
    headers = UserRequestMetadata.decode_headers(raw_headers)

    # The header check comes first, so it wins over the group check.
    reasoning_mode = "x-chat-mode" in headers and headers["x-chat-mode"] == "reasoning"
    if reasoning_mode:
        message.metadata['model'] = "chainopera-think"
        return "vercel"

    return "vercel" if message.metadata.get("group") == "extreme" else None
def _determine_vercel_group(self, intent: str) -> str:
"""
Determines the correct group for Vercel-provided agents based on intent.
- Special case: `deep_research_intent` → `"extreme"`
- Other intents must be in `<group>_intent` format.
- If valid, extracts `<group>` from `<group>_intent`.
- If invalid, defaults to `"chat"`.
Args:
intent (str): The classified intent.
Returns:
str: The determined group.
"""
if intent == "deep_research_intent":
return "extreme"
if intent.endswith("_intent"):
group = intent.rsplit("_", 1)[0] # Extract group before "_intent"
if group in VERCEL_PROVIDED_GROUPS:
return group
logger.warning(f"Unexpected intent format or invalid group: {intent}. Defaulting to 'chat'.")
return "chat" # Default fallback
[docs]
def event_log(self, session_id: str, agent_id: str) -> None:
    """
    Sends a "chat" event to PostHog recording which agent was selected.

    The event's distinct id is built from the session id, and the agent id
    is sent under the ``$set`` key of the event properties.

    Args:
        session_id (str): The ID of the current customer session.
        agent_id (str): The ID of the agent handling the message.
    """
    distinct_id = f"/api/chat: {session_id}"
    properties = {"$set": {"agent_id": agent_id}}
    self.posthog.capture(distinct_id=distinct_id, event="chat", properties=properties)
## Forward user message to the appropriate agent, or end the thread.