Source code for agentopera.router.workers.chat_agent

import os
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage, StopMessage, ModelClientStreamingChunkEvent
from agentopera.chatflow.base import TaskResult
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models.model_client import ModelFamily
from agentopera.utils.server_utils import load_tools_from_mcp_server
from ..message_utils import parse_context_to_text_messages

from ...utils.logger import logger

SESSION_LIFE_TIME = 600 # 10 MINS 
[docs] class ChatAgent(RoutedAgent): """A specialized agent for conducting chat via an external API.""" def __init__(self, name: str) -> None: super().__init__("Simple Chat Agent") """init the agent instance""" self.name = name self.model_name = "Qwen2.5-72B-Instruct" model_info: ModelInfo = { "vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN, } # Create a model client instance model_client = OpenAIChatCompletionClient( base_url=os.getenv("TENSOROPERA_API_GATEWAY"), api_key=os.getenv("LLM_GATEWAY_KEY"), model=self.model_name, model_info=model_info, temperature=0.6, top_p=0.95, ) self.agent = AssistantAgent( name="chat_agent", model_client=model_client, system_message="You are a helpful assistant. Your name is Opera AI. You are trained based on ChainOpera X foundation models. You should NOT say you are from DeepSeek or Alibaba Qwen. ", model_client_stream=True, use_context=False, ) logger.info(f"ChatAgent initialized. agent = {name} id = {self.id}")
[docs] @message_handler async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """Handles messages, performing deep research and streaming responses.""" assert ctx.message_channel is not None try: mcp_server_params = message.metadata.get("mcp_server_params") or [] if len(mcp_server_params) > 0: tools = await load_tools_from_mcp_server(mcp_server_params) self.agent.add_tools(tools) logger.info(f"Successfully loaded and added {len(tools)} tools from MCP server.") except Exception as e: logger.error(f"Error while loading tools from MCP server or adding tools: {e}") session_id = message.metadata.get("session_id") if not session_id: raise ValueError("Message metadata is missing `session_id` field") if message.context: messages = parse_context_to_text_messages(message.context) else: messages = [message] # Ensure we at least process the latest message is_first_chunk = True try: async for chunk in self.agent.run_stream(task=messages, cancellation_token=self.cancel_token): if isinstance(chunk, TextMessage): await self.publish_message( TextMessage(content=chunk.content, source=chunk.source, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=chunk.source), message_id=ctx.message_id, ) # Send the name of the agent if is_first_chunk: await self.publish_message( TextMessage(content=self.name, source="agent_id", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.topic), message_id=ctx.message_id, ) is_first_chunk = False elif isinstance(chunk, ModelClientStreamingChunkEvent): await self.publish_message( ModelClientStreamingChunkEvent(content=chunk.content, source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.topic), message_id=ctx.message_id, ) elif isinstance(chunk, TaskResult): await self.publish_message( StopMessage(content=chunk.messages[-1].content, source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, ) else: logger.info(f"Unexpected chunk type {type(chunk)}, chunk content {chunk}") except Exception as e: logger.info(f"Error in generate: {e}")