import json
from typing import Dict
from collections import defaultdict
from agentopera.engine.runtime import DistAgentEngine
from agentopera.router.agent_registry import create_agent_from_info
from agentopera.engine import DefaultSubscription
from agentopera.utils.logger import logger
from .context import USER_CONTEXT
[docs]
class UserManager:
    """Register per-user dynamic agents on a runtime and remember what was registered.

    State is kept on the class itself, so it is shared by every caller in the
    process.
    """

    # user_id -> names of the dynamic agents this class has registered for that
    # user. Being a defaultdict, indexing it with an unseen user_id inserts an
    # empty set for that user.
    _registered_user_agents: Dict[str, set] = defaultdict(set)

    @classmethod
    async def ensure_agents_registered_for_user(cls, user_id: str, runtime: DistAgentEngine):
        """Make sure every dynamic agent of ``user_id`` has a builder on ``runtime``.

        The user's agent metadata is fetched from ``USER_CONTEXT``. For each
        entry:

        * names present in ``runtime._known_agent_names`` are treated as
          built-in and skipped;
        * names already recorded for this user are refreshed: existing agents
          of that type are removed from the runtime and the builder is
          registered again;
        * names not yet recorded get a builder, a ``DefaultSubscription`` whose
          topic and agent type are both the agent name, and are then recorded
          for the user.

        Args:
            user_id: Key used both to fetch the agent metadata and to index the
                per-user registry.
            runtime: Engine on which builders and subscriptions are registered.
        """
        metadata_entries = await USER_CONTEXT.get_agent_metadata(user_id)

        # NOTE(review): reads a private runtime attribute; the original author
        # flagged that this may need adapting if the runtime exposes it differently.
        startup_agent_names = runtime._known_agent_names

        for info in metadata_entries:
            name = info.agent_name

            # Built-in agents are skipped here and left as they are.
            if name in startup_agent_names:
                logger.info(f"Skipping built-in agent: name={name}")
                continue

            # Looked up only after the built-in check, so the registry gains an
            # entry for this user only once a dynamic agent is actually seen.
            names_for_user = cls._registered_user_agents[user_id]
            is_new = name not in names_for_user

            if is_new:
                logger.info(f"Registering new dynamic agent: type={info.agent_type}, name={name}")
            else:
                logger.info(f"Updating dynamic agent: type={info.agent_type}, name={name}")
                # Drop the existing agents of this type before the builder is
                # registered again below.
                await runtime.remove_agents_with_type(agent_type=name)

            # The default argument pins this iteration's metadata object to the
            # builder; a plain closure would see whatever the loop variable
            # holds when the builder is eventually called.
            await runtime.register_agent_builder(
                name,
                lambda agent=info: create_agent_from_info(agent),
            )
            logger.info(f"Registered dynamic agent: name={name}, id={id(info)}")

            # Only first-time registrations subscribe and get recorded; updates
            # reach this point with the name already in the set.
            if is_new:
                await runtime.subscribe(DefaultSubscription(topic=name, agent_type=name))
                names_for_user.add(name)