Source code for agentopera.router.workers.messari_chat_agent

from typing import Any, AsyncGenerator

import aiohttp
import json

from agentopera.chatflow.messages import AgentEvent, ChatMessage, ModelClientStreamingChunkEvent, StopMessage, TextMessage, ToolCallExecutionEvent, ToolCallRequestEvent
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent.routed_agent import message_handler
from agentopera.engine.function_call import FunctionCall
from agentopera.engine.types.models.types import FunctionExecutionResult
from agentopera.router.constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.utils.logger import logger
from ...zerocode.developer_agent import DeveloperAgent
import os

[docs] class MessariChatAgent(DeveloperAgent): def __init__(self): super().__init__( agent_description="A chat agent that can answer crypto questions using Messari API", agent_name="Messari Chat Agent", agent_url=os.getenv("MESSARI_CHAT_URL"), is_output_streaming=True, agent_intent=None, agent_type="crypto", agent_api_key=os.getenv("MESSARI_API_KEY") )
[docs] async def yield_initial_message(self, session_id: str, is_first_message: bool) -> tuple[list[TextMessage], bool]: if is_first_message: return [TextMessage(content="", source="user", metadata={"session_id": session_id})], False return [], is_first_message
[docs] async def handle_metadata(self, metadata: dict, session_id: str, is_first_message: bool) -> tuple[list[AgentEvent | ChatMessage], bool]: events = [] if metadata is not None: if is_first_message: initial_events, is_first_message = await self.yield_initial_message(session_id, is_first_message) events.extend(initial_events) function_call = FunctionCall(id="tool_metadata_messari", name="messari_metadata", arguments=json.dumps({})) events.append(ToolCallRequestEvent(content=[function_call], source="tool_call", metadata={"session_id": session_id})) function_execution_result = FunctionExecutionResult(call_id=function_call.id, content=str(metadata), name="messari_metadata") events.append(ToolCallExecutionEvent(content=[function_execution_result], source="tool_call", metadata={"session_id": session_id})) events.append(TextMessage(content="", source="stop", metadata={"session_id": session_id})) is_first_message = True return events, is_first_message
[docs] async def fetch_and_transform_data(self, user_query: str, session_id: str, ctx: MessageContext) -> AsyncGenerator[AgentEvent | ChatMessage, None]: """Streams responses from an external API.""" headers = { "x-messari-api-key": self.agent_api_key, "Content-Type": "application/json", } payload = { "messages": [ { "role": "user", "content": user_query } ], "verbosity": "verbose", "response_format": "markdown", "inline_citations": True, "stream": True } try: async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session: async with session.post( self.agent_url, headers=headers, json=payload, timeout=TIME_OUT ) as response: if response.status != 200: error_text = await response.text() logger.error(f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}") yield TextMessage(content=f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}", source="error", metadata={"session_id": session_id}) is_first_message = True async for line in response.content: if self.cancel_token.is_cancelled(): logger.warning(f"[Messari Chat Agent] Cancellation requested — closing stream for {session_id}") response.close() return if line: try: line_text = line.decode('utf-8').strip() if "id:" in line_text or line_text == "": continue else: # Split the line into id and data parts parts = line_text.split('\n') prefix = 'data: ' for part in parts: if part.startswith(prefix): json_str = part[len(prefix):] # Remove 'data: ' prefix if json_str: try: data = json.loads(json_str) #error part if data.get('error') is not None: yield TextMessage(content=data.get('error'), source="error", metadata={"session_id": session_id}) #message part messages = data.get('data', {}).get('messages', None) if messages is None: if is_first_message: events, is_first_message = await self.yield_initial_message(session_id, is_first_message) for event in events: yield event yield TextMessage(content=self.agent_name, source="agent_id", metadata={"session_id": session_id}) else: yield TextMessage( content="", source="stop", metadata={"session_id": session_id} ) is_first_message = True else: for message in messages: if 'delta' in message and 'content' in message['delta']: content = message['delta']['content'] if content: yield ModelClientStreamingChunkEvent(content=content, source=ctx.message_channel.source, metadata={"session_id": session_id}) #metadata part citied_sources = data.get('metadata', {}).get('cited_sources', None) if citied_sources is not None: metadata = data.get('metadata', {}) events, is_first_message = await self.handle_metadata(metadata, session_id, is_first_message) for event in events: yield event except json.JSONDecodeError as e: yield TextMessage(content=f"Failed to parse JSON: {e}", source="error", metadata={"session_id": session_id}) except Exception as e: yield TextMessage(content=f"Error processing line: {e}", source="error", metadata={"session_id": session_id}) continue except aiohttp.ClientError as e: yield TextMessage(content=f"Network error while connecting to Messari API: {str(e)}", source="error", metadata={"session_id": session_id}) logger.error(f"Network error while connecting to Messari API: {str(e)}") except Exception as e: yield TextMessage(content=f"Unexpected error while fetching data: {str(e)}", source="error", metadata={"session_id": session_id}) logger.error(f"Unexpected error while fetching data: {str(e)}") finally: yield StopMessage(content="", source="stop", metadata={"session_id": session_id})
[docs] @message_handler async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """Handles messages and streams response chunks.""" await super().message_handler(message, ctx)