Source code for agentopera.router.workers.trend_agent

from datetime import datetime
import os
from typing import Dict, List

import requests
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily

from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish


from ...utils.logger import logger


[docs] class CryptoTrendAgent(RoutedAgent): """A specialized agent for conducting crypto trend analysis via an external API.""" def __init__(self, name: str) -> None: super().__init__("Crypto Trending Agent for specific crypto attributes") self.name = name """init the agent instance""" # Initialize the model client self.model_name = "Qwen2.5-72B-Instruct" model_info: ModelInfo = {"vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN} model_client = OpenAIChatCompletionClient( base_url=os.getenv("TENSOROPERA_API_GATEWAY"), api_key=os.environ['LLM_GATEWAY_KEY'], model_info=model_info, model=self.model_name, temperature=0.6, top_p=0.95, ) trend_data_tool = FunctionTool(self.get_sorted_crypto_data, description="Get sorted cryptocurrency data from CoinMarketCap") self.agent = AssistantAgent( name="trend_agent", model_client=model_client, system_message="You are a financial assistant specialized in cryptocurrency market analysis. When a user asks for cryptocurrency rankings, comparisons, or trends (e.g., 'top 10 by market cap' or 'highest volume'), \ you MUST use the get_sorted_crypto_data tool to retrieve accurate, real-time data. \ Use the tool with parameters: \ number(str): how many the number of top results to return (e.g. '10') \ sort_by(str): the attribute to sort by, available attributes are: \ market_cap: CoinMarketCap's market cap rank as outlined in our methodology.\ market_cap_strict: A strict market cap sort (latest trade price x circulating supply).\ name: The cryptocurrency name.\ symbol: The cryptocurrency symbol.\ date_added: Date cryptocurrency was added to the system.\ price: latest average trade price across markets.\ circulating_supply: approximate number of coins currently in circulation.\ total_supply: approximate total amount of coins in existence right now (minus any coins that have been verifiably burned).\ max_supply: our best approximation of the maximum amount of coins that will ever exist in the lifetime of the currency.\ num_market_pairs: number of market pairs across all exchanges trading each currency.\ market_cap_by_total_supply_strict: market cap by total supply.\ volume_24h: rolling 24 hour adjusted trading volume.\ volume_7d: rolling 24 hour adjusted trading volume.\ volume_30d: rolling 24 hour adjusted trading volume.\ percent_change_1h: 1 hour trading price percentage change for each currency.\ percent_change_24h: 24 hour trading price percentage change for each currency.\ percent_change_7d: 7 day trading price percentage change for each currency.\ sort_dir(str): the direction of sorting ('desc' for top rankings) \ price_min(float): OPTIONAL, the minimum price to filter by \ price_max(float): OPTIONAL, the maximum price to filter by \ market_cap_min(float): OPTIONAL, the minimum market cap to filter by \ market_cap_max(float): OPTIONAL, the maximum market cap to filter by \ volume_24h_min(float): OPTIONAL, the minimum volume to filter by \ volume_24h_max(float): OPTIONAL, the maximum volume to filter by \ percent_change_24h_min(float): OPTIONAL, the minimum percent change to filter by \ percent_change_24h_max(float): OPTIONAL, the maximum percent change to filter by \ You must not mention the tool name or the data source (e.g., CoinMarketCap) to the user. \ If there is no satisfied result, you should return an empty list. \ **Analyze the top 3 coins and give the PROFESSIONAL recommendation for the user if the user only asks for the trending coins**\ Simply present the answer in natural language, as if you gathered the data yourself.", model_client_stream=True, # Enable streaming tokens from the model client. tools=[trend_data_tool], reflect_on_tool_use=True ) # for user role, each user has a unique chat agent instance logger.info(f"TrendAgent initialized. agent id = {self.id}")
[docs] async def get_sorted_crypto_data(self,number: str = '10', sort_by: str = "market_cap", sort_dir: str = "desc", price_min: float = None, price_max: float = None, market_cap_min: float = None, market_cap_max: float = None, volume_24h_min: float = None, volume_24h_max: float = None, percent_change_24h_min: float = None, percent_change_24h_max: float = None) -> List[Dict[str, float]]: """ Fetches and sorts cryptocurrency data from CoinMarketCap. Args: number (str): The number of cryptocurrencies to retrieve. Must be an integer, default is 10. sort_by (str): The variable to sort by. Must be one of the supported CoinMarketCap sort parameters. The valid input should be one of the following: "name" "symbol" "date_added" "market_cap" "market_cap_strict" "price" "circulating_supply" "total_supply" "max_supply" "num_market_pairs" "volume_24h" "percent_change_1h" "percent_change_24h" "percent_change_7d" "market_cap_by_total_supply_strict" "volume_7d" "volume_30d" sort_dir (str): The direction to sort. Must be either 'asc' or 'desc'. price_min (float): The minimum price to filter by. price_max (float): The maximum price to filter by. market_cap_min (float): The minimum market cap to filter by. market_cap_max (float): The maximum market cap to filter by. volume_24h_min (float): The minimum volume to filter by. volume_24h_max (float): The maximum volume to filter by. percent_change_24h_min (float): The minimum percent change to filter by. percent_change_24h_max (float): The maximum percent change to filter by. Returns: List[Dict[str, Any]]: A list of dicts with 'name' and the selected sorted variable. """ url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest' parameters = { 'start': 1, 'limit': int(number), 'convert': 'USD', 'sort': sort_by, 'sort_dir': sort_dir, 'price_min': price_min, 'price_max': price_max, 'market_cap_min': market_cap_min, 'market_cap_max': market_cap_max, 'volume_24h_min': volume_24h_min, 'volume_24h_max': volume_24h_max, 'percent_change_24h_min': percent_change_24h_min, 'percent_change_24h_max': percent_change_24h_max } logger.info(f"get_sorted_crypto_data: parameters = {parameters}") parameters = {k: v for k, v in parameters.items() if v is not None} headers = { 'Accepts': 'application/json', 'X-CMC_PRO_API_KEY': os.environ['MARKET_CAP_API_KEY'], } try: response = requests.get(url, headers=headers, params=parameters) response.raise_for_status() data = response.json().get("data", []) result = [] for crypto in data: name = crypto.get("name", "Unknown") total_supply = crypto.get("total_supply", 0) date_added_str = crypto.get("date_added", "") quote_usd = crypto.get("quote", {}).get("USD", {}) market_cap = quote_usd.get("market_cap", 0) volume_24h = quote_usd.get("volume_24h", 0) price = quote_usd.get("price", 0) # Calculate age in years try: date_added = datetime.strptime(date_added_str, "%Y-%m-%dT%H:%M:%S.%fZ") now = datetime.now() delta = now - date_added total_days = delta.days if total_days < 1: age_years = "1d" else: years = total_days // 365 remaining_days = total_days % 365 months = remaining_days // 30 days = remaining_days % 30 if years >= 1: age_years = f"{years}y {months}mon" elif months >= 1: age_years = f"{months}mon {days}d" else: age_years = f"{days}d" except ValueError: age_years = "Unknown" # Try to find the sort_by value (check both quote and root level) if sort_by in quote_usd: sort_value = quote_usd[sort_by] else: sort_value = crypto.get(sort_by) # Base fields to always return crypto_data = { "name": name, "age": age_years, "market_cap": market_cap, "volume_24h": volume_24h, "total_supply": total_supply, "price": price } # Add sort_by field if it's not one of the base fields if sort_by not in ["market_cap", "volume_24h", "total_supply", "age", "price"]: crypto_data[sort_by] = sort_value result.append(crypto_data) return result except requests.exceptions.RequestException as e: print(f"Request error: {e}") return []
[docs] @message_handler async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """Handles messages, performing deep research and streaming responses.""" assert ctx.message_channel is not None logger.info(f"TrendAgent received messages. model name = {self.model_name}") messages = [message] # Ensure we at least process the latest message session_id = message.metadata.get("session_id") if not session_id: raise ValueError("Message metadata is missing `session_id` field") await stream_chunks_and_publish(self.agent, ctx, messages, self.publish_message, self.name, session_id, self.cancel_token)