Source code for agentopera.router.workers.vercel_agent


import httpx
import json

from agentopera.engine.types.agent.cancellation_token import CancellationToken
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from typing import AsyncGenerator, Sequence, Any
from agentopera.chatflow.messages import (
    ChatMessage, TextMessage
)

from ...ui.vercel_ai_request import UserRequestMetadata
from ...ui.vercel_ai_response import StreamMessageType

from ...utils.logger import logger
from .streaming_agent import StreamingEndpointAgent

class VercelStreamingAgent(StreamingEndpointAgent):
    """
    A specialized StreamingEndpointAgent for Vercel AI streaming.

    The agent POSTs the conversation context of the latest message to
    ``self.endpoint_url`` and re-yields the non-empty response lines it
    receives, injecting one annotation line that names the handling group.
    """

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[Any, None]:
        """Stream the endpoint's response lines for the most recent message.

        Args:
            messages: Incoming chat messages. Only the last one is used; its
                ``metadata`` supplies ``session_id`` (required) plus optional
                ``model``, ``group`` and ``headers`` (a JSON string), and its
                ``context`` supplies the messages sent to the endpoint.
            cancellation_token: Checked before each received line is
                processed; once cancelled, the HTTP stream is closed and the
                generator returns.

        Yields:
            Each non-empty, stripped response line that contains a ``:``
            separator. Immediately after the first line whose prefix equals
            ``StreamMessageType.START_STEP_PART.value``, one extra
            ``MESSAGE_ANNOTATION_PART`` line carrying ``agent_id`` is yielded.

        Raises:
            ValueError: If ``messages`` is empty or the metadata has no
                ``session_id``.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            if not messages:
                raise ValueError("No messages provided")

            # Responses are generated from the most recent message.
            message = messages[-1]
            metadata = message.metadata

            # `.get` so that a missing key falls back to the default instead
            # of raising KeyError; empty/None values also use the default.
            model = metadata.get("model") or "chainopera-default"
            group = metadata.get("group") or "chat"

            session_id = metadata.get("session_id")
            if not session_id:
                raise ValueError("Message metadata is missing `session_id` field")

            # Convert context messages into JSON-serializable format
            context_messages = [msg.model_dump(exclude={"type"}) for msg in message.context]

            payload = {
                "id": session_id,
                "messages": context_messages,
                "model": model,
                "group": group,
            }

            # Serialize once; ensure_ascii=False keeps non-ASCII characters
            # as-is, so the length must be measured on the UTF-8 bytes.
            payload_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")

            # Headers arrive as a JSON string in the metadata (if at all).
            raw_headers = metadata.get("headers")
            headers = json.loads(raw_headers) if raw_headers else {}
            headers["content-length"] = str(len(payload_bytes))

            # The annotation reports "extreme" under a display name; the
            # payload above still carries the original group value.
            agent_label = "Deep search" if group == "extreme" else group
            annotation_pending = True

            logger.info(f"[before]vercel AI request: headers = {headers}. payload = {payload}")
            async with httpx.AsyncClient(timeout=120.0) as client:
                # NOTE: `headers` is intentionally not forwarded here (the
                # original call has `headers=headers` commented out); it is
                # only logged above.
                async with client.stream(
                    method="POST",
                    url=self.endpoint_url,
                    content=payload_bytes,
                ) as response:
                    async for line in response.aiter_lines():
                        if cancellation_token.is_cancelled():
                            logger.warning(f"[vercel] Cancellation requested — closing stream for {session_id}")
                            await response.aclose()  # Close the stream explicitly
                            return

                        if not line.strip():
                            continue

                        # Lines are expected to look like `<type_id>:<data>`;
                        # anything without a separator is reported and dropped.
                        type_id, separator, _ = line.partition(":")
                        if not separator:
                            logger.warning(f"Error parsing line: {line}, Error: missing ':' separator")
                            continue

                        yield line.strip()

                        if annotation_pending and type_id == StreamMessageType.START_STEP_PART.value:
                            # Send the agent annotation once, right after the
                            # first start-step line.
                            yield f'{StreamMessageType.MESSAGE_ANNOTATION_PART.value}:[{{"agent_id":{json.dumps(agent_label)}}}]'
                            annotation_pending = False

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

# Define the Streaming Agent
class VercelAIAgent(RoutedAgent):
    """Routed agent that relays a Vercel AI stream as published messages.

    Each incoming ``TextMessage`` is handed to an internal
    ``VercelStreamingAgent``; every line that agent yields is published as a
    new ``TextMessage`` on the ``"response"`` topic.
    """

    def __init__(self, name: str, endpoint_url: str):
        """Create the agent.

        Args:
            name: Agent name; also passed to the inner streaming agent.
            endpoint_url: URL passed to the inner streaming agent.
        """
        super().__init__("Agent that streams from vercel")
        self.agent = VercelStreamingAgent(name=name, endpoint_url=endpoint_url)
        self.name = name

    @message_handler
    async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
        """Stream the endpoint response for ``message`` and publish each event.

        Args:
            message: Incoming message; its metadata must contain
                ``session_id``, which is copied onto every published event.
            ctx: Message context; ``ctx.message_channel`` must be set because
                its ``topic`` and ``source`` are used for the published
                messages.

        Raises:
            ValueError: If the message metadata has no ``session_id``.
        """
        assert ctx.message_channel is not None

        # Validate before opening the stream so a bad request fails fast.
        session_id = message.metadata.get("session_id")
        if not session_id:
            raise ValueError("Message metadata is missing `session_id` field")

        # NOTE(review): `self.cancel_token` is not assigned in this class's
        # __init__ — presumably provided by RoutedAgent; confirm.
        # NOTE(review): `DefaultMessageChannel` is not among the imports
        # visible at the top of this file — confirm it is in scope.
        async for event in self.agent.on_messages_stream([message], self.cancel_token):
            await self.publish_message(
                TextMessage(
                    content=event,
                    source=ctx.message_channel.topic,
                    metadata={"session_id": session_id},
                ),
                message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
                message_id=ctx.message_id,
            )