import asyncio
import base64
import inspect
import json
import logging
import re
import warnings
from typing import (
    Any,
    AsyncGenerator,
    Coroutine,
    Dict,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Set,
    Union,
    cast,
)
import tiktoken
from anthropic import AsyncAnthropic, AsyncStream
from anthropic.types import (
    ContentBlock,
    ImageBlockParam,
    Message,
    MessageParam,
    RawMessageStreamEvent,  # type: ignore
    TextBlock,
    TextBlockParam,
    ToolParam,
    ToolResultBlockParam,
    ToolUseBlock,
)
from anthropic.types.image_block_param import Source
from agentopera.engine import (
    EVENT_LOGGER_NAME,
    TRACE_LOGGER_NAME,
)
from agentopera.engine.function_call import FunctionCall, Tool, ToolSchema
from agentopera.engine.types import Image
from agentopera.engine.types.agent import CancellationToken
from agentopera.engine.message_handler_context import MessageHandlerContext
from agentopera.engine.logging import LLMCallEvent
from agentopera.engine.types.models import (
    AssistantMessage,
    ChatCompletionClient,
    CreateResult,
    FinishReasons,
    FunctionExecutionResultMessage,
    LLMMessage,
    ModelCapabilities,  # type: ignore
    ModelInfo,
    RequestUsage,
    SystemMessage,
    UserMessage,
    validate_model_info,
)
from typing_extensions import Self, Unpack
from . import model_info_util
from .config import AnthropicClientConfiguration, AnthropicClientConfigurationConfigModel
logger = logging.getLogger(EVENT_LOGGER_NAME)
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)
# Common parameters for message creation
anthropic_message_params = {
    "system",
    "messages",
    "max_tokens",
    "temperature",
    "top_p",
    "top_k",
    "stop_sequences",
    "tools",
    "tool_choice",
    "stream",
    "metadata",
}
disallowed_create_args = {"stream", "messages"}
required_create_args: Set[str] = {"model"}
anthropic_init_kwargs = set(inspect.getfullargspec(AsyncAnthropic.__init__).kwonlyargs)
def _anthropic_client_from_config(config: Mapping[str, Any]) -> AsyncAnthropic:
    # Filter config to only include valid parameters
    client_config = {k: v for k, v in config.items() if k in anthropic_init_kwargs}
    return AsyncAnthropic(**client_config)
def _create_args_from_config(config: Mapping[str, Any]) -> Dict[str, Any]:
    create_args = {k: v for k, v in config.items() if k in anthropic_message_params or k == "model"}
    create_args_keys = set(create_args.keys())
    if not required_create_args.issubset(create_args_keys):
        raise ValueError(f"Required create args are missing: {required_create_args - create_args_keys}")
    if disallowed_create_args.intersection(create_args_keys):
        raise ValueError(f"Disallowed create args are present: {disallowed_create_args.intersection(create_args_keys)}")
    return create_args
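# Illustrative sketch of the split performed by the two helpers above: keys in
# ``anthropic_init_kwargs`` (e.g. api_key, base_url) go to the AsyncAnthropic
# client, while ``anthropic_message_params`` plus "model" become create args.
# The values below are placeholders, not real credentials.
#
#     >>> cfg = {"model": "claude-3-sonnet-20240229", "api_key": "sk-...", "temperature": 0.5}
#     >>> _create_args_from_config(cfg)
#     {'model': 'claude-3-sonnet-20240229', 'temperature': 0.5}
#     >>> _create_args_from_config({"temperature": 0.5})
#     Traceback (most recent call last):
#         ...
#     ValueError: Required create args are missing: {'model'}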
def type_to_role(message: LLMMessage) -> str:
    if isinstance(message, SystemMessage):
        return "system"
    elif isinstance(message, UserMessage):
        return "user"
    elif isinstance(message, AssistantMessage):
        return "assistant"
    else:
        return "tool"
def get_mime_type_from_image(image: Image) -> Literal["image/jpeg", "image/png", "image/gif", "image/webp"]:
    """Get a valid Anthropic media type from an Image object."""
    # Get base64 data first
    base64_data = image.to_base64()
    # Decode the base64 string
    image_data = base64.b64decode(base64_data)
    # Check the first few bytes for known signatures
    if image_data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    elif image_data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    elif image_data.startswith(b"GIF87a") or image_data.startswith(b"GIF89a"):
        return "image/gif"
    elif image_data.startswith(b"RIFF") and image_data[8:12] == b"WEBP":
        return "image/webp"
    else:
        # Default to JPEG as a fallback
        return "image/jpeg"
def user_message_to_anthropic(message: UserMessage) -> MessageParam:
    assert_valid_name(message.source)
    if isinstance(message.content, str):
        return {
            "role": "user",
            "content": message.content,
        }
    else:
        blocks: List[Union[TextBlockParam, ImageBlockParam]] = []
        for part in message.content:
            if isinstance(part, str):
                blocks.append(TextBlockParam(type="text", text=part))
            elif isinstance(part, Image):
                blocks.append(
                    ImageBlockParam(
                        type="image",
                        source=Source(
                            type="base64",
                            media_type=get_mime_type_from_image(part),
                            data=part.to_base64(),
                        ),
                    )
                )
            else:
                raise ValueError(f"Unknown content type: {part}")
        return {
            "role": "user",
            "content": blocks,
        }
def system_message_to_anthropic(message: SystemMessage) -> str:
    return message.content
def assistant_message_to_anthropic(message: AssistantMessage) -> MessageParam:
    assert_valid_name(message.source)
    if isinstance(message.content, list):
        # Tool calls
        tool_use_blocks: List[ToolUseBlock] = []
        for func_call in message.content:
            # Parse the arguments and convert to dict if it's a JSON string
            args = func_call.arguments
            if isinstance(args, str):
                try:
                    args_dict = json.loads(args)
                except json.JSONDecodeError:
                    args_dict = {"text": args}
            else:
                args_dict = args
            tool_use_blocks.append(
                ToolUseBlock(
                    type="tool_use",
                    id=func_call.id,
                    name=func_call.name,
                    input=args_dict,
                )
            )
        # Include thought if available
        content_blocks: List[ContentBlock] = []
        if hasattr(message, "thought") and message.thought is not None:
            content_blocks.append(TextBlock(type="text", text=message.thought))
        content_blocks.extend(tool_use_blocks)
        return {
            "role": "assistant",
            "content": content_blocks,
        }
    else:
        # Simple text content
        return {
            "role": "assistant",
            "content": message.content,
        }
def tool_message_to_anthropic(message: FunctionExecutionResultMessage) -> List[MessageParam]:
    # Create a single user message containing all tool results
    content_blocks: List[ToolResultBlockParam] = []
    for result in message.content:
        content_blocks.append(
            ToolResultBlockParam(
                type="tool_result",
                tool_use_id=result.call_id,
                content=result.content,
            )
        )
    return [
        {
            "role": "user",  # Changed from "tool" to "user"
            "content": content_blocks,
        }
    ]
def to_anthropic_type(message: LLMMessage) -> Union[str, List[MessageParam], MessageParam]:
    if isinstance(message, SystemMessage):
        return system_message_to_anthropic(message)
    elif isinstance(message, UserMessage):
        return user_message_to_anthropic(message)
    elif isinstance(message, AssistantMessage):
        return assistant_message_to_anthropic(message)
    else:
        return tool_message_to_anthropic(message)
def convert_tools(tools: Sequence[Tool | ToolSchema]) -> List[ToolParam]:
    result: List[ToolParam] = []
    for tool in tools:
        if isinstance(tool, Tool):
            tool_schema = tool.schema
        else:
            assert isinstance(tool, dict)
            tool_schema = tool
        # Convert parameters to match Anthropic's schema format
        tool_params: Dict[str, Any] = {}
        if "parameters" in tool_schema:
            params = tool_schema["parameters"]
            # Transfer properties
            if "properties" in params:
                tool_params["properties"] = params["properties"]
            # Transfer required fields
            if "required" in params:
                tool_params["required"] = params["required"]
            # Handle schema type
            if "type" in params:
                tool_params["type"] = params["type"]
            else:
                tool_params["type"] = "object"
        result.append(
            ToolParam(
                name=tool_schema["name"],
                input_schema=tool_params,
                description=tool_schema.get("description", ""),
            )
        )
        # Check if the tool has a valid name
        assert_valid_name(tool_schema["name"])
    return result
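# Illustrative sketch: a plain ToolSchema dict with a JSON-Schema "parameters"
# block is mapped onto Anthropic's ToolParam, whose schema lives under
# "input_schema". "get_weather" is a made-up example tool.
#
#     >>> weather_schema: ToolSchema = {
#     ...     "name": "get_weather",
#     ...     "description": "Get the weather for a location.",
#     ...     "parameters": {
#     ...         "type": "object",
#     ...         "properties": {"location": {"type": "string"}},
#     ...         "required": ["location"],
#     ...     },
#     ... }
#     >>> convert_tools([weather_schema])[0]["input_schema"]["required"]
#     ['location']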
def normalize_name(name: str) -> str:
    """
    Replace characters that are not allowed in tool names with "_" and truncate
    the result to 64 characters.

    Prefer ``assert_valid_name`` for validating user configuration or input.
    """
    return re.sub(r"[^a-zA-Z0-9_-]", "_", name)[:64]
def assert_valid_name(name: str) -> str:
    """
    Ensure that configured names are valid, raises ValueError if not.
    """
    if not re.match(r"^[a-zA-Z0-9_-]+$", name):
        raise ValueError(f"Invalid name: {name}. Only letters, numbers, '_' and '-' are allowed.")
    if len(name) > 64:
        raise ValueError(f"Invalid name: {name}. Name must be less than 64 characters.")
    return name
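# Quick sketch of the two name helpers: ``normalize_name`` coerces an invalid
# name into a valid one, while ``assert_valid_name`` rejects it outright.
#
#     >>> normalize_name("functions.get_weather")
#     'functions_get_weather'
#     >>> assert_valid_name("get_weather")
#     'get_weather'
#     >>> assert_valid_name("get weather")
#     Traceback (most recent call last):
#         ...
#     ValueError: Invalid name: get weather. Only letters, numbers, '_' and '-' are allowed.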
def normalize_stop_reason(stop_reason: str | None) -> FinishReasons:
    if stop_reason is None:
        return "unknown"
    # Convert to lowercase for comparison
    stop_reason = stop_reason.lower()
    # Map Anthropic stop reasons to standard reasons
    KNOWN_STOP_MAPPINGS: Dict[str, FinishReasons] = {
        "end_turn": "stop",
        "max_tokens": "length",
        "stop_sequence": "stop",
        "tool_use": "function_calls",
    }
    return KNOWN_STOP_MAPPINGS.get(stop_reason, "unknown")
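# Mapping sketch: comparison is case-insensitive, and anything outside the
# known Anthropic stop reasons falls back to "unknown".
#
#     >>> normalize_stop_reason("END_TURN")
#     'stop'
#     >>> normalize_stop_reason("tool_use")
#     'function_calls'
#     >>> normalize_stop_reason("pause_turn")
#     'unknown'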
def _add_usage(usage1: RequestUsage, usage2: RequestUsage) -> RequestUsage:
    return RequestUsage(
        prompt_tokens=usage1.prompt_tokens + usage2.prompt_tokens,
        completion_tokens=usage1.completion_tokens + usage2.completion_tokens,
    )
class BaseAnthropicChatCompletionClient(ChatCompletionClient):
    def __init__(
        self,
        client: AsyncAnthropic,
        *,
        create_args: Dict[str, Any],
        model_info: Optional[ModelInfo] = None,
    ):
        self._client = client
        if model_info is None:
            try:
                self._model_info = model_info_util.get_info(create_args["model"])
            except KeyError as err:
                raise ValueError("model_info is required when model name is not recognized") from err
        else:
            self._model_info = model_info
        # Validate model_info
        validate_model_info(self._model_info)
        self._create_args = create_args
        self._total_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
        self._actual_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
        # Tools sent with the most recent request. Anthropic requires the "tools"
        # parameter whenever the conversation contains tool_use/tool_result blocks,
        # so these are reused for follow-up requests that carry tool results only.
        self._last_used_tools: List[ToolParam] = []
    async def create(
        self,
        messages: Sequence[LLMMessage],
        *,
        tools: Sequence[Tool | ToolSchema] = [],
        json_output: Optional[bool] = None,
        extra_create_args: Mapping[str, Any] = {},
        cancellation_token: Optional[CancellationToken] = None,
    ) -> CreateResult:
        # Copy create args and update with extra args
        create_args = self._create_args.copy()
        create_args.update(extra_create_args)
        # Check for vision capability if images are present
        if self.model_info["vision"] is False:
            for message in messages:
                if isinstance(message, UserMessage):
                    if isinstance(message.content, list) and any(isinstance(x, Image) for x in message.content):
                        raise ValueError("Model does not support vision and image was provided")
        # Handle JSON output format
        if json_output is not None:
            if self.model_info["json_output"] is False and json_output is True:
                raise ValueError("Model does not support JSON output")
            if json_output is True:
                create_args["response_format"] = {"type": "json_object"}
        # Process system message separately
        system_message = None
        anthropic_messages: List[MessageParam] = []
        for message in messages:
            if isinstance(message, SystemMessage):
                if system_message is not None:
                    raise ValueError("Multiple system messages are not supported")
                system_message = to_anthropic_type(message)
            else:
                anthropic_message = to_anthropic_type(message)
                if isinstance(anthropic_message, list):
                    anthropic_messages.extend(anthropic_message)
                elif isinstance(anthropic_message, str):
                    msg = MessageParam(
                        role="user" if isinstance(message, UserMessage) else "assistant", content=anthropic_message
                    )
                    anthropic_messages.append(msg)
                else:
                    anthropic_messages.append(anthropic_message)
        # Check for function calling support
        if self.model_info["function_calling"] is False and len(tools) > 0:
            raise ValueError("Model does not support function calling")
        # Set up the request
        request_args: Dict[str, Any] = {
            "model": create_args["model"],
            "messages": anthropic_messages,
            "max_tokens": create_args.get("max_tokens", 4096),
            "temperature": create_args.get("temperature", 1.0),
        }
        # Add system message if present
        if system_message is not None:
            request_args["system"] = system_message
        has_tool_results = any(isinstance(msg, FunctionExecutionResultMessage) for msg in messages)
        # Store and add tools if present
        if len(tools) > 0:
            converted_tools = convert_tools(tools)
            self._last_used_tools = converted_tools
            request_args["tools"] = converted_tools
        elif has_tool_results:
            # Anthropic requires the "tools" parameter whenever the conversation
            # contains tool_use/tool_result blocks, so reuse the last tools sent.
            request_args["tools"] = self._last_used_tools
        # Optional parameters
        for param in ["top_p", "top_k", "stop_sequences", "metadata"]:
            if param in create_args:
                request_args[param] = create_args[param]
        # Execute the request
        future: asyncio.Task[Message] = asyncio.ensure_future(self._client.messages.create(**request_args))  # type: ignore
        if cancellation_token is not None:
            cancellation_token.link_future(future)  # type: ignore
        result: Message = cast(Message, await future)  # type: ignore
        # Extract usage statistics
        usage = RequestUsage(
            prompt_tokens=result.usage.input_tokens,
            completion_tokens=result.usage.output_tokens,
        )
        # Log the event if in a handler context
        try:
            agent_id = MessageHandlerContext.agent_id()
        except RuntimeError:
            agent_id = None
        logger.info(
            LLMCallEvent(
                messages=cast(List[Dict[str, Any]], anthropic_messages),
                response=result.model_dump(),
                prompt_tokens=usage.prompt_tokens,
                completion_tokens=usage.completion_tokens,
                agent_id=agent_id,
            )
        )
        # Process the response
        content: Union[str, List[FunctionCall]]
        thought = None
        # Check if the response includes tool uses
        tool_uses = [block for block in result.content if getattr(block, "type", None) == "tool_use"]
        if tool_uses:
            # Handle tool use response
            content = []
            # Check for text content that should be treated as thought
            text_blocks: List[TextBlock] = [block for block in result.content if isinstance(block, TextBlock)]
            if text_blocks:
                thought = "".join([block.text for block in text_blocks])
            # Process tool use blocks
            for tool_use in tool_uses:
                if isinstance(tool_use, ToolUseBlock):
                    tool_input = tool_use.input
                    if isinstance(tool_input, dict):
                        tool_input = json.dumps(tool_input)
                    else:
                        tool_input = str(tool_input) if tool_input is not None else ""
                    content.append(
                        FunctionCall(
                            id=tool_use.id,
                            name=normalize_name(tool_use.name),
                            arguments=tool_input,
                        )
                    )
        else:
            # Handle text response
            content = "".join([block.text if isinstance(block, TextBlock) else "" for block in result.content])
        # Create the final result
        response = CreateResult(
            finish_reason=normalize_stop_reason(result.stop_reason),
            content=content,
            usage=usage,
            cached=False,
            thought=thought,
        )
        # Update usage statistics
        self._total_usage = _add_usage(self._total_usage, usage)
        self._actual_usage = _add_usage(self._actual_usage, usage)
        return response 
    async def create_stream(
        self,
        messages: Sequence[LLMMessage],
        *,
        tools: Sequence[Tool | ToolSchema] = [],
        json_output: Optional[bool] = None,
        extra_create_args: Mapping[str, Any] = {},
        cancellation_token: Optional[CancellationToken] = None,
        max_consecutive_empty_chunk_tolerance: int = 0,
    ) -> AsyncGenerator[Union[str, CreateResult], None]:
        """
        Creates an AsyncGenerator that yields a stream of completions based on the provided messages and tools.
        """
        # Copy create args and update with extra args
        create_args = self._create_args.copy()
        create_args.update(extra_create_args)
        # Check for vision capability if images are present
        if self.model_info["vision"] is False:
            for message in messages:
                if isinstance(message, UserMessage):
                    if isinstance(message.content, list) and any(isinstance(x, Image) for x in message.content):
                        raise ValueError("Model does not support vision and image was provided")
        # Handle JSON output format
        if json_output is not None:
            if self.model_info["json_output"] is False and json_output is True:
                raise ValueError("Model does not support JSON output")
            if json_output is True:
                create_args["response_format"] = {"type": "json_object"}
        # Process system message separately
        system_message = None
        anthropic_messages: List[MessageParam] = []
        for message in messages:
            if isinstance(message, SystemMessage):
                if system_message is not None:
                    raise ValueError("Multiple system messages are not supported")
                system_message = to_anthropic_type(message)
            else:
                anthropic_message = to_anthropic_type(message)
                if isinstance(anthropic_message, list):
                    anthropic_messages.extend(anthropic_message)
                elif isinstance(anthropic_message, str):
                    msg = MessageParam(
                        role="user" if isinstance(message, UserMessage) else "assistant", content=anthropic_message
                    )
                    anthropic_messages.append(msg)
                else:
                    anthropic_messages.append(anthropic_message)
        # Check for function calling support
        if self.model_info["function_calling"] is False and len(tools) > 0:
            raise ValueError("Model does not support function calling")
        # Set up the request
        request_args: Dict[str, Any] = {
            "model": create_args["model"],
            "messages": anthropic_messages,
            "max_tokens": create_args.get("max_tokens", 4096),
            "temperature": create_args.get("temperature", 1.0),
            "stream": True,
        }
        # Add system message if present
        if system_message is not None:
            request_args["system"] = system_message
        # Check if any message is a tool result
        has_tool_results = any(isinstance(msg, FunctionExecutionResultMessage) for msg in messages)
        # Add tools if present
        if len(tools) > 0:
            converted_tools = convert_tools(tools)
            self._last_used_tools = converted_tools
            request_args["tools"] = converted_tools
        elif has_tool_results:
            # Reuse the last tools sent; Anthropic requires "tools" whenever
            # tool_use/tool_result blocks are present in the conversation.
            request_args["tools"] = self._last_used_tools
        # Optional parameters
        for param in ["top_p", "top_k", "stop_sequences", "metadata"]:
            if param in create_args:
                request_args[param] = create_args[param]
        # Stream the response
        stream_future: asyncio.Task[AsyncStream[RawMessageStreamEvent]] = asyncio.ensure_future(
            cast(Coroutine[Any, Any, AsyncStream[RawMessageStreamEvent]], self._client.messages.create(**request_args))
        )
        if cancellation_token is not None:
            cancellation_token.link_future(stream_future)  # type: ignore
        stream: AsyncStream[RawMessageStreamEvent] = cast(AsyncStream[RawMessageStreamEvent], await stream_future)  # type: ignore
        text_content: List[str] = []
        tool_calls: Dict[str, Dict[str, Any]] = {}  # Track tool calls by ID
        current_tool_id: Optional[str] = None
        input_tokens: int = 0
        output_tokens: int = 0
        stop_reason: Optional[str] = None
        # Process the stream
        async for chunk in stream:
            # Handle different event types
            if chunk.type == "content_block_start":
                if chunk.content_block.type == "tool_use":
                    # Start of a tool use block
                    current_tool_id = chunk.content_block.id
                    tool_calls[current_tool_id] = {
                        "id": chunk.content_block.id,
                        "name": chunk.content_block.name,
                        "input": "",  # Will be populated from deltas
                    }
            elif chunk.type == "content_block_delta":
                if hasattr(chunk.delta, "type") and chunk.delta.type == "text_delta":
                    # Handle text content
                    delta_text = chunk.delta.text
                    text_content.append(delta_text)
                    if delta_text:
                        yield delta_text
                # Handle tool input deltas - they come as InputJSONDelta
                elif hasattr(chunk.delta, "type") and chunk.delta.type == "input_json_delta":
                    if current_tool_id is not None and hasattr(chunk.delta, "partial_json"):
                        # Accumulate partial JSON for the current tool
                        tool_calls[current_tool_id]["input"] += chunk.delta.partial_json
            elif chunk.type == "content_block_stop":
                # End of a content block (could be text or tool)
                current_tool_id = None
            elif chunk.type == "message_delta":
                if hasattr(chunk.delta, "stop_reason") and chunk.delta.stop_reason:
                    stop_reason = chunk.delta.stop_reason
                # Get usage info if available
                if hasattr(chunk, "usage") and hasattr(chunk.usage, "output_tokens"):
                    output_tokens = chunk.usage.output_tokens
            elif chunk.type == "message_start":
                if hasattr(chunk, "message") and hasattr(chunk.message, "usage"):
                    if hasattr(chunk.message.usage, "input_tokens"):
                        input_tokens = chunk.message.usage.input_tokens
                    if hasattr(chunk.message.usage, "output_tokens"):
                        output_tokens = chunk.message.usage.output_tokens
        # Prepare the final response
        usage = RequestUsage(
            prompt_tokens=input_tokens,
            completion_tokens=output_tokens,
        )
        # Determine content based on what was received
        content: Union[str, List[FunctionCall]]
        thought = None
        if tool_calls:
            # We received tool calls
            if text_content:
                # Text before tool calls is treated as thought
                thought = "".join(text_content)
            # Convert tool calls to FunctionCall objects
            content = []
            for _, tool_data in tool_calls.items():
                # Parse the JSON input if needed
                input_str = tool_data["input"]
                try:
                    # If it's valid JSON, parse it; otherwise use as-is
                    if input_str.strip().startswith("{") and input_str.strip().endswith("}"):
                        parsed_input = json.loads(input_str)
                        input_str = json.dumps(parsed_input)  # Re-serialize to ensure valid JSON
                except json.JSONDecodeError:
                    # Keep as string if not valid JSON
                    pass
                content.append(
                    FunctionCall(
                        id=tool_data["id"],
                        name=normalize_name(tool_data["name"]),
                        arguments=input_str,
                    )
                )
        else:
            # Just text content
            content = "".join(text_content)
        # Create the final result
        result = CreateResult(
            finish_reason=normalize_stop_reason(stop_reason),
            content=content,
            usage=usage,
            cached=False,
            thought=thought,
        )
        # Update usage statistics
        self._total_usage = _add_usage(self._total_usage, usage)
        self._actual_usage = _add_usage(self._actual_usage, usage)
        yield result 
    def count_tokens(self, messages: Sequence[LLMMessage], *, tools: Sequence[Tool | ToolSchema] = []) -> int:
        """
        Estimate the number of tokens used by messages and tools.
        Note: This is an estimation based on common tokenization patterns and may not perfectly
        match Anthropic's exact token counting for Claude models.
        """
        # Use cl100k_base encoding as an approximation for Claude's tokenizer
        try:
            encoding = tiktoken.get_encoding("cl100k_base")
        except Exception:
            encoding = tiktoken.get_encoding("gpt2")  # Fallback
        num_tokens = 0
        # System message tokens (if any)
        system_content = None
        for message in messages:
            if isinstance(message, SystemMessage):
                system_content = message.content
                break
        if system_content:
            num_tokens += len(encoding.encode(system_content)) + 15  # Approximate system message overhead
        # Message tokens
        for message in messages:
            if isinstance(message, SystemMessage):
                continue  # Already counted
            # Base token cost per message
            num_tokens += 10  # Approximate message role & formatting overhead
            # Content tokens
            if isinstance(message, UserMessage) or isinstance(message, AssistantMessage):
                if isinstance(message.content, str):
                    num_tokens += len(encoding.encode(message.content))
                elif isinstance(message.content, list):
                    # Handle different content types
                    for part in message.content:
                        if isinstance(part, str):
                            num_tokens += len(encoding.encode(part))
                        elif isinstance(part, Image):
                            # Estimate vision tokens (simplified)
                            num_tokens += 512  # Rough estimation for image tokens
                        elif isinstance(part, FunctionCall):
                            num_tokens += len(encoding.encode(part.name))
                            num_tokens += len(encoding.encode(part.arguments))
                            num_tokens += 10  # Function call overhead
            elif isinstance(message, FunctionExecutionResultMessage):
                for result in message.content:
                    num_tokens += len(encoding.encode(result.content))
                    num_tokens += 10  # Function result overhead
        # Tool tokens
        for tool in tools:
            if isinstance(tool, Tool):
                tool_schema = tool.schema
            else:
                tool_schema = tool
            # Name and description
            num_tokens += len(encoding.encode(tool_schema["name"]))
            if "description" in tool_schema:
                num_tokens += len(encoding.encode(tool_schema["description"]))
            # Parameters
            if "parameters" in tool_schema:
                params = tool_schema["parameters"]
                if "properties" in params:
                    for prop_name, prop_schema in params["properties"].items():
                        num_tokens += len(encoding.encode(prop_name))
                        if "type" in prop_schema:
                            num_tokens += len(encoding.encode(prop_schema["type"]))
                        if "description" in prop_schema:
                            num_tokens += len(encoding.encode(prop_schema["description"]))
                        # Special handling for enums
                        if "enum" in prop_schema:
                            for value in prop_schema["enum"]:
                                if isinstance(value, str):
                                    num_tokens += len(encoding.encode(value))
                                else:
                                    num_tokens += 2  # Non-string enum values
            # Tool overhead
            num_tokens += 20
        return num_tokens 
    def remaining_tokens(self, messages: Sequence[LLMMessage], *, tools: Sequence[Tool | ToolSchema] = []) -> int:
        """Calculate the remaining tokens based on the model's token limit."""
        token_limit = model_info_util.get_token_limit(self._create_args["model"])
        return token_limit - self.count_tokens(messages, tools=tools) 
    def actual_usage(self) -> RequestUsage:
        return self._actual_usage 
    def total_usage(self) -> RequestUsage:
        return self._total_usage 
    @property
    def capabilities(self) -> ModelCapabilities:  # type: ignore
        warnings.warn("capabilities is deprecated, use model_info instead", DeprecationWarning, stacklevel=2)
        return self._model_info
    @property
    def model_info(self) -> ModelInfo:
        return self._model_info 
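# Hedged usage sketch for ``create_stream`` (the helper below is illustrative,
# not part of this module): string items are incremental text chunks, and the
# final item is a CreateResult carrying the full content, usage, and finish reason.
#
#     async def print_stream(client: BaseAnthropicChatCompletionClient) -> None:
#         async for item in client.create_stream(
#             [UserMessage(content="Write a haiku.", source="user")]
#         ):
#             if isinstance(item, str):
#                 print(item, end="", flush=True)
#             else:
#                 print("\nfinish_reason:", item.finish_reason)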
class AnthropicChatCompletionClient(
    BaseAnthropicChatCompletionClient
):
    """
    Chat completion client for Anthropic's Claude models.
    Args:
        model (str): The Claude model to use (e.g., "claude-3-sonnet-20240229", "claude-3-opus-20240229")
        api_key (str, optional): Anthropic API key. Required if not in environment variables.
        base_url (str, optional): Override the default API endpoint.
        max_tokens (int, optional): Maximum tokens in the response. Default is 4096.
        temperature (float, optional): Controls randomness. Lower is more deterministic. Default is 1.0.
        top_p (float, optional): Controls diversity via nucleus sampling. Default is 1.0.
        top_k (int, optional): Controls diversity via top-k sampling. Default is -1 (disabled).
        model_info (ModelInfo, optional): The capabilities of the model. Required if using a custom model.
    To use this client, you must install the Anthropic extension:
    .. code-block:: bash
        pip install "agentopera[anthropic]"
    Example:
    .. code-block:: python
        from agentopera.agents.models.anthropic import AnthropicChatCompletionClient
        from agentopera.engine.types.models import UserMessage
        anthropic_client = AnthropicChatCompletionClient(
            model="claude-3-sonnet-20240229",
            api_key="your-api-key",  # Optional if ANTHROPIC_API_KEY is set in environment
        )
        result = await anthropic_client.create([UserMessage(content="What is the capital of France?", source="user")])
        print(result)
    To load the client from a configuration:
    .. code-block:: python
        from agentopera.engine.types.models import ChatCompletionClient
        config = {
            "provider": "AnthropicChatCompletionClient",
            "config": {"model": "claude-3-sonnet-20240229"},
        }
        client = ChatCompletionClient.load_component(config)
    The client supports function calling with Claude models that have the capability:
    .. code-block:: python
        from agentopera.core.tools import FunctionTool
        def get_weather(location: str) -> str:
            '''Get the weather for a location'''
            return f"The weather in {location} is sunny."
        tool = FunctionTool(get_weather)
        result = await anthropic_client.create(
            [UserMessage(content="What's the weather in Paris?", source="user")],
            tools=[tool],
        )
    """
    component_type = "model"
    component_config_schema = AnthropicClientConfigurationConfigModel
    component_provider_override = "agentopera.agents.models.anthropic.AnthropicChatCompletionClient"
    def __init__(self, **kwargs: Unpack[AnthropicClientConfiguration]):
        if "model" not in kwargs:
            raise ValueError("model is required for AnthropicChatCompletionClient")
        self._raw_config: Dict[str, Any] = dict(kwargs).copy()
        copied_args = dict(kwargs).copy()
        model_info: Optional[ModelInfo] = None
        if "model_info" in kwargs:
            model_info = kwargs["model_info"]
            del copied_args["model_info"]
        client = _anthropic_client_from_config(copied_args)
        create_args = _create_args_from_config(copied_args)
        super().__init__(
            client=client,
            create_args=create_args,
            model_info=model_info,
        )
    def __getstate__(self) -> Dict[str, Any]:
        state = self.__dict__.copy()
        state["_client"] = None
        return state
    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._client = _anthropic_client_from_config(state["_raw_config"])