Nexa_Labs / tools /python_sandbox.py
Allanatrix's picture
Upload 57 files
d8328bf verified
"""Sandboxed Python execution environment for the NexaSci tool server."""
from __future__ import annotations
import builtins
import io
import os
import queue
import shutil
import sys
import tempfile
import traceback
from contextlib import redirect_stderr, redirect_stdout
from dataclasses import dataclass
from multiprocessing import Event, Process, Queue, get_context
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
from tools.schemas import ArtifactRecord, PythonRunRequest, PythonRunResponse
try: # POSIX-only module; guard for portability.
import resource
except ImportError: # pragma: no cover - platform specific.
resource = None # type: ignore
# Names of the builtins that are copied into the ``__builtins__`` mapping
# handed to sandboxed code; anything not listed here is unavailable by name.
ALLOWED_BUILTINS: Sequence[str] = (
    "abs", "all", "any", "bool", "complex",
    "dict", "enumerate", "float", "int", "len",
    "list", "map", "max", "min", "pow",
    "print", "range", "round", "set", "slice",
    "sorted", "str", "sum", "tuple", "zip",
)
# Modules that the sandbox worker imports up front and exposes to user code
# under their top-level package name.
DEFAULT_ALLOWED_MODULES: Sequence[str] = (
    # Standard library.
    "math", "statistics",
    # Numerical / symbolic stack.
    "numpy", "scipy", "pandas", "sympy",
    # Plotting.
    "matplotlib", "matplotlib.pyplot", "seaborn",
)
@dataclass(frozen=True)
class SandboxConfig:
    """Immutable set of limits and locations used for sandboxed runs.

    Attributes:
        timeout_s: Fallback wall-clock timeout in seconds, used when a
            request does not carry its own timeout.
        memory_limit_mb: Address-space cap for the worker process, in MiB.
        working_directory: Parent directory under which per-run scratch
            directories are created.
        allowed_modules: Module names the worker pre-imports for user code.
    """

    timeout_s: int = 10
    memory_limit_mb: int = 2048
    working_directory: Path = Path(tempfile.gettempdir()).joinpath(
        "nexasci_python_sandbox"
    )
    allowed_modules: Sequence[str] = DEFAULT_ALLOWED_MODULES

    def ensure_directory(self) -> None:
        """Make sure the working directory (and its parents) exists."""
        target = self.working_directory
        target.mkdir(exist_ok=True, parents=True)
def _restricted_builtins() -> Dict[str, object]:
    """Build the ``__builtins__`` mapping given to sandboxed code.

    Returns:
        A fresh dict mapping each name in ``ALLOWED_BUILTINS`` to the
        corresponding object from the real ``builtins`` module.
    """
    exposed: Dict[str, object] = {}
    for builtin_name in ALLOWED_BUILTINS:
        exposed[builtin_name] = getattr(builtins, builtin_name)
    return exposed
def _apply_memory_limit(memory_limit_mb: int) -> None:
"""Apply a per-process address space limit if supported by the platform."""
if resource is None:
return
soft_limit = hard_limit = memory_limit_mb * 1024 * 1024
try:
resource.setrlimit(resource.RLIMIT_AS, (soft_limit, hard_limit))
except (ValueError, resource.error): # pragma: no cover - platform differences.
pass
def _sandbox_worker(
    code: str,
    queue_: Queue,
    allowed_modules: Sequence[str],
    memory_limit_mb: int,
    work_dir: Path,
    stop_event: Event,
) -> None:
    """Execute user code inside a sandboxed process and report results via queue.

    Intended to be the ``target`` of a child process.

    Args:
        code: Python source to compile and execute.
        queue_: Queue on which a single ``{"stdout": ..., "stderr": ...}``
            dict is placed once execution finishes (successfully or not).
        allowed_modules: Module names to import and expose to the user code
            under their top-level package name.
        memory_limit_mb: Address-space limit (MiB) applied before running.
        work_dir: Directory made current before the code runs.
        stop_event: Set after the result has been queued.

    NOTE(review): a trimmed ``__builtins__`` is not a hard security boundary
    on its own -- the pre-imported modules are handed to user code in full.
    Confirm untrusted code is additionally isolated at the OS/container level.
    """
    stdout_buffer = io.StringIO()
    stderr_buffer = io.StringIO()
    try:
        # Environment setup lives inside the ``try`` so that a failure here
        # (e.g. an unusable work_dir) is reported through the queue instead
        # of leaving the parent to wait for its timeout.
        _apply_memory_limit(memory_limit_mb)
        os.chdir(work_dir)
        safe_globals: Dict[str, object] = {"__builtins__": _restricted_builtins()}
        for module_name in allowed_modules:
            try:
                # ``__import__("pkg.sub")`` returns the top-level package with
                # the submodule loaded as an attribute, hence the split below.
                module = __import__(module_name)
            except Exception as exc:  # pragma: no cover - defensive.
                stderr_buffer.write(
                    f"Failed to import allowed module '{module_name}': {exc}\n"
                )
            else:
                safe_globals[module_name.split(".")[0]] = module
        compiled = compile(code, "<sandbox>", "exec")
        with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
            # Use ONE namespace. With separate globals and locals, exec runs
            # the code as if it were a class body, so top-level definitions
            # are invisible inside user-defined functions and comprehensions.
            exec(compiled, safe_globals)
    except Exception:  # pragma: no cover - propagate errors to stderr.
        traceback.print_exc(file=stderr_buffer)
    finally:
        queue_.put(
            {
                "stdout": stdout_buffer.getvalue(),
                "stderr": stderr_buffer.getvalue(),
            }
        )
        stop_event.set()
def _collect_artifacts(directory: Path) -> List[ArtifactRecord]:
    """Describe the files a run left directly inside ``directory``.

    Only immediate children are considered; sub-directories and ``.py``
    files are skipped. Every record is created with ``mime_type=None``.

    Args:
        directory: Directory to scan.

    Returns:
        One ``ArtifactRecord`` per remaining file, in ``iterdir()`` order.
    """
    return [
        ArtifactRecord(name=entry.name, path=str(entry), mime_type=None)
        for entry in directory.iterdir()
        if not entry.is_dir() and entry.suffix != ".py"
    ]
def execute_python(request: PythonRunRequest, config: SandboxConfig) -> PythonRunResponse:
    """Execute Python code within the configured sandbox.

    The code runs in a child process inside a fresh scratch directory created
    under ``config.working_directory``.

    Args:
        request: Carries the source (``request.code``) and an optional
            per-request timeout (``request.timeout_s``).
        config: Sandbox limits; ``config.timeout_s`` is used when the request
            has no (or a falsy) timeout.

    Returns:
        A ``PythonRunResponse`` with captured stdout/stderr and the artifacts
        found in the scratch directory. If the worker delivers no result
        within the timeout, ``stderr`` holds a timeout message.

    Note:
        When artifacts were produced, the scratch directory is kept so that
        the returned artifact paths stay valid; the caller is responsible for
        removing it once the files have been consumed. Without artifacts (or
        on error) the directory is removed before returning.
    """
    config.ensure_directory()
    run_dir = Path(tempfile.mkdtemp(dir=config.working_directory))
    keep_run_dir = False
    try:
        queue_: Queue = get_context("spawn").Queue()
        stop_event: Event = get_context("spawn").Event()
        # NOTE(review): Process comes from the default start-method context
        # while the queue/event come from the "spawn" context. Left unchanged
        # to preserve behavior -- confirm whether spawn was intended here too.
        process = Process(
            target=_sandbox_worker,
            kwargs={
                "code": request.code,
                "queue_": queue_,
                "allowed_modules": config.allowed_modules,
                "memory_limit_mb": config.memory_limit_mb,
                "work_dir": run_dir,
                "stop_event": stop_event,
            },
            daemon=True,
        )
        timeout = request.timeout_s or config.timeout_s
        stdout = ""
        stderr = ""
        process.start()
        try:
            # Block on the queue itself. Waiting on the event and then calling
            # get_nowait() races with the worker's queue feeder thread: the
            # event can be observed (and the worker terminated) before the
            # payload has been flushed, yielding a spurious "no output".
            result = queue_.get(timeout=timeout)
        except queue.Empty:
            stderr = "Execution timed out or produced no output."
        else:
            stdout = result.get("stdout", "")
            stderr = result.get("stderr", "")
            # The result is already in hand; let the worker exit by itself.
            process.join(timeout=1)
        finally:
            if process.is_alive():
                process.terminate()
            process.join(timeout=1)
        artifacts = _collect_artifacts(run_dir)
        # Artifact records point at files inside run_dir; deleting it now
        # would hand the caller dangling paths.
        keep_run_dir = bool(artifacts)
        return PythonRunResponse(stdout=stdout, stderr=stderr, artifacts=artifacts)
    finally:
        if not keep_run_dir:
            shutil.rmtree(run_dir, ignore_errors=True)
# Public API of this module; helper functions prefixed with "_" are internal.
__all__ = ["DEFAULT_ALLOWED_MODULES", "SandboxConfig", "execute_python"]