import os
import aiohttp
import json
from typing import Any, Dict, AsyncGenerator
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.zerocode.developer_agent import DeveloperAgent
from agentopera.ui.vercel_ai_response import StreamMessageType
from ..constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.chatflow.messages import AgentEvent, ChatMessage, TextMessage
from ...utils.logger import logger
[docs]
class CailaAgent(DeveloperAgent):
"""Caila agent for weather information."""
def __init__(self, name: str) -> None:
"""
Initialize the Caila agent.
Args:
name: The name of the agent
"""
super().__init__(
agent_description="Caila Agent for weather information",
agent_name=name,
agent_url=os.getenv("CAILA_URL"),
is_output_streaming=True,
agent_intent=None,
agent_type="weather"
)
[docs]
@message_handler
async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handles messages and streams response chunks."""
assert ctx.message_channel is not None
logger.debug(f"[{self.__class__.__name__}] Received message from {message.source}: {message.content}")
session_id = message.metadata.get("session_id")
if not session_id:
raise ValueError("Message metadata is missing `session_id` field")
headers = {
"Content-Type": "application/json",
}
#"username" - in telegram, user name and name are the same, in twitter, user name is id of the user
#"name" - name is the name of the user
#"text" --- user query
#"roomId" --- that is session id, roomId should be unique between all the channels to avoid collisions
#"userId" --- user id will be used to save and retrieve user recent messages
payload = {
"roomId": session_id,
"userName": f"user_{session_id}",
"text": message.content
}
is_first_message = True
async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session:
try:
async with session.post(
self.agent_url,
headers=headers,
json=payload,
timeout=TIME_OUT,
) as response:
if response.status == 200:
async for line in response.content:
if self.cancel_token.is_cancelled():
logger.warning(f"[Caila Agent] Cancellation requested — closing stream for {session_id}")
response.close()
return
#logger.info(f"line: {line}")
line = line.decode('utf-8')
if line.strip():
try:
type_id, _ = line.split(":", 1)
if type_id == StreamMessageType.TOOL_CALL_DELTA_PART.value or type_id == StreamMessageType.TOOL_CALL_STREAMING_START_PART.value:
continue
await self.publish_message(
TextMessage(content=line.strip(), source="output", metadata={"session_id": session_id}),
message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
if type_id == StreamMessageType.START_STEP_PART.value and is_first_message:
await self.publish_message(
TextMessage(content=f"{StreamMessageType.MESSAGE_ANNOTATION_PART.value}:[{{\"agent_id\":{json.dumps(self.agent_name)}}}]", source="output", metadata={"session_id": session_id}),
message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
is_first_message = False
except (ValueError, json.JSONDecodeError) as e:
logger.error(f"Error parsing line: {line}, Error: {e}")
except Exception as e:
await self.publish_message(
TextMessage(content=f"Error: {e}", source="error", metadata={"session_id": session_id}),
message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
message_id=ctx.message_id,
)
logger.error(f"Error: {e}")