Source code for agentopera.router.workers.messari_agent


import aiohttp
import os
import re
import html
from typing import AsyncGenerator, Sequence, Any

from agentopera.chatflow.messages import TextMessage, StopMessage, ModelClientStreamingChunkEvent
from agentopera.engine.types.agent.cancellation_token import CancellationToken
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.chatflow.messages import (
   ChatMessage, TextMessage
)

from ...utils.logger import logger
from .streaming_agent import StreamingEndpointAgent
from ..message_utils import parse_context_to_dicts


class MessariStreamingAgent(StreamingEndpointAgent):
    """
    A specialized StreamingEndpointAgent for Messari AI streaming.
    """

    def __init__(self, name: str, endpoint_url: str = "https://api.messari.io/ai/v1"):
        super().__init__(name=name, endpoint_url=endpoint_url)
        self.api_key = os.getenv("MESSARI_API_KEY")
        self.headers = {
            "x-messari-api-key": self.api_key,
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        }

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[Any, None]:
        """
        Implements Messari AI streaming with timeout-based end detection.
        """
        try:
            if not messages:
                raise ValueError("No messages provided")

            messages = parse_context_to_dicts(messages)
            
            payload = {
                "model": "messari-default",
                "messages": messages,
                "stream": True,  # Enable streaming for Messari AI
                "verbosity": "verbose",
                "response_format": "markdown",
                # "inline_citations": True,
            }

            async with aiohttp.ClientSession(headers=self.headers) as session:
                async with session.post(
                    self.endpoint_url + "/chat/completions",
                    json=payload
                ) as response:
                    async for chunk in response.content.iter_any():
                        if chunk:
                            chunk_data = chunk.decode("utf-8")
                            yield chunk_data
    
        except Exception as e:
            raise RuntimeError(f"Messari AI streaming failed: {str(e)}") from e




def clean_text(text):
    # Step 1: Decode Unicode escape sequences
    text = html.unescape(text)  # Converts \u0026 to &

    # Step 2: Remove stray double quotes around words
    text = re.sub(r'""(\w+.*?)""', r'\1', text)

    # Step 3: Fix newlines properly
    text = text.replace('\\n', '\n')

    return text



def parse_sse_content(event: str) -> str:
    """
    Extracts all `content` fields from the SSE chunk using regex and concatenates them with a space.

    :param event: The raw SSE event as a string.
    :return: A concatenated string of all extracted content, separated by spaces.
    """
    # logger.info(f"Received SSE event: {type(event)}: {event}")

    try:
        # Finds all occurrences of `"content":"..."` and extracts the content inside quotes
        matches = re.findall(r'"content"\s*:\s*"([^"]+)"', event)  

        if matches:
            content_str = "".join(matches)
            #logger.info(f"Extracted content: {content_str}")
            return clean_text(content_str)

    except re.error as e:
        logger.error(f"Regex error while parsing SSE content: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"Unexpected error while parsing SSE content: {e}", exc_info=True)

    return ""  # Return empty string if no content is found

[docs] class MessariAIAgent(RoutedAgent): """ A RoutedAgent that utilizes Messari AI's streaming API. """ def __init__(self, name: str, endpoint_url: str = "https://api.messari.io/ai/v1"): super().__init__("Agent that streams from Messari") self.agent = MessariStreamingAgent(name=name, endpoint_url=endpoint_url) self.name = name
[docs] @message_handler async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """ Handles messages and streams AI responses from Messari with chunk tracking. - First chunk → `TextMessage` (source="user") - Middle chunks → `ModelClientStreamingChunkEvent` - Last chunk → `StopMessage` """ assert ctx.message_channel is not None #logger.info(f"{self.name} received message: {message.content}") messages = [message] is_first_chunk = True # Track first chunk session_id = message.metadata.get("session_id") if not session_id: raise ValueError("Message metadata is missing `session_id` field") async for event in self.agent.on_messages_stream(messages, self.cancel_token): content = parse_sse_content(event) # If first chunk, use `TextMessage` if is_first_chunk: # Flag the start of streaming. await self.publish_message( TextMessage(content="", source="user", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source="user"), message_id=ctx.message_id, ) # Send the name of the agent await self.publish_message( TextMessage(content=self.name, source="agent_id", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.topic), message_id=ctx.message_id, ) # send the content of the first chunk await self.publish_message( ModelClientStreamingChunkEvent(content=content, source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.topic), message_id=ctx.message_id, ) is_first_chunk = False # Mark first chunk as processed else: # If it's a middle chunk, use `ModelClientStreamingChunkEvent` await self.publish_message( ModelClientStreamingChunkEvent(content=content, source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.topic), message_id=ctx.message_id, ) #Finish step part, based on vercel streaming protocol await self.publish_message( TextMessage(content="", source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, ) # Finish message part, based on vercel streaming protocol await self.publish_message( StopMessage(content="", source=ctx.message_channel.topic, metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, )