import os
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage, StopMessage, ModelClientStreamingChunkEvent
from agentopera.chatflow.base import TaskResult
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models.model_client import ModelFamily
from agentopera.utils.server_utils import load_tools_from_mcp_server
from ..message_utils import parse_context_to_text_messages
from ...utils.logger import logger
# Session lifetime of 10 minutes (10 * 60 == 600, i.e. the duration in seconds).
SESSION_LIFE_TIME = 10 * 60
[docs]
class ChatAgent(RoutedAgent):
    """A specialized agent for conducting chat via an external API.

    Wraps an ``AssistantAgent`` backed by an ``OpenAIChatCompletionClient`` and
    republishes everything the assistant streams on the ``"response"`` topic.
    """

    def __init__(self, name: str) -> None:
        """Init the agent instance.

        Builds the model client from the ``TENSOROPERA_API_GATEWAY`` and
        ``LLM_GATEWAY_KEY`` environment variables and creates the wrapped
        streaming ``AssistantAgent``.

        Args:
            name: Stored as ``self.name``. The message handler publishes it
                once per request (as a ``TextMessage`` with source
                ``"agent_id"``) right after the first ``TextMessage`` it relays.
        """
        super().__init__("Simple Chat Agent")
        self.name = name
        self.model_name = "Qwen2.5-72B-Instruct"
        model_info: ModelInfo = {
            "vision": False,
            "function_calling": True,
            "json_output": False,
            "family": ModelFamily.UNKNOWN,
        }
        # Create a model client instance. os.getenv returns None for an unset
        # variable, and that value is handed to the client unchanged.
        model_client = OpenAIChatCompletionClient(
            base_url=os.getenv("TENSOROPERA_API_GATEWAY"),
            api_key=os.getenv("LLM_GATEWAY_KEY"),
            model=self.model_name,
            model_info=model_info,
            temperature=0.6,
            top_p=0.95,
        )
        self.agent = AssistantAgent(
            name="chat_agent",
            model_client=model_client,
            system_message="You are a helpful assistant. Your name is Opera AI. You are trained based on ChainOpera X foundation models. You should NOT say you are from DeepSeek or Alibaba Qwen. ",
            model_client_stream=True,
            use_context=False,
        )
        logger.info(f"ChatAgent initialized. agent = {name} id = {self.id}")

    @message_handler
    async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Run the wrapped assistant on an incoming message and relay its stream.

        Steps:
            1. Require a ``session_id`` entry in ``message.metadata``.
            2. If ``message.metadata["mcp_server_params"]`` is non-empty, load
               tools from it and add them to the wrapped agent. A failure here
               is logged and the request continues.
            3. Use ``parse_context_to_text_messages(message.context)`` as the
               task when ``message.context`` is truthy, otherwise ``[message]``.
            4. Republish each streamed item on topic ``"response"`` with
               ``message_id=ctx.message_id`` and ``{"session_id": ...}``
               metadata:

               * ``TextMessage``: forwarded with the chunk's content and
                 source; after the first one, ``self.name`` is also published
                 with source ``"agent_id"``.
               * ``ModelClientStreamingChunkEvent``: forwarded with its content.
               * ``TaskResult``: a ``StopMessage`` carrying the content of the
                 last message in the result.
               * anything else: logged only.

        Exceptions raised while streaming are caught and logged, not re-raised.

        Args:
            message: Incoming message; ``metadata`` and ``context`` are read.
            ctx: Context of the incoming message; supplies ``message_id`` and
                ``message_channel``.

        Raises:
            ValueError: If ``message.metadata`` has no truthy ``session_id``.
            AssertionError: If ``ctx.message_channel`` is ``None`` (only while
                assertions are enabled).
        """
        assert ctx.message_channel is not None

        # Validate first: ``self.agent`` outlives this request, so a message
        # that is about to be rejected must not register tools on it.
        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        try:
            mcp_server_params = message.metadata.get("mcp_server_params") or []
            if mcp_server_params:
                tools = await load_tools_from_mcp_server(mcp_server_params)
                self.agent.add_tools(tools)
                logger.info(f"Successfully loaded and added {len(tools)} tools from MCP server.")
        except Exception as e:
            # Best effort: the chat still proceeds when tools cannot be loaded.
            logger.error(f"Error while loading tools from MCP server or adding tools: {e}")

        if message.context:
            messages = parse_context_to_text_messages(message.context)
        else:
            messages = [message]  # Ensure we at least process the latest message

        is_first_chunk = True
        try:
            async for chunk in self.agent.run_stream(task=messages, cancellation_token=self.cancel_token):
                if isinstance(chunk, TextMessage):
                    await self.publish_message(
                        TextMessage(
                            content=chunk.content,
                            source=chunk.source,
                            metadata={"session_id": session_id},
                        ),
                        message_channel=DefaultMessageChannel(topic="response", source=chunk.source),
                        message_id=ctx.message_id,
                    )
                    # Send the name of the agent, once, after the first relayed TextMessage.
                    if is_first_chunk:
                        await self.publish_message(
                            TextMessage(
                                content=self.name,
                                source="agent_id",
                                metadata={"session_id": session_id},
                            ),
                            message_channel=DefaultMessageChannel(
                                topic="response", source=ctx.message_channel.topic
                            ),
                            message_id=ctx.message_id,
                        )
                        is_first_chunk = False
                elif isinstance(chunk, ModelClientStreamingChunkEvent):
                    await self.publish_message(
                        ModelClientStreamingChunkEvent(
                            content=chunk.content,
                            source=ctx.message_channel.topic,
                            metadata={"session_id": session_id},
                        ),
                        message_channel=DefaultMessageChannel(
                            topic="response", source=ctx.message_channel.topic
                        ),
                        message_id=ctx.message_id,
                    )
                elif isinstance(chunk, TaskResult):
                    # NOTE(review): this branch addresses the channel with
                    # ``ctx.message_channel.source`` while the branches above
                    # use ``ctx.message_channel.topic`` -- confirm the
                    # difference is intentional.
                    await self.publish_message(
                        StopMessage(
                            content=chunk.messages[-1].content,
                            source=ctx.message_channel.topic,
                            metadata={"session_id": session_id},
                        ),
                        message_channel=DefaultMessageChannel(
                            topic="response", source=ctx.message_channel.source
                        ),
                        message_id=ctx.message_id,
                    )
                else:
                    logger.info(f"Unexpected chunk type {type(chunk)}, chunk content {chunk}")
        except Exception as e:
            # Logged at error level, matching the tool-loading handler above.
            # NOTE(review): nothing else is published for this request after a
            # failure -- confirm subscribers tolerate a missing StopMessage.
            logger.error(f"Error in generate: {e}")