Source code for agentopera.engine.function_call.func_with_reqs

# File based from: https://github.com/FedML-AI/AgentOpera/blob/main/agentopera/coding/func_with_reqs.py
# Credit to original authors

from __future__ import annotations

import functools
import inspect
from dataclasses import dataclass, field
from importlib.abc import SourceLoader
from importlib.util import module_from_spec, spec_from_loader
from textwrap import dedent, indent
from typing import Any, Callable, Generic, List, Sequence, Set, Tuple, TypeVar, Union

from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


def _to_code(func: Union[FunctionWithRequirements[T, P], Callable[P, T], FunctionWithRequirementsStr]) -> str:
    if isinstance(func, FunctionWithRequirementsStr):
        return func.func

    if isinstance(func, FunctionWithRequirements):
        code = inspect.getsource(func.func)
    else:
        code = inspect.getsource(func)
    # Strip the decorator
    if code.startswith("@"):
        code = code[code.index("\n") + 1 :]
    return code


@dataclass(frozen=True)
class Alias:
    name: str
    alias: str


@dataclass(frozen=True)
class ImportFromModule:
    module: str
    imports: Tuple[Union[str, Alias], ...]

    # backward compatibility
    def __init__(
        self,
        module: str,
        imports: Union[Tuple[Union[str, Alias], ...], List[Union[str, Alias]]],
    ):
        object.__setattr__(self, "module", module)
        if isinstance(imports, list):
            object.__setattr__(self, "imports", tuple(imports))
        else:
            object.__setattr__(self, "imports", imports)


Import = Union[str, ImportFromModule, Alias]


def _import_to_str(im: Import) -> str:
    if isinstance(im, str):
        return f"import {im}"
    elif isinstance(im, Alias):
        return f"import {im.name} as {im.alias}"
    else:

        def to_str(i: Union[str, Alias]) -> str:
            if isinstance(i, str):
                return i
            else:
                return f"{i.name} as {i.alias}"

        imports = ", ".join(map(to_str, im.imports))
        return f"from {im.module} import {imports}"


class _StringLoader(SourceLoader):
    def __init__(self, data: str):
        self.data = data

    def get_source(self, fullname: str) -> str:
        return self.data

    def get_data(self, path: str) -> bytes:
        return self.data.encode("utf-8")

    def get_filename(self, fullname: str) -> str:
        return "<not a real path>/" + fullname + ".py"


[docs] @dataclass class FunctionWithRequirementsStr: func: str compiled_func: Callable[..., Any] _func_name: str python_packages: Sequence[str] = field(default_factory=list) global_imports: Sequence[Import] = field(default_factory=list) def __init__(self, func: str, python_packages: Sequence[str] = [], global_imports: Sequence[Import] = []): self.func = func self.python_packages = python_packages self.global_imports = global_imports module_name = "func_module" loader = _StringLoader(func) spec = spec_from_loader(module_name, loader) if spec is None: raise ValueError("Could not create spec") module = module_from_spec(spec) if spec.loader is None: raise ValueError("Could not create loader") try: spec.loader.exec_module(module) except Exception as e: raise ValueError(f"Could not compile function: {e}") from e functions = inspect.getmembers(module, inspect.isfunction) if len(functions) != 1: raise ValueError("The string must contain exactly one function") self._func_name, self.compiled_func = functions[0] def __call__(self, *args: Any, **kwargs: Any) -> None: raise NotImplementedError("String based function with requirement objects are not directly callable")
[docs] @dataclass class FunctionWithRequirements(Generic[T, P]): func: Callable[P, T] python_packages: Sequence[str] = field(default_factory=list) global_imports: Sequence[Import] = field(default_factory=list)
[docs] @classmethod def from_callable( cls, func: Callable[P, T], python_packages: Sequence[str] = [], global_imports: Sequence[Import] = [] ) -> FunctionWithRequirements[T, P]: return cls(python_packages=python_packages, global_imports=global_imports, func=func)
[docs] @staticmethod def from_str( func: str, python_packages: Sequence[str] = [], global_imports: Sequence[Import] = [] ) -> FunctionWithRequirementsStr: return FunctionWithRequirementsStr(func=func, python_packages=python_packages, global_imports=global_imports)
# Type this based on F def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: return self.func(*args, **kwargs)
def with_requirements( python_packages: Sequence[str] = [], global_imports: Sequence[Import] = [] ) -> Callable[[Callable[P, T]], FunctionWithRequirements[T, P]]: """ Decorate a function with package and import requirements for code execution environments. This decorator makes a function available for reference in dynamically executed code blocks by wrapping it in a `FunctionWithRequirements` object that tracks its dependencies. When the decorated function is passed to a code executor, it can be imported by name in the executed code, with all dependencies automatically handled. Args: python_packages (Sequence[str], optional): Python packages required by the function. Can include version specifications (e.g., ["pandas>=1.0.0"]). Defaults to []. global_imports (Sequence[Import], optional): Import statements required by the function. Can be strings ("numpy"), ImportFromModule objects, or Alias objects. Defaults to []. Returns: Callable[[Callable[P, T]], FunctionWithRequirements[T, P]]: A decorator that wraps the target function, preserving its functionality while registering its dependencies. Example: .. code-block:: python import tempfile import asyncio from agentopera.core import CancellationToken from agentopera.core.code_executor import with_requirements, CodeBlock from agentopera.agents.code_executors.local import LocalCommandLineCodeExecutor import pandas @with_requirements(python_packages=["pandas"], global_imports=["pandas"]) def load_data() -> pandas.DataFrame: \"\"\"Load some sample data. Returns: pandas.DataFrame: A DataFrame with sample data \"\"\" data = { "name": ["John", "Anna", "Peter", "Linda"], "location": ["New York", "Paris", "Berlin", "London"], "age": [24, 13, 53, 33], } return pandas.DataFrame(data) async def run_example(): # The decorated function can be used in executed code with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[load_data]) code = f\"\"\"from {executor.functions_module} import load_data # Use the imported function data = load_data() print(data['name'][0])\"\"\" result = await executor.execute_code_blocks( code_blocks=[CodeBlock(language="python", code=code)], cancellation_token=CancellationToken(), ) print(result.output) # Output: John # Run the async example asyncio.run(run_example()) """ def wrapper(func: Callable[P, T]) -> FunctionWithRequirements[T, P]: func_with_reqs = FunctionWithRequirements( python_packages=python_packages, global_imports=global_imports, func=func ) functools.update_wrapper(func_with_reqs, func) return func_with_reqs return wrapper def build_python_functions_file( funcs: Sequence[Union[FunctionWithRequirements[Any, P], Callable[..., Any], FunctionWithRequirementsStr]], ) -> str: """:meta private:""" # First collect all global imports global_imports: Set[Import] = set() for func in funcs: if isinstance(func, (FunctionWithRequirements, FunctionWithRequirementsStr)): global_imports.update(func.global_imports) content = "\n".join(map(_import_to_str, global_imports)) + "\n\n" for func in funcs: content += _to_code(func) + "\n\n" return content def to_stub(func: Union[Callable[..., Any], FunctionWithRequirementsStr]) -> str: """Generate a stub for a function as a string Args: func (Callable[..., Any]): The function to generate a stub for Returns: str: The stub for the function """ if isinstance(func, FunctionWithRequirementsStr): return to_stub(func.compiled_func) content = f"def {func.__name__}{inspect.signature(func)}:\n" docstring = func.__doc__ if docstring: docstring = dedent(docstring) docstring = '"""' + docstring + '"""' docstring = indent(docstring, " ") content += docstring + "\n" content += " ..." return content def to_code(func: Union[FunctionWithRequirements[T, P], Callable[P, T], FunctionWithRequirementsStr]) -> str: return _to_code(func) def import_to_str(im: Import) -> str: return _import_to_str(im)