import os
import aiohttp
import json
from typing import Any, Dict, AsyncGenerator
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.zerocode.developer_agent import DeveloperAgent
from agentopera.ui.vercel_ai_response import StreamMessageType
from ..constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.chatflow.messages import AgentEvent, ChatMessage, TextMessage
from ...utils.logger import logger
# Agent that relays chat messages to the Caila HTTP service (endpoint taken from CAILA_URL).
class CailaAgent(DeveloperAgent):
    """Caila agent for weather information.

    Forwards each incoming text message to the HTTP endpoint configured in the
    ``CAILA_URL`` environment variable and relays the streamed reply, line by
    line, on the ``response`` topic.
    """

    def __init__(self, name: str) -> None:
        """Initialize the Caila agent.

        Args:
            name: The name of the agent.
        """
        super().__init__(
            agent_description="Caila Agent for weather information",
            agent_name=name,
            # May be None when CAILA_URL is unset; the request then fails inside
            # the handler and is reported to the caller as an error message.
            agent_url=os.getenv("CAILA_URL"),
            is_output_streaming=True,
            agent_intent=None,
            agent_type="weather",
        )

    async def _emit_response(self, content: str, source: str, session_id: Any, ctx: MessageContext) -> None:
        """Publish one ``TextMessage`` on the ``response`` topic for the request described by ``ctx``."""
        await self.publish_message(
            TextMessage(content=content, source=source, metadata={"session_id": session_id}),
            message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
            message_id=ctx.message_id,
        )

    # NOTE: the method name shadows the imported decorator inside the class
    # namespace once it is defined; the decorator below is resolved before that
    # happens, so this works, but no later method in this class can use it.
    @message_handler
    async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Send the message text to the Caila endpoint and stream the reply back.

        Each non-empty line of the HTTP response is expected to look like
        ``<type_id>:<payload>`` and is republished verbatim (stripped) with
        source ``"output"``. Tool-call delta / streaming-start lines are
        dropped. Right after the first start-step line, one extra annotation
        line carrying this agent's name is published.

        Any failure (transport error, non-200 status, publish error) is logged
        and reported to the caller as a message with source ``"error"``.

        Args:
            message: Incoming text; ``message.metadata`` must contain a
                non-empty ``session_id``.
            ctx: Delivery context; its ``message_channel`` must be set.

        Raises:
            ValueError: If ``session_id`` is missing from the message metadata.
        """
        # Narrowing guard kept as an assert so the raised exception type is unchanged.
        assert ctx.message_channel is not None
        logger.debug(f"[{self.__class__.__name__}] Received message from {message.source}: {message.content}")
        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        request_headers = {"Content-Type": "application/json"}
        # Request body fields:
        #   "roomId"   - the session id; it should be unique across all channels to avoid collisions.
        #   "userName" - synthesized from the session id.
        #   "text"     - the user's query.
        # NOTE(review): earlier notes here also described "name" and "userId" fields
        # (userId being used to save/retrieve a user's recent messages); neither is
        # sent - confirm whether the service needs them.
        request_body = {
            "roomId": session_id,
            "userName": f"user_{session_id}",
            "text": message.content,
        }

        # Stream part types that are never forwarded to the caller.
        skipped_types = (
            StreamMessageType.TOOL_CALL_DELTA_PART.value,
            StreamMessageType.TOOL_CALL_STREAMING_START_PART.value,
        )
        # The agent-id annotation is sent at most once per request.
        annotation_pending = True

        async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session:
            try:
                async with session.post(
                    self.agent_url,
                    headers=request_headers,
                    json=request_body,
                    timeout=TIME_OUT,
                ) as response:
                    if response.status != 200:
                        # Previously dropped silently, leaving the caller without any reply.
                        logger.error(f"[Caila Agent] Unexpected HTTP status {response.status} for {session_id}")
                        await self._emit_response(
                            f"Error: Caila service returned HTTP {response.status}", "error", session_id, ctx
                        )
                        return

                    async for raw_line in response.content:
                        if self.cancel_token.is_cancelled():
                            logger.warning(f"[Caila Agent] Cancellation requested — closing stream for {session_id}")
                            response.close()
                            return

                        # Strip once so the type id and the forwarded content agree.
                        line = raw_line.decode("utf-8").strip()
                        if not line:
                            continue

                        type_id, separator, _ = line.partition(":")
                        if not separator:
                            # Malformed line: skip it but keep consuming the stream.
                            logger.error(f"Error parsing line: {line}, Error: missing ':' separator")
                            continue
                        if type_id in skipped_types:
                            continue

                        await self._emit_response(line, "output", session_id, ctx)

                        if annotation_pending and type_id == StreamMessageType.START_STEP_PART.value:
                            # Compact separators reproduce the wire form [{"agent_id":"<name>"}].
                            annotation_payload = json.dumps([{"agent_id": self.agent_name}], separators=(",", ":"))
                            await self._emit_response(
                                f"{StreamMessageType.MESSAGE_ANNOTATION_PART.value}:{annotation_payload}",
                                "output",
                                session_id,
                                ctx,
                            )
                            annotation_pending = False
            except Exception as e:
                # Log first so the failure is recorded even if publishing the error also fails.
                logger.error(f"Error: {e}")
                await self._emit_response(f"Error: {e}", "error", session_id, ctx)