import asyncio
import functools
import json
import os

import requests

from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.models.openai import OpenAIChatCompletionClient
from agentopera.chatflow.agents import AssistantAgent
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.agent.routed_agent import RoutedAgent
from agentopera.engine.types.models import ModelInfo
from agentopera.engine.types.models import ModelFamily
from agentopera.engine.function_call import FunctionTool
from agentopera.utils.streaming_utils import stream_chunks_and_publish

from ...utils.logger import logger
class CryptoSentimentAgent(RoutedAgent):
    """A specialized agent for conducting crypto sentiment analysis via an external API.

    The agent wraps an ``AssistantAgent`` equipped with a single tool,
    :meth:`get_sentiment`, which queries the CoinMarketCap fear-and-greed
    historical endpoint.
    """

    # Number of days requested when the caller gives no usable value.
    _DEFAULT_DAYS = 7
    # Upper bound, in seconds, on the HTTP call so a stalled connection
    # cannot hang the tool invocation forever.
    _REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, name: str) -> None:
        """Init the agent instance.

        Builds the chat-completion client, registers the sentiment tool and
        creates the inner ``AssistantAgent``.

        Args:
            name: Name stored on the instance; used in log lines and passed
                to ``stream_chunks_and_publish`` when handling messages.

        Raises:
            KeyError: If the ``LLM_GATEWAY_KEY`` environment variable is unset.
        """
        super().__init__("Crypto Sentiment Agent for crypto sentiment analysis")
        self.name = name

        # Initialize the model client
        self.model_name = "Qwen2.5-72B-Instruct"
        model_info: ModelInfo = {
            "vision": False,
            "function_calling": True,
            "json_output": False,
            "family": ModelFamily.UNKNOWN,
        }
        model_client = OpenAIChatCompletionClient(
            base_url=os.getenv("TENSOROPERA_API_GATEWAY"),
            api_key=os.environ['LLM_GATEWAY_KEY'],
            model_info=model_info,
            model=self.model_name,
            temperature=0.6,
            top_p=0.95,
        )

        sentiment_tool = FunctionTool(
            self.get_sentiment,
            description="Get the market sentiment of the crypto, use fear and greed index",
        )
        self.agent = AssistantAgent(
            name="sentiment_agent",
            model_client=model_client,
            # Adjacent literals are joined verbatim (no separators added), so
            # the prompt text does not depend on how this code is indented.
            system_message=(
                "You are the crypto sentiment agent and you need to get the sentiment of the crypto market."
                "use this tool with this parameter:"
                "days(str): the number of days to get the sentiment data. "
                "if the user does not provide the days, use the default value 7."
            ),
            model_client_stream=True,
            tools=[sentiment_tool],
            reflect_on_tool_use=True,
        )
        logger.info(f"SentimentAgent initialized. agent id = {self.id}")

    @staticmethod
    def _parse_days(days, default: int) -> int:
        """Convert the tool argument ``days`` to a positive int, else ``default``."""
        try:
            limit = int(days)
        except (TypeError, ValueError):
            # The value comes from an LLM tool call and may not be numeric.
            logger.info(f"Invalid days value {days!r}; falling back to {default}")
            return default
        return limit if limit > 0 else default

    async def get_sentiment(self, days: str = '7') -> str:
        """Get the crypto sentiment of the crypto market.

        Queries the CoinMarketCap fear-and-greed historical endpoint and
        returns the ``data`` field of the response.

        Args:
            days: Number of days of sentiment data to request, sent as the
                ``limit`` query parameter. Non-numeric or non-positive values
                fall back to 7.

        Returns:
            A JSON-encoded string of the response's ``data`` field, or
            ``"[]"`` if the request or response parsing fails.

        Raises:
            KeyError: If the ``X_CMC_PRO_API_KEY`` environment variable is unset.
        """
        logger.info("Getting the crypto sentiment of the crypto market")
        url = 'https://pro-api.coinmarketcap.com/v3/fear-and-greed/historical'
        parameters = {
            'limit': self._parse_days(days, self._DEFAULT_DAYS),
        }
        headers = {
            # Standard HTTP content-negotiation header ("Accepts" is not one).
            'Accept': 'application/json',
            'X-CMC_PRO_API_KEY': os.environ['X_CMC_PRO_API_KEY'],
        }
        fetch = functools.partial(
            requests.get,
            url,
            headers=headers,
            params=parameters,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        try:
            # requests is blocking; run it in the default executor so the
            # event loop keeps serving other coroutines while we wait.
            response = await asyncio.get_running_loop().run_in_executor(None, fetch)
            response.raise_for_status()
            data = response.json().get("data") or []
            # Serialize so success and failure paths both return a str,
            # matching the declared return type.
            return json.dumps(data)
        except Exception as e:
            # Tool boundary: best-effort, report failure as an empty result.
            logger.error(f"Error getting the crypto sentiment of the crypto market: {e}")
            return "[]"

    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        """Handle an incoming text message and stream the agent's response.

        Args:
            message: The incoming message; its ``metadata`` must contain a
                non-empty ``session_id``.
            ctx: Message context; ``ctx.message_channel`` must not be ``None``.

        Raises:
            ValueError: If ``message.metadata`` has no ``session_id``.
        """
        assert ctx.message_channel is not None
        logger.info(f"{self.name} received messages. model name = {self.model_name}")

        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        # Only the latest message is forwarded to the inner agent.
        messages = [message]
        await stream_chunks_and_publish(
            self.agent,
            ctx,
            messages,
            self.publish_message,
            self.name,
            session_id,
            self.cancel_token,
        )