Source code for agentopera.router.user.context

from typing import Dict, List
import os
from dotenv import load_dotenv
from ..router_config import (
    INTENT_DESCRIPTIONS,
    AGENT_DESCRIPTIONS,
    AGENT_INTENT_MAPPING,
    INTENT_SCHEMA_OVERRIDES
)
from agentopera.router.intent_registry import IntentRegistry
from agentopera.router.agent_registry import AgentFactory, AgentRegistryBase
from agentopera.router.intent_classifier import IntentClassifierBase
from agentopera.ui.vercel_ai_request import UserAgentInfo
from agentopera.utils.logger import logger
from ..constants import DB_TIMEOUT

import httpx

load_dotenv()

LOCAL_LIST_DEV_AGENTS_API_URL = os.getenv('LOCAL_LIST_DEV_AGENTS_API_URL')
PUBLIC_AGENT_LIST_URL = os.getenv('PUBLIC_AGENT_LIST_URL')
USER_SUBSCRIBED_AGENT_URL = os.getenv("USER_SUBSCRIBED_AGENT_URL")

async def get_developer_agents_for_user(developer_id: str) -> Dict:
    """
    Fetches the list of developer-owned agents for a given user_id.

    """
    params = {
        "userId": developer_id,
        "name": "",
    }

    async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
        response = await client.get(LOCAL_LIST_DEV_AGENTS_API_URL, params=params)

    if response.status_code != 200:
        raise RuntimeError(f"[get_developer_agents_for_user] HTTP {response.status_code}{response.text}")

    res = response.json()

    if res.get("code") != "SUCCESS":
        raise RuntimeError(f"[get_developer_agents_for_user] API error: {res.get('message')}")

    return res


async def get_public_agents() -> Dict:
    """
    Fetches the list of all public agents from the local backend.

    Returns:
        Dict: Response JSON containing public agent metadata.
    """
    async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
        response = await client.get(PUBLIC_AGENT_LIST_URL)

    if response.status_code != 200:
        raise RuntimeError(f"[get_public_agents] HTTP {response.status_code}{response.text}")

    res = response.json()

    if res.get("code") != "SUCCESS":
        raise RuntimeError(f"[get_public_agents] API error: {res.get('message')}")

    return res


async def get_user_subscribed_agents(user_id: str) -> Dict:
    """
    Fetches the list of agents the user is subscribed to.

    Args:
        user_id (str): The ID of the user.

    Returns:
        Dict: JSON response containing subscribed agent information.
    """
    params = {
        "userId": user_id
    }

    async with httpx.AsyncClient(timeout=DB_TIMEOUT) as client:
        response = await client.get(USER_SUBSCRIBED_AGENT_URL, params=params)

    if response.status_code != 200:
        raise RuntimeError(f"[get_user_subscribed_agents] HTTP {response.status_code}{response.text}")

    res = response.json()

    if res.get("code") != "SUCCESS":
        raise RuntimeError(f"[get_user_subscribed_agents] API error: {res.get('message')}")

    return res


def convert_backend_agent_to_user_info(agent: dict) -> dict:
    return {
        "agent_url": agent.get("requestUrl", None),
        "is_output_streaming": agent.get("streaming", True),  # You may want to derive this from model
        "is_session_supported": agent.get("sessionSupported", False),
        "agent_name": agent.get("name", None),
        "agent_intent": agent.get("intent") or agent.get("details", None),
        "agent_description": agent.get("details", None),
        "agent_type": agent.get("category", agent.get("name", None)),
        "required_param": [],
        "agent_api_key": agent.get("apiKey", None),
        "required_payload_structure": {},
        "source": agent.get("source", 4),
        "system_prompt": agent.get("systemPrompt", None),
        "model_name": agent.get("llmModelName", None),
    }


[docs] class USER_CONTEXT: _metadata: Dict[str, List[UserAgentInfo]] = {} _intent: Dict[str, IntentRegistry] = {} _classifier: Dict[str, IntentClassifierBase] = {} _registry: Dict[str, AgentRegistryBase] = {}
[docs] @classmethod async def get_or_build(cls, user_id: str): # Always start from default base config default_intent_registry = AgentFactory.create_intent_registry( intent_descriptions=INTENT_DESCRIPTIONS, agent_descriptions=AGENT_DESCRIPTIONS, agent_intent_mapping=AGENT_INTENT_MAPPING, schema_overrides=INTENT_SCHEMA_OVERRIDES ) if user_id == "user" or not user_id.isdigit(): agent_registry = AgentFactory.create_agent_registry(default_intent_registry) classifier = AgentFactory.create_intent_classifier(default_intent_registry) return default_intent_registry, classifier, agent_registry # Fetch per-user data and map to UserAgentInfo await cls.get_agent_metadata(user_id) # Build dynamic intent registry from API data dynamic_registry = IntentRegistry.from_api_data(cls._metadata[user_id]) # Merge: dynamic overrides default merged_registry = default_intent_registry.merge(dynamic_registry) classifier = AgentFactory.create_intent_classifier(merged_registry) registry = AgentFactory.create_agent_registry(merged_registry) cls._intent[user_id] = merged_registry cls._classifier[user_id] = classifier cls._registry[user_id] = registry return merged_registry, classifier, registry
[docs] @classmethod async def get_agent_metadata(cls, user_id: str) -> List[UserAgentInfo]: """ Retrieves and caches agent metadata for the given user by combining: - Developer-owned agents - Subscribed agents If both sources contain agents with the same `agent_name`, the developer-owned version is preferred. Args: user_id (str): The ID of the user. Returns: List[UserAgentInfo]: A list of unique structured agent metadata. """ try: # Fetch both dev and subscribed agents dev_response = await get_developer_agents_for_user(user_id) subscribed_response = await get_user_subscribed_agents(user_id) dev_agents = dev_response.get("data", []) subscribed_agents = subscribed_response.get("data", []) # Index dev agents by name (priority) merged_agents: Dict[str, dict] = { agent["name"]: agent for agent in dev_agents } # Merge in subscribed agents if not already in dev agents for agent in subscribed_agents: name = agent["name"] if name not in merged_agents: merged_agents[name] = agent # Transform to UserAgentInfo agent_info_list = [ UserAgentInfo(**convert_backend_agent_to_user_info(agent)) for agent in merged_agents.values() ] # Cache and return cls._metadata[user_id] = agent_info_list logger.info(f"[get_agent_metadata] Final merged agent info for user '{user_id}': {agent_info_list}") return agent_info_list except Exception as e: logger.error(f"[get_agent_metadata] Error fetching agent metadata for user '{user_id}': {e}") raise