Source code for agentopera.router.workers.messari_chat_agent
from typing import Any, AsyncGenerator
import aiohttp
import json
from agentopera.chatflow.messages import AgentEvent, ChatMessage, ModelClientStreamingChunkEvent, StopMessage, TextMessage, ToolCallExecutionEvent, ToolCallRequestEvent
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent.routed_agent import message_handler
from agentopera.engine.function_call import FunctionCall
from agentopera.engine.types.models.types import FunctionExecutionResult
from agentopera.router.constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.utils.logger import logger
from ...zerocode.developer_agent import DeveloperAgent
import os
[docs]
class MessariChatAgent(DeveloperAgent):
def __init__(self):
super().__init__(
agent_description="A chat agent that can answer crypto questions using Messari API",
agent_name="Messari Chat Agent",
agent_url=os.getenv("MESSARI_CHAT_URL"),
is_output_streaming=True,
agent_intent=None,
agent_type="crypto",
agent_api_key=os.getenv("MESSARI_API_KEY")
)
[docs]
async def yield_initial_message(self, session_id: str, is_first_message: bool) -> tuple[list[TextMessage], bool]:
if is_first_message:
return [TextMessage(content="", source="user", metadata={"session_id": session_id})], False
return [], is_first_message
[docs]
async def handle_metadata(self, metadata: dict, session_id: str, is_first_message: bool) -> tuple[list[AgentEvent | ChatMessage], bool]:
events = []
if metadata is not None:
if is_first_message:
initial_events, is_first_message = await self.yield_initial_message(session_id, is_first_message)
events.extend(initial_events)
function_call = FunctionCall(id="tool_metadata_messari", name="messari_metadata", arguments=json.dumps({}))
events.append(ToolCallRequestEvent(content=[function_call], source="tool_call", metadata={"session_id": session_id}))
function_execution_result = FunctionExecutionResult(call_id=function_call.id, content=str(metadata), name="messari_metadata")
events.append(ToolCallExecutionEvent(content=[function_execution_result], source="tool_call", metadata={"session_id": session_id}))
events.append(TextMessage(content="", source="stop", metadata={"session_id": session_id}))
is_first_message = True
return events, is_first_message
[docs]
async def fetch_and_transform_data(self, user_query: str, session_id: str, ctx: MessageContext) -> AsyncGenerator[AgentEvent | ChatMessage, None]:
"""Streams responses from an external API."""
headers = {
"x-messari-api-key": self.agent_api_key,
"Content-Type": "application/json",
}
payload = {
"messages": [
{
"role": "user",
"content": user_query
}
],
"verbosity": "verbose",
"response_format": "markdown",
"inline_citations": True,
"stream": True
}
try:
async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session:
async with session.post(
self.agent_url,
headers=headers,
json=payload,
timeout=TIME_OUT
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}")
yield TextMessage(content=f"Failed to fetch data from Messari API: {response.status}. Error: {error_text}", source="error", metadata={"session_id": session_id})
is_first_message = True
async for line in response.content:
if self.cancel_token.is_cancelled():
logger.warning(f"[Messari Chat Agent] Cancellation requested — closing stream for {session_id}")
response.close()
return
if line:
try:
line_text = line.decode('utf-8').strip()
if "id:" in line_text or line_text == "":
continue
else:
# Split the line into id and data parts
parts = line_text.split('\n')
prefix = 'data: '
for part in parts:
if part.startswith(prefix):
json_str = part[len(prefix):] # Remove 'data: ' prefix
if json_str:
try:
data = json.loads(json_str)
#error part
if data.get('error') is not None:
yield TextMessage(content=data.get('error'), source="error", metadata={"session_id": session_id})
#message part
messages = data.get('data', {}).get('messages', None)
if messages is None:
if is_first_message:
events, is_first_message = await self.yield_initial_message(session_id, is_first_message)
for event in events:
yield event
yield TextMessage(content=self.agent_name, source="agent_id", metadata={"session_id": session_id})
else:
yield TextMessage(
content="",
source="stop",
metadata={"session_id": session_id}
)
is_first_message = True
else:
for message in messages:
if 'delta' in message and 'content' in message['delta']:
content = message['delta']['content']
if content:
yield ModelClientStreamingChunkEvent(content=content, source=ctx.message_channel.source, metadata={"session_id": session_id})
#metadata part
citied_sources = data.get('metadata', {}).get('cited_sources', None)
if citied_sources is not None:
metadata = data.get('metadata', {})
events, is_first_message = await self.handle_metadata(metadata, session_id, is_first_message)
for event in events:
yield event
except json.JSONDecodeError as e:
yield TextMessage(content=f"Failed to parse JSON: {e}", source="error", metadata={"session_id": session_id})
except Exception as e:
yield TextMessage(content=f"Error processing line: {e}", source="error", metadata={"session_id": session_id})
continue
except aiohttp.ClientError as e:
yield TextMessage(content=f"Network error while connecting to Messari API: {str(e)}", source="error", metadata={"session_id": session_id})
logger.error(f"Network error while connecting to Messari API: {str(e)}")
except Exception as e:
yield TextMessage(content=f"Unexpected error while fetching data: {str(e)}", source="error", metadata={"session_id": session_id})
logger.error(f"Unexpected error while fetching data: {str(e)}")
finally:
yield StopMessage(content="", source="stop", metadata={"session_id": session_id})
[docs]
@message_handler
async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handles messages and streams response chunks."""
await super().message_handler(message, ctx)