Source code for agentopera.adapter.langchain.langchain_adapter

from __future__ import annotations

import asyncio
import inspect
from typing import TYPE_CHECKING, Any, Callable, Dict, Type, cast

from agentopera.engine.types.agent import CancellationToken
from agentopera.engine.function_call import BaseTool
from pydantic import BaseModel, Field, create_model

if TYPE_CHECKING:
    from langchain_core.tools import BaseTool as LangChainTool


[docs] class LangChainToolAdapter(BaseTool[BaseModel, Any]): """Allows you to wrap a LangChain tool and make it available to agentopera. .. note:: This class requires the :code:`langchain` extra for the :code:`agentopera` package. .. code-block:: bash pip install -U "agentopera[langchain]" Args: langchain_tool (LangChainTool): A LangChain tool to wrap Examples: Use the `PythonAstREPLTool` from the `langchain_experimental` package to create a tool that allows you to interact with a Pandas DataFrame. .. code-block:: python import asyncio import pandas as pd from langchain_experimental.tools.python.tool import PythonAstREPLTool from agentopera.agents.tools.langchain import LangChainToolAdapter from agentopera.models.openai import OpenAIChatCompletionClient from agentopera.chatflow.messages import TextMessage from agentopera.chatflow.agents import AssistantAgent from agentopera.chatflow.ui import Console from agentopera.core import CancellationToken async def main() -> None: df = pd.read_csv("https://raw.githubusercontent.com/pandas-dev/pandas/main/doc/data/titanic.csv") # type: ignore tool = LangChainToolAdapter(PythonAstREPLTool(locals={"df": df})) model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent( "assistant", tools=[tool], model_client=model_client, system_message="Use the `df` variable to access the dataset.", ) await Console( agent.on_messages_stream( [TextMessage(content="What's the average age of the passengers?", source="user")], CancellationToken() ) ) asyncio.run(main()) This example demonstrates how to use the `SQLDatabaseToolkit` from the `langchain_community` package to interact with an SQLite database. It uses the :class:`~agentopera.chatflow.team.RoundRobinGroupChat` to iterate the single agent over multiple steps. If you want to one step at a time, you can just call `run_stream` method of the :class:`~agentopera.chatflow.agents.AssistantAgent` class directly. .. code-block:: python import asyncio import sqlite3 import requests from agentopera.chatflow.agents import AssistantAgent from agentopera.chatflow.conditions import TextMentionTermination from agentopera.chatflow.teams import RoundRobinGroupChat from agentopera.chatflow.ui import Console from agentopera.models.openai import OpenAIChatCompletionClient from agentopera.agents.tools.langchain import LangChainToolAdapter from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit from langchain_community.utilities.sql_database import SQLDatabase from langchain_openai import ChatOpenAI from sqlalchemy import Engine, create_engine from sqlalchemy.pool import StaticPool def get_engine_for_chinook_db() -> Engine: url = "https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql" response = requests.get(url) sql_script = response.text connection = sqlite3.connect(":memory:", check_same_thread=False) connection.executescript(sql_script) return create_engine( "sqlite://", creator=lambda: connection, poolclass=StaticPool, connect_args={"check_same_thread": False}, ) async def main() -> None: # Create the engine and database wrapper. engine = get_engine_for_chinook_db() db = SQLDatabase(engine) # Create the toolkit. llm = ChatOpenAI(temperature=0) toolkit = SQLDatabaseToolkit(db=db, llm=llm) # Create the LangChain tool adapter for every tool in the toolkit. tools = [LangChainToolAdapter(tool) for tool in toolkit.get_tools()] # Create the chat completion client. model_client = OpenAIChatCompletionClient(model="gpt-4o") # Create the assistant agent. agent = AssistantAgent( "assistant", model_client=model_client, tools=tools, # type: ignore model_client_stream=True, system_message="Respond with 'TERMINATE' if the task is completed.", ) # Create termination condition. termination = TextMentionTermination("TERMINATE") # Create a round-robin group chat to iterate the single agent over multiple steps. chat = RoundRobinGroupChat([agent], termination_condition=termination) # Run the chat. await Console(chat.run_stream(task="Show some tables in the database")) if __name__ == "__main__": asyncio.run(main()) """ def __init__(self, langchain_tool: LangChainTool): self._langchain_tool: LangChainTool = langchain_tool # Extract name and description name = self._langchain_tool.name description = self._langchain_tool.description or "" # Determine the callable method if hasattr(self._langchain_tool, "func") and callable(self._langchain_tool.func): # type: ignore assert self._langchain_tool.func is not None # type: ignore self._callable: Callable[..., Any] = self._langchain_tool.func # type: ignore elif hasattr(self._langchain_tool, "_run") and callable(self._langchain_tool._run): # type: ignore self._callable: Callable[..., Any] = self._langchain_tool._run # type: ignore else: raise AttributeError( f"The provided LangChain tool '{name}' does not have a callable 'func' or '_run' method." ) # Determine args_type if self._langchain_tool.args_schema: # pyright: ignore args_type = self._langchain_tool.args_schema # pyright: ignore else: # Infer args_type from the callable's signature sig = inspect.signature(cast(Callable[..., Any], self._callable)) # type: ignore fields = { k: (v.annotation, Field(...)) for k, v in sig.parameters.items() if k != "self" and v.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD) } args_type = create_model(f"{name}Args", **fields) # type: ignore # Note: type ignore is used due to a LangChain typing limitation # Ensure args_type is a subclass of BaseModel if not issubclass(args_type, BaseModel): raise ValueError(f"Failed to create a valid Pydantic v2 model for {name}") # Assume return_type as Any if not specified return_type: Type[Any] = object super().__init__(args_type, return_type, name, description)
[docs] async def run(self, args: BaseModel, cancellation_token: CancellationToken) -> Any: # Prepare arguments kwargs = args.model_dump() # Determine if the callable is asynchronous if inspect.iscoroutinefunction(self._callable): result = await self._callable(**kwargs) else: # Run in a thread to avoid blocking the event loop result = await asyncio.to_thread(self._call_sync, kwargs) return result
def _call_sync(self, kwargs: Dict[str, Any]) -> Any: return self._callable(**kwargs)