import asyncio
import os
from typing import Any, Dict, List

import pandas as pd
import yfinance as yf

from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily
from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish

from ...utils.logger import logger
class CryptoMarketAgent(RoutedAgent):
    """Routed agent that answers cryptocurrency market questions.

    Wraps an ``AssistantAgent`` backed by an OpenAI-compatible chat client and
    equips it with a single tool, :meth:`get_market_data`, which fetches the
    market cap and historical close prices of a symbol through ``yfinance``.
    """

    # (result key, yfinance ``period``, yfinance ``interval``) for every
    # history window handed back to the model, in the order they are fetched.
    _HISTORY_WINDOWS = (
        ("hist_1d", "1d", "1m"),
        ("hist_5d", "5d", "30m"),
        ("hist_1mo", "1mo", "1d"),
        ("hist_6mo", "6mo", "1d"),
        ("hist_1y", "1y", "1d"),
        ("hist_ytd", "ytd", "1wk"),
        ("hist_max", "max", "1mo"),
    )

    def __init__(self, name: str) -> None:
        """Build the model client, the market-data tool and the inner assistant.

        Args:
            name: Agent name; stored on ``self.name`` and later passed to
                ``stream_chunks_and_publish`` by the message handler.

        Raises:
            KeyError: If the ``LLM_GATEWAY_KEY`` environment variable is unset.
        """
        super().__init__("Crypto Market Agent for historical market data")
        self.name = name

        # Initialize the model client.
        # TENSOROPERA_API_GATEWAY is read with os.getenv, so base_url is None
        # when the variable is unset; LLM_GATEWAY_KEY is mandatory.
        self.model_name = "Qwen2.5-72B-Instruct"
        model_info: ModelInfo = {"vision": False, "function_calling": True, "json_output": False, "family": ModelFamily.UNKNOWN}
        model_client = OpenAIChatCompletionClient(
            base_url=os.getenv("TENSOROPERA_API_GATEWAY"),
            api_key=os.environ['LLM_GATEWAY_KEY'],
            model_info=model_info,
            model=self.model_name,
            temperature=0.6,
            top_p=0.95,
        )

        # NOTE(review): the system prompt below refers to the tool as
        # `trend_data_tool`, while the wrapped function is `get_market_data`.
        # Confirm which name FunctionTool exposes to the model.
        trend_data_tool = FunctionTool(self.get_market_data, description="Get market cap and historical market data for a given symbol")
        self.agent = AssistantAgent(
            name="market_agent",
            model_client=model_client,
            system_message="Given a cryptocurrency name, retrieve its corresponding stock symbol from Yahoo Finance (DEFAULT to USD if not specified).\
                            such as: bitcoin is BTC-USD, ethereum is ETH-USD etc.\
                            You MUST use trend_data_tool to get the accurate market capacity and historical market data for the given symbol. \
                            But do not tell the user the tool name and the data source. \
                            just answer here is the related information you can get from the tool.\
                            You can use latex formtting:\
                                - Use $ for inline equations\
                                - Use $$ for block equations\
                                - Use 'USD' for currency (not $)\
                                - No need to use bold or italic formatting in tables.\
                                - Don't use the h1 heading in the markdown response.",
            model_client_stream=True,  # Enable streaming tokens from the model client.
            tools=[trend_data_tool],
            reflect_on_tool_use=True
        )
        # for user role, each user has a unique chat agent instance
        logger.info(f"MarketAgent initialized. agent id = {self.id}")

    async def get_market_data(self, stock_ticket_symbol: str) -> List[Dict[str, Any]]:
        """
        Get market data for a given symbol.

        The yfinance calls are blocking network I/O, so they are executed in
        the event loop's default executor instead of on the loop itself.

        Args:
            stock_ticket_symbol (str): The stock ticker symbol to fetch data for.
        Returns:
            List[Dict[str, Any]]:
                - Dict with market cap
                - Dict of historical data, where each key is a range and each value is a list of {timestamp, price}
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._fetch_market_data, stock_ticket_symbol)

    def _fetch_market_data(self, symbol: str) -> List[Dict[str, Any]]:
        """Blocking worker for :meth:`get_market_data`; see it for the result shape."""
        info = yf.Ticker(symbol).info or {}
        market_cap = {"market capacity": info.get("marketCap")}
        # Downloads run one after another inside this single worker call
        # rather than being fanned out to several threads.
        hist_data = {
            key: self._close_records(
                yf.download(tickers=symbol, period=period, interval=interval, progress=False)
            )
            for key, period, interval in self._HISTORY_WINDOWS
        }
        return [market_cap, hist_data]

    @staticmethod
    def _close_records(frame: pd.DataFrame) -> List[Dict[str, Any]]:
        """Convert the ``Close`` column of a download result to {timestamp, price} records.

        Returns an empty list when the frame is missing, empty, or has no
        ``Close`` column.
        """
        if frame is None or frame.empty or "Close" not in frame.columns:
            return []
        closes = frame["Close"]
        if isinstance(closes, pd.DataFrame):
            # Selecting "Close" can yield a one-column frame (one column per
            # ticker) instead of a Series; a single symbol was requested, so
            # take that only column.
            closes = closes.iloc[:, 0]
        # Rows without a close price carry no information for the model.
        table = closes.dropna().reset_index()
        table.columns = ['timestamp', 'price']
        table['timestamp'] = table['timestamp'].astype(str)
        return table.to_dict(orient='records')

    @message_handler
    async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Run the inner assistant on an incoming message and stream its reply.

        Args:
            message: Incoming text message; ``message.metadata`` must contain
                a non-empty ``session_id``.
            ctx: Message context; its ``message_channel`` must be set.

        Raises:
            AssertionError: If ``ctx.message_channel`` is ``None``.
            ValueError: If the message metadata has no ``session_id``.
        """
        assert ctx.message_channel is not None
        logger.info(f"MarketAgent received messages. model name = {self.model_name}")
        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")
        # Only the latest message is forwarded to the inner assistant.
        await stream_chunks_and_publish(
            self.agent, ctx, [message], self.publish_message, self.name, session_id, self.cancel_token
        )