import os
from typing import Any, Dict, List
import pandas as pd
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily
import yfinance as yf
from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish
from ...utils.logger import logger
# "[docs]" link marker left over from the rendered documentation page; not part of the module.
class CryptoMarketAgent(RoutedAgent):
    """Routed agent that answers cryptocurrency market-data questions.

    The agent wraps an ``AssistantAgent`` whose only tool is
    :meth:`get_market_data`, which reads the market cap and several ranges
    of historical closing prices through ``yfinance``.
    """

    def __init__(self, name: str) -> None:
        """Build the model client, the market-data tool and the inner assistant.

        Args:
            name: Stored on ``self.name``; the message handler forwards it
                to ``stream_chunks_and_publish``.

        Raises:
            KeyError: If the ``LLM_GATEWAY_KEY`` environment variable is unset.
        """
        super().__init__("Crypto Market Agent for historical market data")
        self.name = name

        # --- model client -------------------------------------------------
        self.model_name = "Qwen2.5-72B-Instruct"
        capabilities: ModelInfo = {
            "vision": False,
            "function_calling": True,
            "json_output": False,
            "family": ModelFamily.UNKNOWN,
        }
        # The base URL is optional (None when the variable is unset); the
        # API key is mandatory and raises KeyError when missing.
        gateway_url = os.getenv("TENSOROPERA_API_GATEWAY")
        gateway_key = os.environ['LLM_GATEWAY_KEY']
        llm_client = OpenAIChatCompletionClient(
            base_url=gateway_url,
            api_key=gateway_key,
            model_info=capabilities,
            model=self.model_name,
            temperature=0.6,
            top_p=0.95,
        )

        # --- tool ---------------------------------------------------------
        market_tool = FunctionTool(
            self.get_market_data,
            description="Get market cap and historical market data for a given symbol",
        )

        # --- system prompt ------------------------------------------------
        # The fragments are joined with no separator and no newline, which
        # yields the same value as the original backslash-continued literal.
        # NOTE(review): the prompt calls the tool "trend_data_tool"; confirm
        # that this matches the name FunctionTool actually registers.
        system_prompt = (
            "Given a cryptocurrency name, retrieve its corresponding stock symbol from Yahoo Finance (DEFAULT to USD if not specified)."
            "such as: bitcoin is BTC-USD, ethereum is ETH-USD etc."
            "You MUST use trend_data_tool to get the accurate market capacity and historical market data for the given symbol. "
            "But do not tell the user the tool name and the data source. "
            "just answer here is the related information you can get from the tool."
            "You can use latex formtting:"
            "- Use $ for inline equations"
            "- Use $$ for block equations"
            "- Use 'USD' for currency (not $)"
            "- No need to use bold or italic formatting in tables."
            "- Don't use the h1 heading in the markdown response."
        )

        # --- inner assistant ----------------------------------------------
        self.agent = AssistantAgent(
            name="market_agent",
            model_client=llm_client,
            system_message=system_prompt,
            model_client_stream=True,  # stream tokens from the model client
            tools=[market_tool],
            reflect_on_tool_use=True,
        )

        # Original note: for the user role, each user has a unique chat
        # agent instance.
        logger.info(f"MarketAgent initialized. agent id = {self.id}")

    async def get_market_data(self, stock_ticket_symbol: str) -> List[Dict[str, Any]]:
        """Fetch the market cap and historical closing prices for a symbol.

        Args:
            stock_ticket_symbol (str): Ticker symbol handed to ``yfinance``
                (the system prompt gives ``BTC-USD`` and ``ETH-USD`` as
                examples).

        Returns:
            List[Dict[str, Any]]: A two-element list:

            - ``{"market capacity": <"marketCap" from ticker.info, or None>}``
            - a dict keyed ``hist_1d``, ``hist_5d``, ``hist_1mo``,
              ``hist_6mo``, ``hist_1y``, ``hist_ytd`` and ``hist_max``, each
              mapping to a list of ``{"timestamp": str, "price": ...}``
              records built from the ``Close`` column.
        """
        # NOTE(review): none of the yfinance calls below are awaited, so
        # they run synchronously inside this coroutine.
        cap_entry = {"market capacity": yf.Ticker(stock_ticket_symbol).info.get("marketCap")}

        def _to_records(series: pd.Series) -> List[Dict[str, Any]]:
            """Turn an indexed price series into timestamp/price records."""
            frame = series.reset_index()
            frame.columns = ['timestamp', 'price']
            # Timestamps are stringified before being exported as records.
            frame['timestamp'] = frame['timestamp'].astype(str)
            return frame.to_dict(orient='records')

        # (result key, yfinance period, yfinance interval); the order here
        # is both the download order and the key order of the result.
        range_specs = (
            ("hist_1d", "1d", "1m"),
            ("hist_5d", "5d", "30m"),
            ("hist_1mo", "1mo", "1d"),
            ("hist_6mo", "6mo", "1d"),
            ("hist_1y", "1y", "1d"),
            ("hist_ytd", "ytd", "1wk"),
            ("hist_max", "max", "1mo"),
        )
        history = {
            key: _to_records(
                yf.download(tickers=stock_ticket_symbol, period=period, interval=interval)['Close']
            )
            for key, period, interval in range_specs
        }
        return [cap_entry, history]

    @message_handler
    async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Stream the inner assistant's reply to a single text message.

        Args:
            message: Incoming message; its ``metadata`` must contain a
                non-empty ``session_id``.
            ctx: Message context; ``ctx.message_channel`` must not be None.

        Raises:
            ValueError: If ``session_id`` is missing or empty in the
                message metadata.
        """
        assert ctx.message_channel is not None
        logger.info(f"MarketAgent received messages. model name = {self.model_name}")

        # Only the latest message is handed to the assistant.
        pending = [message]

        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        await stream_chunks_and_publish(
            self.agent,
            ctx,
            pending,
            self.publish_message,
            self.name,
            session_id,
            self.cancel_token,
        )