# Source code for agentopera.router.session.session

from datetime import datetime, timezone
from typing import Set

from pydantic import BaseModel, Field, ConfigDict

from agentopera.engine.types.agent.agent_id import AgentId
from agentopera.utils.logger import logger
def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for ``datetime.utcnow()``, which is deprecated since
    Python 3.12. The ``tzinfo`` is stripped on purpose so the values stay
    naive, exactly as before, and remain comparable with any other naive
    UTC timestamps produced elsewhere.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Session(BaseModel):
    """Bookkeeping record for a single session.

    Attributes:
        session_id: Identifier of the session.
        start_time: Naive UTC timestamp taken when the instance is created.
        last_updated: Naive UTC timestamp refreshed by :meth:`touch`.
        agents_streaming: Agents currently marked as streaming.
    """

    # AgentId is a project type, presumably not a pydantic model, hence
    # arbitrary types are allowed -- TODO confirm.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    session_id: str
    start_time: datetime = Field(default_factory=_utcnow_naive)
    last_updated: datetime = Field(default_factory=_utcnow_naive)
    agents_streaming: Set[AgentId] = Field(default_factory=set)

    def touch(self) -> None:
        """Set ``last_updated`` to the current (naive) UTC time."""
        self.last_updated = _utcnow_naive()

    def mark_agent_streaming(self, agent_id: AgentId) -> None:
        """Add ``agent_id`` to the streaming set and refresh ``last_updated``."""
        self.agents_streaming.add(agent_id)
        self.touch()

    def unmark_agent_streaming(self, agent_id: AgentId) -> None:
        """Remove ``agent_id`` from the streaming set and refresh ``last_updated``.

        Uses ``discard``, so unmarking an agent that is not in the set is a
        no-op rather than an error. The remaining set is logged either way.
        """
        self.agents_streaming.discard(agent_id)
        logger.info(f"discarding agent {agent_id} from session, so now {self.agents_streaming}")
        self.touch()

    def end_session(self) -> None:
        """Mark the session as complete.

        Currently this only refreshes ``last_updated``; no other state on
        the model is changed.
        """
        self.touch()