import httpx
import json
from agentopera.engine.types.agent.cancellation_token import CancellationToken
from agentopera.chatflow.messages import TextMessage
from agentopera.engine.types.msg_context import MessageContext
from agentopera.engine.agent import RoutedAgent, message_handler
from typing import AsyncGenerator, Sequence, Any
from agentopera.chatflow.messages import (
ChatMessage, TextMessage
)
from ...ui.vercel_ai_request import UserRequestMetadata
from ...ui.vercel_ai_response import StreamMessageType
from ...utils.logger import logger
from .streaming_agent import StreamingEndpointAgent
class VercelStreamingAgent(StreamingEndpointAgent):
"""
A specialized StreamingEndpointAgent for Vercel AI streaming.
"""
async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[Any, None]:
try:
if not messages:
raise ValueError("No messages provided")
# currently generate responses based on the first message
message = messages[-1]
model = "chainopera-default" if not message.metadata["model"] else message.metadata["model"]
group = "chat" if not message.metadata["group"] else message.metadata["group"]
session_id = message.metadata.get("session_id")
if not session_id:
raise ValueError("Message metadata is missing `session_id` field")
# Convert context messages into JSON-serializable format
context_messages = [msg.model_dump(exclude={"type"}) for msg in message.context]
payload = {
"id": session_id,
"messages": context_messages,
"model": model, # Ensure model is included
"group": group # Ensure group is included
}
# Ensure payload is properly serialized
payload_str = json.dumps(payload, ensure_ascii=False) # Maintain original characters
payload_bytes = payload_str.encode("utf-8") # Convert to bytes
content_length = str(len(payload_bytes)) # Compute byte length
# Extract and update headers
headers = json.loads(message.metadata.get("headers", "{}")) if message.metadata.get("headers") else {}
headers["content-length"] = content_length # Update content-length
is_first_chunk = True
logger.info(f"[before]vercel AI request: headers = {headers}. payload = {payload}")
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream(
method="POST",
url=self.endpoint_url,
content=payload_bytes,
#headers=headers,
) as response:
async for line in response.aiter_lines():
if cancellation_token.is_cancelled():
logger.warning(f"[vercel] Cancellation requested — closing stream for {session_id}")
await response.aclose() # 🔥 Close the stream explicitly
return
if line.strip():
try:
type_id, _ = line.split(":", 1)
#content = json.loads(content_json)
#logger.info(f"type_id = {type_id}, content_json = {content}")
yield line.strip()
if type_id == StreamMessageType.START_STEP_PART.value and is_first_chunk:
# send intent_id at the first time
if group == "extreme":
group = "Deep search"
yield f"{StreamMessageType.MESSAGE_ANNOTATION_PART.value}:[{{\"agent_id\":{json.dumps(group)}}}]"
is_first_chunk = False
except (ValueError, json.JSONDecodeError) as e:
print(f"Error parsing line: {line}, Error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
# Define the Streaming Agent
[docs]
class VercelAIAgent(RoutedAgent):
def __init__(self, name: str, endpoint_url: str):
super().__init__("Agent that streams from vercel")
self.agent = VercelStreamingAgent(name=name, endpoint_url=endpoint_url)
self.name = name
[docs]
@message_handler
async def my_message_handler(self, message: TextMessage, ctx: MessageContext) -> None:
assert ctx.message_channel is not None
content = f"Hello from {self.name}! You said: {message.content}, topic is {ctx.message_channel.topic}"
# logger.info(f"Returning message: {content}")
messages = [message]
session_id = message.metadata.get("session_id")
if not session_id:
raise ValueError("Message metadata is missing `session_id` field")
async for event in self.agent.on_messages_stream(messages, self.cancel_token):
await self.publish_message(
TextMessage(content=event, source=ctx.message_channel.topic, metadata={"session_id": session_id}),
message_channel=DefaultMessageChannel(topic="response", source=ctx.message_channel.source),
message_id=ctx.message_id,
)