Source code for agentopera.router.workers.messari_chat_agent
from typing import Any, AsyncGenerator
import aiohttp
import json
from agentopera.chatflow.messages import AgentEvent, ChatMessage, ModelClientStreamingChunkEvent, StopMessage, TextMessage, ToolCallExecutionEvent, ToolCallRequestEvent
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent.routed_agent import message_handler
from agentopera.engine.function_call import FunctionCall
from agentopera.engine.types.models.types import FunctionExecutionResult
from agentopera.router.constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.utils.logger import logger
from ...zerocode.developer_agent import DeveloperAgent
import os
[docs]
class MessariChatAgent(DeveloperAgent):
    def __init__(self):
        super().__init__(
            agent_description="A chat agent that can answer crypto questions using Messari API",
            agent_name="Messari Chat Agent",
            agent_url=os.getenv("MESSARI_CHAT_URL"),
            is_output_streaming=True,
            agent_intent=None,
            agent_type="crypto",
            agent_api_key=os.getenv("MESSARI_API_KEY")
        )
[docs]
    async def yield_initial_message(self, session_id: str, is_first_message: bool) -> tuple[list[TextMessage], bool]:
        if is_first_message:
            return [TextMessage(content="", source="user", metadata={"session_id": session_id})], False
        return [], is_first_message
    
[docs]
    async def handle_metadata(self, metadata: dict, session_id: str, is_first_message: bool) -> tuple[list[AgentEvent | ChatMessage], bool]:
        events = []
        if metadata is not None:
            if is_first_message:
                initial_events, is_first_message = await self.yield_initial_message(session_id, is_first_message)
                events.extend(initial_events)
            
            function_call = FunctionCall(id="tool_metadata_messari", name="messari_metadata", arguments=json.dumps({}))
            events.append(ToolCallRequestEvent(content=[function_call], source="tool_call", metadata={"session_id": session_id}))
            function_execution_result = FunctionExecutionResult(call_id=function_call.id, content=str(metadata), name="messari_metadata")
            events.append(ToolCallExecutionEvent(content=[function_execution_result], source="tool_call", metadata={"session_id": session_id}))
            events.append(TextMessage(content="", source="stop", metadata={"session_id": session_id}))
            is_first_message = True
        return events, is_first_message
[docs]
    async def fetch_and_transform_data(self, user_query: str, session_id: str, ctx: MessageContext) -> AsyncGenerator[AgentEvent | ChatMessage,  None]:
        """Streams responses from an external API."""
        headers = {
            "x-messari-api-key": self.agent_api_key,
            "Content-Type": "application/json",
        }
        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": user_query
                }
            ],
            "verbosity": "verbose",
            "response_format": "markdown",
            "inline_citations": True,
            "stream": True
        }
        try:
            async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session:
                async with session.post(
                    self.agent_url, 
                    headers=headers, 
                    json=payload,
                    timeout=TIME_OUT
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}")
                        yield TextMessage(content=f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}", source="error", metadata={"session_id": session_id})
                    
                    is_first_message = True
                    async for line in response.content:
                        if self.cancel_token.is_cancelled():
                            logger.warning(f"[Messari Chat Agent] Cancellation requested — closing stream for {session_id}")
                            response.close()
                            return
                        if line:
                            try:
                                line_text = line.decode('utf-8').strip()
                                if "id:" in line_text or line_text == "":
                                    continue
                                else:
                                    # Split the line into id and data parts
                                    parts = line_text.split('\n')
                                    prefix = 'data: '
                                    for part in parts:
                                        if part.startswith(prefix):
                                            json_str = part[len(prefix):]  # Remove 'data: ' prefix
                                            if json_str:
                                                try:
                                                    data = json.loads(json_str)
                                                    #error part
                                                    if data.get('error') is not None:
                                                        yield TextMessage(content=data.get('error'), source="error", metadata={"session_id": session_id})
                                                    
                                                    #message part
                                                    messages = data.get('data', {}).get('messages', None)
                                                    if messages is None:
                                                        if is_first_message:
                                                            events, is_first_message = await self.yield_initial_message(session_id, is_first_message)
                                                            for event in events:
                                                                yield event
                                                            yield TextMessage(content=self.agent_name, source="agent_id", metadata={"session_id": session_id})
                                                        else:
                                                            yield TextMessage(
                                                                content="",
                                                                source="stop",
                                                                metadata={"session_id": session_id}
                                                            )
                                                            is_first_message = True
                                                    else:
                                                        for message in messages:
                                                            if 'delta' in message and 'content' in message['delta']:
                                                                content = message['delta']['content']
                                                            if content:
                                                                yield ModelClientStreamingChunkEvent(content=content, source=ctx.message_channel.source, metadata={"session_id": session_id})                                                        
                                                    #metadata part
                                                    citied_sources = data.get('metadata', {}).get('cited_sources', None)
                                                    if citied_sources is not None:
                                                        metadata = data.get('metadata', {})
                                                        events, is_first_message = await self.handle_metadata(metadata, session_id, is_first_message)
                                                        for event in events:
                                                            yield event
    
                                                except json.JSONDecodeError as e:
                                                    yield TextMessage(content=f"Failed to parse JSON: {e}", source="error", metadata={"session_id": session_id})
                            except Exception as e:
                                yield TextMessage(content=f"Error processing line: {e}", source="error", metadata={"session_id": session_id})
                                continue
        except aiohttp.ClientError as e:
            yield TextMessage(content=f"Network error while connecting to Messari API: {str(e)}", source="error", metadata={"session_id": session_id})
            logger.error(f"Network error while connecting to Messari API: {str(e)}")
        except Exception as e:
            yield TextMessage(content=f"Unexpected error while fetching data: {str(e)}", source="error", metadata={"session_id": session_id})
            logger.error(f"Unexpected error while fetching data: {str(e)}")
        finally:
            yield StopMessage(content="", source="stop", metadata={"session_id": session_id})
[docs]
    @message_handler
    async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Handles messages and streams response chunks."""
        await super().message_handler(message, ctx)