Source code for agentopera.router.workers.caila_agent

import os
import aiohttp
import json
from typing import Any, Dict, AsyncGenerator

from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import message_handler
from agentopera.engine.types.msg_channel import DefaultMessageChannel
from agentopera.zerocode.developer_agent import DeveloperAgent
from agentopera.ui.vercel_ai_response import StreamMessageType
from ..constants import READ_BUFFER_SIZE, TIME_OUT
from agentopera.chatflow.messages import AgentEvent, ChatMessage, TextMessage

from ...utils.logger import logger

[docs] class CailaAgent(DeveloperAgent): """Caila agent for weather information.""" def __init__(self, name: str) -> None: """ Initialize the Caila agent. Args: name: The name of the agent """ super().__init__( agent_description="Caila Agent for weather information", agent_name=name, agent_url=os.getenv("CAILA_URL"), is_output_streaming=True, agent_intent=None, agent_type="weather" )
[docs] @message_handler async def message_handler(self, message: TextMessage, ctx: MessageContext) -> None: """Handles messages and streams response chunks.""" assert ctx.message_channel is not None logger.debug(f"[{self.__class__.__name__}] Received message from {message.source}: {message.content}") session_id = message.metadata.get("session_id") if not session_id: raise ValueError("Message metadata is missing `session_id` field") headers = { "Content-Type": "application/json", } #"username" - in telegram, user name and name are the same, in twitter, user name is id of the user #"name" - name is the name of the user #"text" --- user query #"roomId" --- that is session id, roomId should be unique between all the channels to avoid collisions #"userId" --- user id will be used to save and retrieve user recent messages payload = { "roomId": session_id, "userName": f"user_{session_id}", "text": message.content } is_first_message = True async with aiohttp.ClientSession(read_bufsize=READ_BUFFER_SIZE) as session: try: async with session.post( self.agent_url, headers=headers, json=payload, timeout=TIME_OUT, ) as response: if response.status == 200: async for line in response.content: if self.cancel_token.is_cancelled(): logger.warning(f"[Caila Agent] Cancellation requested — closing stream for {session_id}") response.close() return #logger.info(f"line: {line}") line = line.decode('utf-8') if line.strip(): try: type_id, _ = line.split(":", 1) if type_id == StreamMessageType.TOOL_CALL_DELTA_PART.value or type_id == StreamMessageType.TOOL_CALL_STREAMING_START_PART.value: continue await self.publish_message( TextMessage(content=line.strip(), source="output", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, ) if type_id == StreamMessageType.START_STEP_PART.value and is_first_message: await self.publish_message( TextMessage(content=f"{StreamMessageType.MESSAGE_ANNOTATION_PART.value}:[{{\"agent_id\":{json.dumps(self.agent_name)}}}]", source="output", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, ) is_first_message = False except (ValueError, json.JSONDecodeError) as e: logger.error(f"Error parsing line: {line}, Error: {e}") except Exception as e: await self.publish_message( TextMessage(content=f"Error: {e}", source="error", metadata={"session_id": session_id}), message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source), message_id=ctx.message_id, ) logger.error(f"Error: {e}")
[docs] async def fetch_and_transform_data(self, user_query: str, session_id: str, ctx: MessageContext) -> AsyncGenerator[AgentEvent | ChatMessage, None]: """Fetch data from the Caila agent.""" super().fetch_and_transform_data(user_query, session_id, ctx)