Spaces:
Paused
Paused
| """Sandboxed Python execution environment for the NexaSci tool server.""" | |
| from __future__ import annotations | |
| import builtins | |
| import io | |
| import os | |
| import queue | |
| import shutil | |
| import sys | |
| import tempfile | |
| import traceback | |
| from contextlib import redirect_stderr, redirect_stdout | |
| from dataclasses import dataclass | |
| from multiprocessing import Event, Process, Queue, get_context | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Sequence | |
| from tools.schemas import ArtifactRecord, PythonRunRequest, PythonRunResponse | |
| try: # POSIX-only module; guard for portability. | |
| import resource | |
| except ImportError: # pragma: no cover - platform specific. | |
| resource = None # type: ignore | |
# Whitelist of builtin names that are copied from ``builtins`` into the
# ``__builtins__`` mapping given to sandboxed code. Kept in alphabetical order.
ALLOWED_BUILTINS: Sequence[str] = (
    "abs", "all", "any",
    "bool",
    "complex",
    "dict",
    "enumerate",
    "float",
    "int",
    "len", "list",
    "map", "max", "min",
    "pow", "print",
    "range", "round",
    "set", "slice", "sorted", "str", "sum",
    "tuple",
    "zip",
)
# Modules the sandbox worker imports by default and binds into the globals of
# the executed code (see ``SandboxConfig.allowed_modules``).
DEFAULT_ALLOWED_MODULES: Sequence[str] = (
    # Standard library.
    "math", "statistics",
    # Numerical / scientific stack.
    "numpy", "scipy", "pandas", "sympy",
    # Plotting.
    "matplotlib", "matplotlib.pyplot", "seaborn",
)
@dataclass
class SandboxConfig:
    """Configuration describing the sandbox constraints.

    Every field has a default, so ``SandboxConfig()`` yields a usable
    configuration and individual limits can be overridden by keyword, e.g.
    ``SandboxConfig(timeout_s=30)``. The ``@dataclass`` decorator is what
    generates that constructor; without it the annotated names below are only
    class attributes and the class accepts no arguments.
    """

    # Fallback execution timeout in seconds, used when the request does not
    # carry its own ``timeout_s``.
    timeout_s: int = 10
    # Address-space cap for the worker process, in MiB (applied via
    # ``RLIMIT_AS`` where the ``resource`` module is available).
    memory_limit_mb: int = 2048
    # Parent directory under which a fresh temporary directory is created for
    # each run.
    working_directory: Path = Path(tempfile.gettempdir()) / "nexasci_python_sandbox"
    # Modules the worker imports and exposes to the executed code as globals.
    allowed_modules: Sequence[str] = DEFAULT_ALLOWED_MODULES

    def ensure_directory(self) -> None:
        """Create the sandbox working directory if it does not already exist."""
        self.working_directory.mkdir(parents=True, exist_ok=True)
def _restricted_builtins() -> Dict[str, object]:
    """Build the mapping of builtins that sandboxed code is allowed to use.

    Returns:
        A new dict mapping each name in ``ALLOWED_BUILTINS`` to the matching
        object from the ``builtins`` module.
    """
    exposed: Dict[str, object] = {}
    for allowed_name in ALLOWED_BUILTINS:
        exposed[allowed_name] = getattr(builtins, allowed_name)
    return exposed
def _apply_memory_limit(memory_limit_mb: int) -> None:
    """Cap the current process's address space, when the platform allows it.

    Args:
        memory_limit_mb: Limit in MiB; used for both the soft and hard
            ``RLIMIT_AS`` values.

    Does nothing when the ``resource`` module could not be imported, and
    ignores a rejected limit rather than failing the run.
    """
    if resource is not None:
        limit_bytes = memory_limit_mb * 1024 * 1024
        try:
            resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))
        except (ValueError, resource.error):  # pragma: no cover - platform differences.
            # Best effort: continue without a memory cap.
            return
def _sandbox_worker(
    code: str,
    queue_: Queue,
    allowed_modules: Sequence[str],
    memory_limit_mb: int,
    work_dir: Path,
    stop_event: Event,
) -> None:
    """Execute user code inside a sandboxed process and report results via queue.

    Args:
        code: Python source to compile and execute.
        queue_: Queue on which a ``{"stdout": ..., "stderr": ...}`` dict is
            always placed, whether the code succeeded or raised.
        allowed_modules: Module names to import and bind into the globals of
            the executed code under their top-level package name.
        memory_limit_mb: Address-space limit in MiB for this process.
        work_dir: Directory to ``chdir`` into before running the code.
        stop_event: Event set after the result has been queued.
    """
    stdout_buffer = io.StringIO()
    stderr_buffer = io.StringIO()
    try:
        # Setup runs inside the ``try`` so a failure here (e.g. an unusable
        # working directory) is reported through the queue like any other
        # error instead of leaving the parent waiting for the full timeout.
        _apply_memory_limit(memory_limit_mb)
        os.chdir(work_dir)
        safe_globals: Dict[str, object] = {"__builtins__": _restricted_builtins()}
        for module_name in allowed_modules:
            try:
                # ``__import__("pkg.sub")`` loads the submodule but returns the
                # top-level package, which is what gets bound under "pkg".
                module = __import__(module_name)
                safe_globals[module_name.split(".")[0]] = module
            except Exception as exc:  # pragma: no cover - defensive.
                stderr_buffer.write(f"Failed to import allowed module '{module_name}': {exc}\n")
        compiled = compile(code, "<sandbox>", "exec")
        with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
            # Use one namespace for both globals and locals. With a separate
            # locals dict, exec runs the code as if it were a class body, so
            # functions defined at the top level of the script cannot see
            # variables or other functions defined at that same level.
            exec(compiled, safe_globals)
    except Exception:  # pragma: no cover - propagate errors to stderr.
        traceback.print_exc(file=stderr_buffer)
    finally:
        # Queue the result first, then signal completion.
        queue_.put(
            {
                "stdout": stdout_buffer.getvalue(),
                "stderr": stderr_buffer.getvalue(),
            }
        )
        stop_event.set()
def _collect_artifacts(directory: Path) -> List[ArtifactRecord]:
    """Return a record for every non-code file directly inside *directory*.

    Sub-directories and files with a ``.py`` suffix are skipped; the listing
    is not recursive. Each record carries the file name, its path as a string
    and ``mime_type=None``.
    """
    return [
        ArtifactRecord(name=entry.name, path=str(entry), mime_type=None)
        for entry in directory.iterdir()
        if not entry.is_dir() and entry.suffix != ".py"
    ]
def execute_python(request: PythonRunRequest, config: SandboxConfig) -> PythonRunResponse:
    """Execute Python code within the configured sandbox.

    The code runs in a separate daemon process with its own temporary working
    directory. The parent waits for the worker's result for at most
    ``request.timeout_s`` (or ``config.timeout_s`` when the request value is
    falsy), then stops the worker and removes the run directory.

    Args:
        request: Carries the source in ``code`` and an optional ``timeout_s``.
        config: Sandbox limits, working directory and allowed modules.

    Returns:
        A response with the captured stdout/stderr and the artifacts found in
        the run directory. If no result arrives in time, ``stderr`` is
        ``"Execution timed out or produced no output."``.
    """
    config.ensure_directory()
    run_dir = Path(tempfile.mkdtemp(dir=config.working_directory))
    try:
        # NOTE(review): the queue and event come from the "spawn" context while
        # ``Process`` uses the platform's default start method. Left as-is to
        # avoid changing how the worker is started; confirm whether a single
        # context was intended.
        queue_: Queue = get_context("spawn").Queue()
        stop_event: Event = get_context("spawn").Event()
        process = Process(
            target=_sandbox_worker,
            kwargs={
                "code": request.code,
                "queue_": queue_,
                "allowed_modules": config.allowed_modules,
                "memory_limit_mb": config.memory_limit_mb,
                "work_dir": run_dir,
                "stop_event": stop_event,
            },
            daemon=True,
        )
        process.start()
        stdout = ""
        stderr = ""
        try:
            # Block on the queue itself rather than on ``stop_event``. The
            # worker queues its result before setting the event, and the data
            # is flushed by a background thread; waking on the event and then
            # terminating the worker could discard a result that was still in
            # flight and misreport a successful run as a timeout.
            result = queue_.get(timeout=request.timeout_s or config.timeout_s)
        except queue.Empty:
            stderr = "Execution timed out or produced no output."
        else:
            stdout = result.get("stdout", "")
            stderr = result.get("stderr", "")
        finally:
            if process.is_alive():
                process.terminate()
            process.join(timeout=1)
            if process.is_alive():
                # The worker ignored or outlived SIGTERM; do not leave it running.
                process.kill()
                process.join(timeout=1)
        # NOTE(review): the run directory is deleted right after collection, so
        # the ``path`` of each returned artifact no longer exists on disk.
        # Confirm that callers only rely on the artifact names.
        artifacts = _collect_artifacts(run_dir)
    finally:
        # Always clean up the per-run directory, even if starting the worker
        # or collecting artifacts raised.
        shutil.rmtree(run_dir, ignore_errors=True)
    return PythonRunResponse(stdout=stdout, stderr=stderr, artifacts=artifacts)
# Public API of this module; helpers prefixed with an underscore stay private.
__all__ = [
    "DEFAULT_ALLOWED_MODULES",
    "SandboxConfig",
    "execute_python",
]