import json
import os
import time
from typing import Any, Dict, List
import requests
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily
from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish
from ...utils.tools.meme_trending_tool import fetch_trending_coins_with_descriptions
from ...utils.logger import logger
[docs]
class MemeTrendAgent(RoutedAgent):
"""A specialized agent for conducting meme trend analysis via an external API."""
def __init__(self, name: str) -> None:
super().__init__("Meme Trend Agent for meme trend analysis")
self.name = name
"""init the agent instance"""
# Initialize the model client
self.model_name = "Qwen2.5-72B-Instruct"
model_info: ModelInfo = {"vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN}
model_client = OpenAIChatCompletionClient(
base_url=os.getenv("TENSOROPERA_API_GATEWAY"),
api_key=os.environ['LLM_GATEWAY_KEY'],
model_info=model_info,
model=self.model_name,
temperature=0.6,
top_p=0.95,
)
trend_data_tool = FunctionTool(fetch_trending_coins_with_descriptions, description="Get sorted meme data from CoinGecko")
self.agent = AssistantAgent(
name="meme_trend_agent",
model_client=model_client,
system_message="You are a financial assistant specialized in cryptocurrency market analysis.\
You can only recommend coins at most 15, so if the user needs you recommend more than 15 coins, you should ONLY recommend 15 coins, that is number_of_coins = 15.\
and remember to tell the use that 'I only care about the top 15 coins for quality and relevance'.\
Always use the `fetch_trending_coins_with_descriptions` function with the parameter:\
number_of_coins (str): the number of cryptocurrencies to recommend based on the rules below, should be between 1 and 15.\
- When the user asks for coin recommendations **without specifying a number**, number_of_coins = 10 by default.\
- When the user asks recommendations for **BIGGER THAN 15 coins**, number_of_coins = 15, \
and then **YOU ONLY RECOMMEND THE TOP 15 INSTEAD OF THE NUMBER SPECIFIED BY THE USER**,\
and remember to tell the use that 'I only care about the top 15 coins for quality and relevance'.\
**ONLY recommend coins that in the tool result**\
**Analyze the top 3 coins and give the PROFESSIONAL recommendation for the user if the user only asks for the trending coins**\
**ONLY include the result Name and Description, do not include Price, Market Cap, Total Volume, etc in your response.**\
You need to start from 'Here are {number_of_coins} coins with their descriptions:'\
",
model_client_stream=True, # Enable streaming tokens from the model client.
tools=[trend_data_tool],
reflect_on_tool_use=True
)
# for user role, each user has a unique chat agent instance
logger.info(f"MemeTrendAgent initialized. agent id = {self.id}")
[docs]
@message_handler
async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handles messages, performing deep research and streaming responses."""
assert ctx.message_channel is not None
logger.info(f"MemeTrendAgent received messages. model name = {self.model_name}")
messages = [message] # Ensure we at least process the latest message
session_id = message.metadata.get("session_id")
if not session_id:
raise ValueError("Message metadata is missing `session_id` field")
await stream_chunks_and_publish(self.agent, ctx, messages, self.publish_message, self.name, session_id, self.cancel_token)