# File based from: https://github.com/FedML-AI/AgentOpera/blob/main/agentopera/coding/func_with_reqs.py
# Credit to original authors
from __future__ import annotations
import functools
import inspect
from dataclasses import dataclass, field
from importlib.abc import SourceLoader
from importlib.util import module_from_spec, spec_from_loader
from textwrap import dedent, indent
from typing import Any, Callable, Generic, List, Sequence, Set, Tuple, TypeVar, Union
from typing_extensions import ParamSpec
T = TypeVar("T")
P = ParamSpec("P")
def _to_code(func: Union[FunctionWithRequirements[T, P], Callable[P, T], FunctionWithRequirementsStr]) -> str:
if isinstance(func, FunctionWithRequirementsStr):
return func.func
if isinstance(func, FunctionWithRequirements):
code = inspect.getsource(func.func)
else:
code = inspect.getsource(func)
# Strip the decorator
if code.startswith("@"):
code = code[code.index("\n") + 1 :]
return code
@dataclass(frozen=True)
class Alias:
name: str
alias: str
@dataclass(frozen=True)
class ImportFromModule:
module: str
imports: Tuple[Union[str, Alias], ...]
# backward compatibility
def __init__(
self,
module: str,
imports: Union[Tuple[Union[str, Alias], ...], List[Union[str, Alias]]],
):
object.__setattr__(self, "module", module)
if isinstance(imports, list):
object.__setattr__(self, "imports", tuple(imports))
else:
object.__setattr__(self, "imports", imports)
Import = Union[str, ImportFromModule, Alias]
def _import_to_str(im: Import) -> str:
if isinstance(im, str):
return f"import {im}"
elif isinstance(im, Alias):
return f"import {im.name} as {im.alias}"
else:
def to_str(i: Union[str, Alias]) -> str:
if isinstance(i, str):
return i
else:
return f"{i.name} as {i.alias}"
imports = ", ".join(map(to_str, im.imports))
return f"from {im.module} import {imports}"
class _StringLoader(SourceLoader):
def __init__(self, data: str):
self.data = data
def get_source(self, fullname: str) -> str:
return self.data
def get_data(self, path: str) -> bytes:
return self.data.encode("utf-8")
def get_filename(self, fullname: str) -> str:
return "<not a real path>/" + fullname + ".py"
[docs]
@dataclass
class FunctionWithRequirementsStr:
func: str
compiled_func: Callable[..., Any]
_func_name: str
python_packages: Sequence[str] = field(default_factory=list)
global_imports: Sequence[Import] = field(default_factory=list)
def __init__(self, func: str, python_packages: Sequence[str] = [], global_imports: Sequence[Import] = []):
self.func = func
self.python_packages = python_packages
self.global_imports = global_imports
module_name = "func_module"
loader = _StringLoader(func)
spec = spec_from_loader(module_name, loader)
if spec is None:
raise ValueError("Could not create spec")
module = module_from_spec(spec)
if spec.loader is None:
raise ValueError("Could not create loader")
try:
spec.loader.exec_module(module)
except Exception as e:
raise ValueError(f"Could not compile function: {e}") from e
functions = inspect.getmembers(module, inspect.isfunction)
if len(functions) != 1:
raise ValueError("The string must contain exactly one function")
self._func_name, self.compiled_func = functions[0]
def __call__(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("String based function with requirement objects are not directly callable")
[docs]
@dataclass
class FunctionWithRequirements(Generic[T, P]):
func: Callable[P, T]
python_packages: Sequence[str] = field(default_factory=list)
global_imports: Sequence[Import] = field(default_factory=list)
[docs]
@classmethod
def from_callable(
cls, func: Callable[P, T], python_packages: Sequence[str] = [], global_imports: Sequence[Import] = []
) -> FunctionWithRequirements[T, P]:
return cls(python_packages=python_packages, global_imports=global_imports, func=func)
[docs]
@staticmethod
def from_str(
func: str, python_packages: Sequence[str] = [], global_imports: Sequence[Import] = []
) -> FunctionWithRequirementsStr:
return FunctionWithRequirementsStr(func=func, python_packages=python_packages, global_imports=global_imports)
# Type this based on F
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
return self.func(*args, **kwargs)
def with_requirements(
python_packages: Sequence[str] = [], global_imports: Sequence[Import] = []
) -> Callable[[Callable[P, T]], FunctionWithRequirements[T, P]]:
"""
Decorate a function with package and import requirements for code execution environments.
This decorator makes a function available for reference in dynamically executed code blocks
by wrapping it in a `FunctionWithRequirements` object that tracks its dependencies. When the
decorated function is passed to a code executor, it can be imported by name in the executed
code, with all dependencies automatically handled.
Args:
python_packages (Sequence[str], optional): Python packages required by the function.
Can include version specifications (e.g., ["pandas>=1.0.0"]). Defaults to [].
global_imports (Sequence[Import], optional): Import statements required by the function.
Can be strings ("numpy"), ImportFromModule objects, or Alias objects. Defaults to [].
Returns:
Callable[[Callable[P, T]], FunctionWithRequirements[T, P]]: A decorator that wraps
the target function, preserving its functionality while registering its dependencies.
Example:
.. code-block:: python
import tempfile
import asyncio
from agentopera.core import CancellationToken
from agentopera.core.code_executor import with_requirements, CodeBlock
from agentopera.agents.code_executors.local import LocalCommandLineCodeExecutor
import pandas
@with_requirements(python_packages=["pandas"], global_imports=["pandas"])
def load_data() -> pandas.DataFrame:
\"\"\"Load some sample data.
Returns:
pandas.DataFrame: A DataFrame with sample data
\"\"\"
data = {
"name": ["John", "Anna", "Peter", "Linda"],
"location": ["New York", "Paris", "Berlin", "London"],
"age": [24, 13, 53, 33],
}
return pandas.DataFrame(data)
async def run_example():
# The decorated function can be used in executed code
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[load_data])
code = f\"\"\"from {executor.functions_module} import load_data
# Use the imported function
data = load_data()
print(data['name'][0])\"\"\"
result = await executor.execute_code_blocks(
code_blocks=[CodeBlock(language="python", code=code)],
cancellation_token=CancellationToken(),
)
print(result.output) # Output: John
# Run the async example
asyncio.run(run_example())
"""
def wrapper(func: Callable[P, T]) -> FunctionWithRequirements[T, P]:
func_with_reqs = FunctionWithRequirements(
python_packages=python_packages, global_imports=global_imports, func=func
)
functools.update_wrapper(func_with_reqs, func)
return func_with_reqs
return wrapper
def build_python_functions_file(
funcs: Sequence[Union[FunctionWithRequirements[Any, P], Callable[..., Any], FunctionWithRequirementsStr]],
) -> str:
""":meta private:"""
# First collect all global imports
global_imports: Set[Import] = set()
for func in funcs:
if isinstance(func, (FunctionWithRequirements, FunctionWithRequirementsStr)):
global_imports.update(func.global_imports)
content = "\n".join(map(_import_to_str, global_imports)) + "\n\n"
for func in funcs:
content += _to_code(func) + "\n\n"
return content
def to_stub(func: Union[Callable[..., Any], FunctionWithRequirementsStr]) -> str:
"""Generate a stub for a function as a string
Args:
func (Callable[..., Any]): The function to generate a stub for
Returns:
str: The stub for the function
"""
if isinstance(func, FunctionWithRequirementsStr):
return to_stub(func.compiled_func)
content = f"def {func.__name__}{inspect.signature(func)}:\n"
docstring = func.__doc__
if docstring:
docstring = dedent(docstring)
docstring = '"""' + docstring + '"""'
docstring = indent(docstring, " ")
content += docstring + "\n"
content += " ..."
return content
def to_code(func: Union[FunctionWithRequirements[T, P], Callable[P, T], FunctionWithRequirementsStr]) -> str:
return _to_code(func)
def import_to_str(im: Import) -> str:
return _import_to_str(im)