import asyncio
import os
import sys
import time
from inspect import iscoroutinefunction
from typing import AsyncGenerator, Awaitable, Callable, Dict, List, Optional, TypeVar, Union, cast
from agentopera.engine.types.agent import CancellationToken
from agentopera.engine.types import Image
from agentopera.engine.types.models import RequestUsage
from agentopera.chatflow.agents import UserProxyAgent
from agentopera.chatflow.base import Response, TaskResult
from agentopera.chatflow.messages import (
AgentEvent,
ChatMessage,
ModelClientStreamingChunkEvent,
MultiModalMessage,
UserInputRequestedEvent,
)
def _is_running_in_iterm() -> bool:
return os.getenv("TERM_PROGRAM") == "iTerm.app"
def _is_output_a_tty() -> bool:
return sys.stdout.isatty()
SyncInputFunc = Callable[[str], str]
AsyncInputFunc = Callable[[str, Optional[CancellationToken]], Awaitable[str]]
InputFuncType = Union[SyncInputFunc, AsyncInputFunc]
T = TypeVar("T", bound=TaskResult | Response)
def aprint(output: str, end: str = "\n", flush: bool = False) -> Awaitable[None]:
return asyncio.to_thread(print, output, end=end, flush=flush)
[docs]
async def Console(
stream: AsyncGenerator[AgentEvent | ChatMessage | T, None],
*,
no_inline_images: bool = False,
output_stats: bool = False,
user_input_manager: UserInputManager | None = None,
) -> T:
"""
Consumes the message stream from :meth:`~agentopera.chatflow.base.TaskRunner.run_stream`
or :meth:`~agentopera.chatflow.base.ChatAgent.on_messages_stream` and renders the messages to the console.
Returns the last processed TaskResult or Response.
.. note::
`output_stats` is experimental and the stats may not be accurate.
It will be improved in future releases.
Args:
stream (AsyncGenerator[AgentEvent | ChatMessage | TaskResult, None] | AsyncGenerator[AgentEvent | ChatMessage | Response, None]): Message stream to render.
This can be from :meth:`~agentopera.chatflow.base.TaskRunner.run_stream` or :meth:`~agentopera.chatflow.base.ChatAgent.on_messages_stream`.
no_inline_images (bool, optional): If terminal is iTerm2 will render images inline. Use this to disable this behavior. Defaults to False.
output_stats (bool, optional): (Experimental) If True, will output a summary of the messages and inline token usage info. Defaults to False.
Returns:
last_processed: A :class:`~agentopera.chatflow.base.TaskResult` if the stream is from :meth:`~agentopera.chatflow.base.TaskRunner.run_stream`
or a :class:`~agentopera.chatflow.base.Response` if the stream is from :meth:`~agentopera.chatflow.base.ChatAgent.on_messages_stream`.
"""
render_image_iterm = _is_running_in_iterm() and _is_output_a_tty() and not no_inline_images
start_time = time.time()
total_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
last_processed: Optional[T] = None
streaming_chunks: List[str] = []
async for message in stream:
if isinstance(message, TaskResult):
duration = time.time() - start_time
if output_stats:
output = (
f"{'-' * 10} Summary {'-' * 10}\n"
f"Number of messages: {len(message.messages)}\n"
f"Finish reason: {message.stop_reason}\n"
f"Total prompt tokens: {total_usage.prompt_tokens}\n"
f"Total completion tokens: {total_usage.completion_tokens}\n"
f"Duration: {duration:.2f} seconds\n"
)
await aprint(output, end="", flush=True)
# mypy ignore
last_processed = message # type: ignore
elif isinstance(message, Response):
duration = time.time() - start_time
# Print final response.
output = f"{'-' * 10} {message.chat_message.source} {'-' * 10}\n{_message_to_str(message.chat_message, render_image_iterm=render_image_iterm)}\n"
if message.chat_message.models_usage:
if output_stats:
output += f"[Prompt tokens: {message.chat_message.models_usage.prompt_tokens}, Completion tokens: {message.chat_message.models_usage.completion_tokens}]\n"
total_usage.completion_tokens += message.chat_message.models_usage.completion_tokens
total_usage.prompt_tokens += message.chat_message.models_usage.prompt_tokens
await aprint(output, end="", flush=True)
# Print summary.
if output_stats:
if message.inner_messages is not None:
num_inner_messages = len(message.inner_messages)
else:
num_inner_messages = 0
output = (
f"{'-' * 10} Summary {'-' * 10}\n"
f"Number of inner messages: {num_inner_messages}\n"
f"Total prompt tokens: {total_usage.prompt_tokens}\n"
f"Total completion tokens: {total_usage.completion_tokens}\n"
f"Duration: {duration:.2f} seconds\n"
)
await aprint(output, end="", flush=True)
# mypy ignore
last_processed = message # type: ignore
# We don't want to print UserInputRequestedEvent messages, we just use them to signal the user input event.
elif isinstance(message, UserInputRequestedEvent):
if user_input_manager is not None:
user_input_manager.notify_event_received(message.request_id)
else:
# Cast required for mypy to be happy
message = cast(AgentEvent | ChatMessage, message) # type: ignore
if not streaming_chunks:
# Print message sender.
await aprint(f"{'-' * 10} {message.source} {'-' * 10}", end="\n", flush=True)
if isinstance(message, ModelClientStreamingChunkEvent):
await aprint(message.content, end="")
streaming_chunks.append(message.content)
else:
if streaming_chunks:
streaming_chunks.clear()
# Chunked messages are already printed, so we just print a newline.
await aprint("", end="\n", flush=True)
else:
# Print message content.
await aprint(_message_to_str(message, render_image_iterm=render_image_iterm), end="\n", flush=True)
if message.models_usage:
if output_stats:
await aprint(
f"[Prompt tokens: {message.models_usage.prompt_tokens}, Completion tokens: {message.models_usage.completion_tokens}]",
end="\n",
flush=True,
)
total_usage.completion_tokens += message.models_usage.completion_tokens
total_usage.prompt_tokens += message.models_usage.prompt_tokens
if last_processed is None:
raise ValueError("No TaskResult or Response was processed.")
return last_processed
# iTerm2 image rendering protocol: https://iterm2.com/documentation-images.html
def _image_to_iterm(image: Image) -> str:
image_data = image.to_base64()
return f"\033]1337;File=inline=1:{image_data}\a\n"
def _message_to_str(message: AgentEvent | ChatMessage, *, render_image_iterm: bool = False) -> str:
if isinstance(message, MultiModalMessage):
result: List[str] = []
for c in message.content:
if isinstance(c, str):
result.append(c)
else:
if render_image_iterm:
result.append(_image_to_iterm(c))
else:
result.append("<image>")
return "\n".join(result)
else:
return f"{message.content}"