from typing import Dict, List
import os
from dotenv import load_dotenv
from ..router_config import (
INTENT_DESCRIPTIONS,
AGENT_DESCRIPTIONS,
AGENT_INTENT_MAPPING,
INTENT_SCHEMA_OVERRIDES
)
from agentopera.router.intent_registry import IntentRegistry
from agentopera.router.agent_registry import AgentFactory, AgentRegistryBase
from agentopera.router.intent_classifier import IntentClassifierBase
from agentopera.ui.vercel_ai_request import UserAgentInfo
from agentopera.utils.logger import logger
from ..constants import DB_TIMEOUT
import httpx
load_dotenv()
LOCAL_LIST_DEV_AGENTS_API_URL = os.getenv('LOCAL_LIST_DEV_AGENTS_API_URL')
PUBLIC_AGENT_LIST_URL = os.getenv('PUBLIC_AGENT_LIST_URL')
USER_SUBSCRIBED_AGENT_URL = os.getenv("USER_SUBSCRIBED_AGENT_URL")
async def get_developer_agents_for_user(developer_id: str) -> Dict:
"""
Fetches the list of developer-owned agents for a given user_id.
"""
params = {
"userId": developer_id,
"name": "",
}
async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
response = await client.get(LOCAL_LIST_DEV_AGENTS_API_URL, params=params)
if response.status_code != 200:
raise RuntimeError(f"[get_developer_agents_for_user] HTTP {response.status_code} — {response.text}")
res = response.json()
if res.get("code") != "SUCCESS":
raise RuntimeError(f"[get_developer_agents_for_user] API error: {res.get('message')}")
return res
async def get_public_agents() -> Dict:
"""
Fetches the list of all public agents from the local backend.
Returns:
Dict: Response JSON containing public agent metadata.
"""
async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
response = await client.get(PUBLIC_AGENT_LIST_URL)
if response.status_code != 200:
raise RuntimeError(f"[get_public_agents] HTTP {response.status_code} — {response.text}")
res = response.json()
if res.get("code") != "SUCCESS":
raise RuntimeError(f"[get_public_agents] API error: {res.get('message')}")
return res
async def get_user_subscribed_agents(user_id: str) -> Dict:
"""
Fetches the list of agents the user is subscribed to.
Args:
user_id (str): The ID of the user.
Returns:
Dict: JSON response containing subscribed agent information.
"""
params = {
"userId": user_id
}
async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
response = await client.get(USER_SUBSCRIBED_AGENT_URL, params=params)
if response.status_code != 200:
raise RuntimeError(f"[get_user_subscribed_agents] HTTP {response.status_code} — {response.text}")
res = response.json()
if res.get("code") != "SUCCESS":
raise RuntimeError(f"[get_user_subscribed_agents] API error: {res.get('message')}")
return res
def convert_backend_agent_to_user_info(agent: dict) -> dict:
return {
"agent_url": agent.get("requestUrl", None),
"is_output_streaming": agent.get("streaming", True), # You may want to derive this from model
"is_session_supported": agent.get("sessionSupported", False),
"agent_name": agent.get("name", None),
"agent_intent": agent.get("intent") or agent.get("details", None),
"agent_description": agent.get("details", None),
"agent_type": agent.get("category", agent.get("name", None)),
"required_param": [],
"agent_api_key": agent.get("apiKey", None),
"required_payload_structure": {},
"source": agent.get("source", 4),
"system_prompt": agent.get("systemPrompt", None),
"model_name": agent.get("llmModelName", None),
}
[docs]
class USER_CONTEXT:
_metadata: Dict[str, List[UserAgentInfo]] = {}
_intent: Dict[str, IntentRegistry] = {}
_classifier: Dict[str, IntentClassifierBase] = {}
_registry: Dict[str, AgentRegistryBase] = {}
[docs]
@classmethod
async def get_or_build(cls, user_id: str):
# Always start from default base config
default_intent_registry = AgentFactory.create_intent_registry(
intent_descriptions=INTENT_DESCRIPTIONS,
agent_descriptions=AGENT_DESCRIPTIONS,
agent_intent_mapping=AGENT_INTENT_MAPPING,
schema_overrides=INTENT_SCHEMA_OVERRIDES
)
if user_id == "user" or not user_id.isdigit():
agent_registry = AgentFactory.create_agent_registry(default_intent_registry)
classifier = AgentFactory.create_intent_classifier(default_intent_registry)
return default_intent_registry, classifier, agent_registry
# Fetch per-user data and map to UserAgentInfo
await cls.get_agent_metadata(user_id)
# Build dynamic intent registry from API data
dynamic_registry = IntentRegistry.from_api_data(cls._metadata[user_id])
# Merge: dynamic overrides default
merged_registry = default_intent_registry.merge(dynamic_registry)
classifier = AgentFactory.create_intent_classifier(merged_registry)
registry = AgentFactory.create_agent_registry(merged_registry)
cls._intent[user_id] = merged_registry
cls._classifier[user_id] = classifier
cls._registry[user_id] = registry
return merged_registry, classifier, registry
[docs]
@classmethod
async def get_agent_metadata(cls, user_id: str) -> List[UserAgentInfo]:
"""
Retrieves and caches agent metadata for the given user by combining:
- Developer-owned agents
- Subscribed agents
If both sources contain agents with the same `agent_name`, the developer-owned version is preferred.
Args:
user_id (str): The ID of the user.
Returns:
List[UserAgentInfo]: A list of unique structured agent metadata.
"""
try:
# Fetch both dev and subscribed agents
dev_response = await get_developer_agents_for_user(user_id)
subscribed_response = await get_user_subscribed_agents(user_id)
dev_agents = dev_response.get("data", [])
subscribed_agents = subscribed_response.get("data", [])
# Index dev agents by name (priority)
merged_agents: Dict[str, dict] = {
agent["name"]: agent for agent in dev_agents
}
# Merge in subscribed agents if not already in dev agents
for agent in subscribed_agents:
name = agent["name"]
if name not in merged_agents:
merged_agents[name] = agent
# Transform to UserAgentInfo
agent_info_list = [
UserAgentInfo(**convert_backend_agent_to_user_info(agent))
for agent in merged_agents.values()
]
# Cache and return
cls._metadata[user_id] = agent_info_list
logger.info(f"[get_agent_metadata] Final merged agent info for user '{user_id}': {agent_info_list}")
return agent_info_list
except Exception as e:
logger.error(f"[get_agent_metadata] Error fetching agent metadata for user '{user_id}': {e}")
raise