Source code for agentopera.router.user.user_manager

import json
from typing import Dict
from collections import defaultdict
from agentopera.engine.runtime import DistAgentEngine
from agentopera.router.agent_registry import create_agent_from_info
from agentopera.engine import DefaultSubscription
from agentopera.utils.logger import logger
from .context import USER_CONTEXT

[docs] class UserManager: _registered_user_agents: Dict[str, set] = defaultdict(set)
[docs] @classmethod async def ensure_agents_registered_for_user(cls, user_id: str, runtime: DistAgentEngine): agent_infos = await USER_CONTEXT.get_agent_metadata(user_id) # Pull built-in agent names directly from runtime's known agents builtin_agent_names = runtime._known_agent_names # Or adapt this if method is different for agent in agent_infos: agent_name = agent.agent_name # Skip if this is a built-in agent already registered at startup if agent_name in builtin_agent_names: logger.info(f"Skipping built-in agent: name={agent_name}") continue already_registered = agent_name in cls._registered_user_agents[user_id] if already_registered: logger.info(f"Updating dynamic agent: type={agent.agent_type}, name={agent_name}") await runtime.remove_agents_with_type(agent_type=agent_name) else: logger.info(f"Registering new dynamic agent: type={agent.agent_type}, name={agent_name}") # Register agent factory (overwrite if needed) await runtime.register_agent_builder(agent_name, lambda agent=agent: create_agent_from_info(agent)) logger.info(f"Registered dynamic agent: name={agent_name}, id={id(agent)}") if not already_registered: await runtime.subscribe(DefaultSubscription(topic=agent_name, agent_type=agent_name)) cls._registered_user_agents[user_id].add(agent_name)