Source code for agentopera.router.workers.meme_trend_agent

import json
import os
import time
from typing import Any, Dict, List

import requests
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily

from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish
from ...utils.tools.meme_trending_tool import fetch_trending_coins_with_descriptions


from ...utils.logger import logger


[docs] class MemeTrendAgent(RoutedAgent): """A specialized agent for conducting meme trend analysis via an external API.""" def __init__(self, name: str) -> None: super().__init__("Meme Trend Agent for meme trend analysis") self.name = name """init the agent instance""" # Initialize the model client self.model_name = "Qwen2.5-72B-Instruct" model_info: ModelInfo = {"vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN} model_client = OpenAIChatCompletionClient( base_url=os.getenv("TENSOROPERA_API_GATEWAY"), api_key=os.environ['LLM_GATEWAY_KEY'], model_info=model_info, model=self.model_name, temperature=0.6, top_p=0.95, ) trend_data_tool = FunctionTool(fetch_trending_coins_with_descriptions, description="Get sorted meme data from CoinGecko") self.agent = AssistantAgent( name="meme_trend_agent", model_client=model_client, system_message="You are a financial assistant specialized in cryptocurrency market analysis.\ You can only recommend coins at most 15, so if the user needs you recommend more than 15 coins, you should ONLY recommend 15 coins, that is number_of_coins = 15.\ and remember to tell the use that 'I only care about the top 15 coins for quality and relevance'.\ Always use the `fetch_trending_coins_with_descriptions` function with the parameter:\ number_of_coins (str): the number of cryptocurrencies to recommend based on the rules below, should be between 1 and 15.\ - When the user asks for coin recommendations **without specifying a number**, number_of_coins = 10 by default.\ - When the user asks recommendations for **BIGGER THAN 15 coins**, number_of_coins = 15, \ and then **YOU ONLY RECOMMEND THE TOP 15 INSTEAD OF THE NUMBER SPECIFIED BY THE USER**,\ and remember to tell the use that 'I only care about the top 15 coins for quality and relevance'.\ **ONLY recommend coins that in the tool result**\ **Analyze the top 3 coins and give the PROFESSIONAL recommendation for the user if the user only asks for the trending coins**\ **ONLY include the result Name and Description, do not include Price, Market Cap, Total Volume, etc in your response.**\ You need to start from 'Here are {number_of_coins} coins with their descriptions:'\ ", model_client_stream=True, # Enable streaming tokens from the model client. tools=[trend_data_tool], reflect_on_tool_use=True ) # for user role, each user has a unique chat agent instance logger.info(f"MemeTrendAgent initialized. agent id = {self.id}")
[docs] @message_handler async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """Handles messages, performing deep research and streaming responses.""" assert ctx.message_channel is not None logger.info(f"MemeTrendAgent received messages. model name = {self.model_name}") messages = [message] # Ensure we at least process the latest message session_id = message.metadata.get("session_id") if not session_id: raise ValueError("Message metadata is missing `session_id` field") await stream_chunks_and_publish(self.agent, ctx, messages, self.publish_message, self.name, session_id, self.cancel_token)