import aiohttp
import os
import re
import html
from typing import AsyncGenerator, Sequence, Any
from agentopera.chatflow.messages import TextMessage, StopMessage, ModelClientStreamingChunkEvent
from agentopera.engine.types.agent.cancellation_token import CancellationToken
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from agentopera.chatflow.messages import (
ChatMessage, TextMessage
)
from ...utils.logger import logger
from .streaming_agent import StreamingEndpointAgent
from ..message_utils import parse_context_to_dicts
class MessariStreamingAgent(StreamingEndpointAgent):
    """
    A specialized StreamingEndpointAgent that proxies the Messari AI
    chat-completions endpoint and yields raw SSE chunks as UTF-8 text
    as they arrive.

    Authentication uses the ``MESSARI_API_KEY`` environment variable,
    read once at construction time.
    """

    def __init__(self, name: str, endpoint_url: str = "https://api.messari.io/ai/v1"):
        """
        :param name: Agent name forwarded to the base class.
        :param endpoint_url: Base URL of the Messari AI API (no trailing slash).
        """
        super().__init__(name=name, endpoint_url=endpoint_url)
        # May be None when the variable is unset; validated lazily in
        # on_messages_stream so construction never fails on config alone.
        self.api_key = os.getenv("MESSARI_API_KEY")
        self.headers = {
            "x-messari-api-key": self.api_key,
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        }

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[Any, None]:
        """
        Stream raw SSE chunks (decoded as UTF-8 text) from Messari AI.

        :param messages: Conversation history; must be non-empty.
        :param cancellation_token: Accepted for interface compatibility;
            not currently consulted inside the read loop.
        :raises RuntimeError: On any failure (missing API key, empty input,
            HTTP error status, network error), with the cause chained.
        """
        try:
            if not messages:
                raise ValueError("No messages provided")
            if not self.api_key:
                # Fail fast with a clear message instead of a cryptic
                # aiohttp header-serialization error for a None header value.
                raise ValueError("MESSARI_API_KEY environment variable is not set")
            payload = {
                "model": "messari-default",
                "messages": parse_context_to_dicts(messages),
                "stream": True,  # Enable streaming for Messari AI
                "verbosity": "verbose",
                "response_format": "markdown",
                # "inline_citations": True,
            }
            async with aiohttp.ClientSession(headers=self.headers) as session:
                async with session.post(
                    self.endpoint_url + "/chat/completions",
                    json=payload
                ) as response:
                    # Surface 4xx/5xx immediately rather than streaming an
                    # error body as if it were SSE content.
                    response.raise_for_status()
                    async for chunk in response.content.iter_any():
                        if chunk:
                            # NOTE(review): iter_any() can split a multi-byte
                            # UTF-8 codepoint across chunks, which would raise
                            # here — confirm whether an incremental decoder is
                            # needed for non-ASCII responses.
                            yield chunk.decode("utf-8")
        except Exception as e:
            raise RuntimeError(f"Messari AI streaming failed: {str(e)}") from e
def clean_text(text):
    """Normalize a raw content fragment extracted from an SSE payload.

    Unescapes HTML entities (e.g. ``&amp;`` -> ``&``), strips doubled
    quotes wrapped around a word, and converts literal ``\\n`` escape
    sequences into real newlines.
    """
    unescaped = html.unescape(text)
    without_doubled_quotes = re.sub(r'""(\w+.*?)""', r'\1', unescaped)
    return without_doubled_quotes.replace('\\n', '\n')
def parse_sse_content(event: str) -> str:
    """
    Pull every ``"content"`` field out of a raw SSE chunk and return them
    concatenated (no separator), normalized for display.

    :param event: The raw SSE event as a string.
    :return: The concatenated, cleaned content, or ``""`` when no content
        field is present or parsing fails.
    """
    try:
        # Grab the value of each `"content":"..."` pair in the chunk.
        pieces = re.findall(r'"content"\s*:\s*"([^"]+)"', event)
        if pieces:
            joined = "".join(pieces)
            # Normalize: decode HTML entities, strip doubled quotes around
            # words, and turn literal \n escapes into real newlines.
            cleaned = html.unescape(joined)
            cleaned = re.sub(r'""(\w+.*?)""', r'\1', cleaned)
            return cleaned.replace('\\n', '\n')
    except re.error as e:
        logger.error(f"Regex error while parsing SSE content: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"Unexpected error while parsing SSE content: {e}", exc_info=True)
    return ""  # Return empty string if no content is found
class MessariAIAgent(RoutedAgent):
    """
    A RoutedAgent that streams responses from Messari AI and republishes
    them on the "response" topic following the Vercel streaming protocol.
    """

    def __init__(self, name: str, endpoint_url: str = "https://api.messari.io/ai/v1"):
        """
        :param name: Identity announced to clients at stream start.
        :param endpoint_url: Base URL of the Messari AI API.
        """
        super().__init__("Agent that streams from Messari")
        self.agent = MessariStreamingAgent(name=name, endpoint_url=endpoint_url)
        self.name = name

    @message_handler
    async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """
        Handles messages and streams AI responses from Messari with chunk tracking.

        - Stream start → empty `TextMessage` (source="user") followed by an
          agent-id `TextMessage`
        - Every chunk (including the first) → `ModelClientStreamingChunkEvent`
        - Stream end → empty `TextMessage` (finish step) then `StopMessage`
          (finish message), per the Vercel streaming protocol.

        :raises ValueError: If the message metadata lacks a `session_id`.
        """
        assert ctx.message_channel is not None
        messages = [message]
        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        # NOTE(review): `DefaultMessageChannel` is not imported anywhere in
        # this module — unless it is injected at runtime, every publish below
        # raises NameError. Confirm and add the proper import.
        topic = ctx.message_channel.topic

        is_first_chunk = True  # Track whether the stream-start preamble was sent
        async for event in self.agent.on_messages_stream(messages, self.cancel_token):
            content = parse_sse_content(event)
            if is_first_chunk:
                # Flag the start of streaming.
                await self.publish_message(
                    TextMessage(content="", source="user", metadata={"session_id": session_id}),
                    message_channel=DefaultMessageChannel(topic="response", source="user"),
                    message_id=ctx.message_id,
                )
                # Announce which agent is responding.
                await self.publish_message(
                    TextMessage(content=self.name, source="agent_id", metadata={"session_id": session_id}),
                    message_channel=DefaultMessageChannel(topic="response", source=topic),
                    message_id=ctx.message_id,
                )
                is_first_chunk = False  # Mark first chunk as processed
            # Forward the chunk content; this single publish replaces the
            # duplicated first-chunk/middle-chunk branches of the original.
            await self.publish_message(
                ModelClientStreamingChunkEvent(content=content, source=topic, metadata={"session_id": session_id}),
                message_channel=DefaultMessageChannel(topic="response", source=topic),
                message_id=ctx.message_id,
            )

        # Finish step part, based on the Vercel streaming protocol.
        await self.publish_message(
            TextMessage(content="", source=topic, metadata={"session_id": session_id}),
            message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
            message_id=ctx.message_id,
        )
        # Finish message part, based on the Vercel streaming protocol.
        await self.publish_message(
            StopMessage(content="", source=topic, metadata={"session_id": session_id}),
            message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
            message_id=ctx.message_id,
        )