from datetime import datetime
import os
from typing import Dict, List
import requests
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily
from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish
from ...utils.logger import logger
[docs]
class CryptoTrendAgent(RoutedAgent):
"""A specialized agent for conducting crypto trend analysis via an external API."""
def __init__(self, name: str) -> None:
super().__init__("Crypto Trending Agent for specific crypto attributes")
self.name = name
"""init the agent instance"""
# Initialize the model client
self.model_name = "Qwen2.5-72B-Instruct"
model_info: ModelInfo = {"vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN}
model_client = OpenAIChatCompletionClient(
base_url=os.getenv("TENSOROPERA_API_GATEWAY"),
api_key=os.environ['LLM_GATEWAY_KEY'],
model_info=model_info,
model=self.model_name,
temperature=0.6,
top_p=0.95,
)
trend_data_tool = FunctionTool(self.get_sorted_crypto_data, description="Get sorted cryptocurrency data from CoinMarketCap")
self.agent = AssistantAgent(
name="trend_agent",
model_client=model_client,
system_message="You are a financial assistant specialized in cryptocurrency market analysis. When a user asks for cryptocurrency rankings, comparisons, or trends (e.g., 'top 10 by market cap' or 'highest volume'), \
you MUST use the get_sorted_crypto_data tool to retrieve accurate, real-time data. \
Use the tool with parameters: \
number(str): how many the number of top results to return (e.g. '10') \
sort_by(str): the attribute to sort by, available attributes are: \
market_cap: CoinMarketCap's market cap rank as outlined in our methodology.\
market_cap_strict: A strict market cap sort (latest trade price x circulating supply).\
name: The cryptocurrency name.\
symbol: The cryptocurrency symbol.\
date_added: Date cryptocurrency was added to the system.\
price: latest average trade price across markets.\
circulating_supply: approximate number of coins currently in circulation.\
total_supply: approximate total amount of coins in existence right now (minus any coins that have been verifiably burned).\
max_supply: our best approximation of the maximum amount of coins that will ever exist in the lifetime of the currency.\
num_market_pairs: number of market pairs across all exchanges trading each currency.\
market_cap_by_total_supply_strict: market cap by total supply.\
volume_24h: rolling 24 hour adjusted trading volume.\
volume_7d: rolling 24 hour adjusted trading volume.\
volume_30d: rolling 24 hour adjusted trading volume.\
percent_change_1h: 1 hour trading price percentage change for each currency.\
percent_change_24h: 24 hour trading price percentage change for each currency.\
percent_change_7d: 7 day trading price percentage change for each currency.\
sort_dir(str): the direction of sorting ('desc' for top rankings) \
price_min(float): OPTIONAL, the minimum price to filter by \
price_max(float): OPTIONAL, the maximum price to filter by \
market_cap_min(float): OPTIONAL, the minimum market cap to filter by \
market_cap_max(float): OPTIONAL, the maximum market cap to filter by \
volume_24h_min(float): OPTIONAL, the minimum volume to filter by \
volume_24h_max(float): OPTIONAL, the maximum volume to filter by \
percent_change_24h_min(float): OPTIONAL, the minimum percent change to filter by \
percent_change_24h_max(float): OPTIONAL, the maximum percent change to filter by \
You must not mention the tool name or the data source (e.g., CoinMarketCap) to the user. \
If there is no satisfied result, you should return an empty list. \
**Analyze the top 3 coins and give the PROFESSIONAL recommendation for the user if the user only asks for the trending coins**\
Simply present the answer in natural language, as if you gathered the data yourself.",
model_client_stream=True, # Enable streaming tokens from the model client.
tools=[trend_data_tool],
reflect_on_tool_use=True
)
# for user role, each user has a unique chat agent instance
logger.info(f"TrendAgent initialized. agent id = {self.id}")
[docs]
async def get_sorted_crypto_data(self,number: str = '10', sort_by: str = "market_cap", sort_dir: str = "desc",
price_min: float = None, price_max: float = None, market_cap_min: float = None, market_cap_max: float = None,
volume_24h_min: float = None, volume_24h_max: float = None,
percent_change_24h_min: float = None, percent_change_24h_max: float = None) -> List[Dict[str, float]]:
"""
Fetches and sorts cryptocurrency data from CoinMarketCap.
Args:
number (str): The number of cryptocurrencies to retrieve. Must be an integer, default is 10.
sort_by (str): The variable to sort by. Must be one of the supported CoinMarketCap sort parameters.
The valid input should be one of the following: "name" "symbol" "date_added" "market_cap" "market_cap_strict"
"price" "circulating_supply" "total_supply" "max_supply" "num_market_pairs" "volume_24h" "percent_change_1h" "percent_change_24h" "percent_change_7d"
"market_cap_by_total_supply_strict" "volume_7d" "volume_30d"
sort_dir (str): The direction to sort. Must be either 'asc' or 'desc'.
price_min (float): The minimum price to filter by.
price_max (float): The maximum price to filter by.
market_cap_min (float): The minimum market cap to filter by.
market_cap_max (float): The maximum market cap to filter by.
volume_24h_min (float): The minimum volume to filter by.
volume_24h_max (float): The maximum volume to filter by.
percent_change_24h_min (float): The minimum percent change to filter by.
percent_change_24h_max (float): The maximum percent change to filter by.
Returns:
List[Dict[str, Any]]: A list of dicts with 'name' and the selected sorted variable.
"""
url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'
parameters = {
'start': 1,
'limit': int(number),
'convert': 'USD',
'sort': sort_by,
'sort_dir': sort_dir,
'price_min': price_min,
'price_max': price_max,
'market_cap_min': market_cap_min,
'market_cap_max': market_cap_max,
'volume_24h_min': volume_24h_min,
'volume_24h_max': volume_24h_max,
'percent_change_24h_min': percent_change_24h_min,
'percent_change_24h_max': percent_change_24h_max
}
logger.info(f"get_sorted_crypto_data: parameters = {parameters}")
parameters = {k: v for k, v in parameters.items() if v is not None}
headers = {
'Accepts': 'application/json',
'X-CMC_PRO_API_KEY': os.environ['MARKET_CAP_API_KEY'],
}
try:
response = requests.get(url, headers=headers, params=parameters)
response.raise_for_status()
data = response.json().get("data", [])
result = []
for crypto in data:
name = crypto.get("name", "Unknown")
total_supply = crypto.get("total_supply", 0)
date_added_str = crypto.get("date_added", "")
quote_usd = crypto.get("quote", {}).get("USD", {})
market_cap = quote_usd.get("market_cap", 0)
volume_24h = quote_usd.get("volume_24h", 0)
price = quote_usd.get("price", 0)
# Calculate age in years
try:
date_added = datetime.strptime(date_added_str, "%Y-%m-%dT%H:%M:%S.%fZ")
now = datetime.now()
delta = now - date_added
total_days = delta.days
if total_days < 1:
age_years = "1d"
else:
years = total_days // 365
remaining_days = total_days % 365
months = remaining_days // 30
days = remaining_days % 30
if years >= 1:
age_years = f"{years}y {months}mon"
elif months >= 1:
age_years = f"{months}mon {days}d"
else:
age_years = f"{days}d"
except ValueError:
age_years = "Unknown"
# Try to find the sort_by value (check both quote and root level)
if sort_by in quote_usd:
sort_value = quote_usd[sort_by]
else:
sort_value = crypto.get(sort_by)
# Base fields to always return
crypto_data = {
"name": name,
"age": age_years,
"market_cap": market_cap,
"volume_24h": volume_24h,
"total_supply": total_supply,
"price": price
}
# Add sort_by field if it's not one of the base fields
if sort_by not in ["market_cap", "volume_24h", "total_supply", "age", "price"]:
crypto_data[sort_by] = sort_value
result.append(crypto_data)
return result
except requests.exceptions.RequestException as e:
print(f"Request error: {e}")
return []
[docs]
@message_handler
async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handles messages, performing deep research and streaming responses."""
assert ctx.message_channel is not None
logger.info(f"TrendAgent received messages. model name = {self.model_name}")
messages = [message] # Ensure we at least process the latest message
session_id = message.metadata.get("session_id")
if not session_id:
raise ValueError("Message metadata is missing `session_id` field")
await stream_chunks_and_publish(self.agent, ctx, messages, self.publish_message, self.name, session_id, self.cancel_token)