"""
Helion-OSC Evaluation Script

Comprehensive evaluation suite for code generation and mathematical reasoning.
"""

import os
import json
import torch
import logging
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from tqdm import tqdm
import subprocess
import tempfile
import signal
from contextlib import contextmanager
import multiprocessing as mp
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
import re

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class EvaluationConfig:
    """Configuration for evaluation"""
    model_name: str = "DeepXR/Helion-OSC"
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size: int = 4
    max_length: int = 2048
    temperature: float = 0.7
    top_p: float = 0.95
    num_samples: int = 1
    timeout: int = 5
    output_dir: str = "./evaluation_results"


class TimeoutException(Exception):
    """Exception raised when code execution times out"""
    pass


@contextmanager
def time_limit(seconds):
    """Context manager for timing out code execution"""
    def signal_handler(signum, frame):
        raise TimeoutException("Code execution timed out")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
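
# Note: time_limit relies on SIGALRM, so it only works on Unix in the main
# thread. Hypothetical usage (the evaluators below use subprocess timeouts
# instead):
#   with time_limit(5):
#       some_long_running_call()  # raises TimeoutException after 5 seconds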


class CodeExecutor:
    """Run code snippets in a subprocess with a timeout"""

    @staticmethod
    def execute_python(code: str, timeout: int = 5) -> Tuple[bool, str]:
        """
        Execute Python code in a separate subprocess.

        Args:
            code: Python code to execute
            timeout: Timeout in seconds

        Returns:
            Tuple of (success, output/error)
        """
        # Write the snippet to a temporary file so it can be run as a script
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_file = f.name

        try:
            result = subprocess.run(
                ['python', temp_file],
                capture_output=True,
                text=True,
                timeout=timeout
            )

            os.unlink(temp_file)

            if result.returncode == 0:
                return True, result.stdout
            else:
                return False, result.stderr

        except subprocess.TimeoutExpired:
            os.unlink(temp_file)
            return False, "Execution timed out"
        except Exception as e:
            if os.path.exists(temp_file):
                os.unlink(temp_file)
            return False, str(e)

    @staticmethod
    def check_syntax(code: str, language: str = "python") -> Tuple[bool, str]:
        """
        Check code syntax without executing it.

        Args:
            code: Code to check
            language: Programming language

        Returns:
            Tuple of (is_valid, error_message)
        """
        if language.lower() == "python":
            try:
                compile(code, '<string>', 'exec')
                return True, ""
            except SyntaxError as e:
                return False, str(e)

        return True, "Syntax checking not implemented for this language"
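
# Illustrative check (not part of the evaluation flow):
#   ok, out = CodeExecutor.execute_python("print(2 + 2)")  # -> (True, "4\n")
#   ok, err = CodeExecutor.check_syntax("def f(:")          # -> (False, "invalid syntax ...")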


class HumanEvalEvaluator:
    """Evaluator for HumanEval benchmark"""

    def __init__(self, config: EvaluationConfig):
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_humaneval(self) -> List[Dict]:
        """Load HumanEval dataset"""
        logger.info("Loading HumanEval dataset...")
        dataset = load_dataset("openai_humaneval", split="test")
        return list(dataset)

    def generate_solution(self, prompt: str) -> str:
        """Generate a code completion for a prompt"""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Keep only the completion that follows the prompt text; preserve its
        # leading indentation so the generated function body stays valid Python
        solution = generated[len(prompt):].rstrip()
        return solution

    def test_solution(self, prompt: str, solution: str, test_code: str, entry_point: str) -> Tuple[bool, str]:
        """Run a completion against the problem's test harness"""
        # HumanEval tests define `def check(candidate):`, so the candidate
        # function must be defined (prompt + completion) and check() invoked
        full_code = prompt + solution + "\n" + test_code + f"\ncheck({entry_point})\n"
        return self.executor.execute_python(full_code, self.config.timeout)

    def evaluate(self) -> Dict[str, float]:
        """Run HumanEval evaluation"""
        logger.info("Starting HumanEval evaluation...")

        problems = self.load_humaneval()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0,
            "syntax_errors": 0,
            "runtime_errors": 0,
            "timeouts": 0
        }
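        # "syntax_errors" counts completions that fail to compile; "timeouts"
        # and "runtime_errors" break down the remaining failures.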

        for problem in tqdm(problems, desc="Evaluating HumanEval"):
            prompt = problem["prompt"]
            test = problem["test"]
            entry_point = problem["entry_point"]

            solution = self.generate_solution(prompt)

            # Syntax-check the full function (prompt + completion), since the
            # completion alone is an indented fragment and would not compile
            is_valid, error = self.executor.check_syntax(prompt + solution)
            if not is_valid:
                results["syntax_errors"] += 1
                results["failed"] += 1
                continue

            passed, output = self.test_solution(prompt, solution, test, entry_point)
            if passed:
                results["passed"] += 1
            else:
                results["failed"] += 1
                if output == "Execution timed out":
                    results["timeouts"] += 1
                else:
                    results["runtime_errors"] += 1

        # With a single sample per problem, pass@1 is simply the pass rate
        results["pass@1"] = results["passed"] / results["total"]

        logger.info(f"HumanEval Results: {results}")
        return results


class MBPPEvaluator:
    """Evaluator for MBPP (Mostly Basic Python Problems) benchmark"""

    def __init__(self, config: EvaluationConfig):
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_mbpp(self) -> List[Dict]:
        """Load MBPP dataset"""
        logger.info("Loading MBPP dataset...")
        dataset = load_dataset("mbpp", split="test")
        return list(dataset)

    def generate_solution(self, prompt: str) -> str:
        """Generate code solution"""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        solution = generated[len(prompt):].strip()
        return solution

    def evaluate(self) -> Dict[str, float]:
        """Run MBPP evaluation"""
        logger.info("Starting MBPP evaluation...")

        problems = self.load_mbpp()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0
        }

        for problem in tqdm(problems, desc="Evaluating MBPP"):
            # The raw problem statement is used as the prompt, zero-shot
            prompt = problem["text"]
            test_cases = problem["test_list"]

            solution = self.generate_solution(prompt)

            # A problem passes only if every assert in test_list succeeds
            all_passed = True
            for test in test_cases:
                test_code = solution + "\n" + test
                success, _ = self.executor.execute_python(test_code, self.config.timeout)
                if not success:
                    all_passed = False
                    break

            if all_passed:
                results["passed"] += 1
            else:
                results["failed"] += 1

        results["pass@1"] = results["passed"] / results["total"]

        logger.info(f"MBPP Results: {results}")
        return results


class GSM8KEvaluator:
    """Evaluator for GSM8K mathematical reasoning benchmark"""

    def __init__(self, config: EvaluationConfig):
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if config.device == "cuda" else torch.float32,
            device_map="auto" if config.device == "cuda" else None
        )
        if config.device == "cpu":
            self.model = self.model.to(config.device)
        self.model.eval()

    def load_gsm8k(self) -> List[Dict]:
        """Load GSM8K dataset"""
        logger.info("Loading GSM8K dataset...")
        dataset = load_dataset("gsm8k", "main", split="test")
        return list(dataset)

    def extract_answer(self, text: str) -> Optional[float]:
        """Extract a numerical answer from text"""
        # Drop thousands separators so values like "1,000" parse as floats
        text = text.replace(",", "")

        # Patterns are tried in order, from the GSM8K "####" marker down to
        # a bare trailing number
        patterns = [
            r'####\s*(-?\d+\.?\d*)',
            r'answer is\s*(-?\d+\.?\d*)',
            r'equals?\s*(-?\d+\.?\d*)',
            r'=\s*(-?\d+\.?\d*)',
            r'\$?\s*(-?\d+\.?\d*)\s*$'
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    return float(match.group(1))
                except ValueError:
                    continue

        return None
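
    # Illustrative behavior (for reference):
    #   extract_answer("Natalia sold 72 clips. #### 72")  -> 72.0
    #   extract_answer("no number here")                  -> None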

    def generate_solution(self, problem: str) -> str:
        """Generate a step-by-step solution for a math problem"""
        prompt = f"Problem: {problem}\n\nLet's solve this step by step:\n"
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                do_sample=False,  # greedy decoding for deterministic math answers
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Return only the model's reasoning, so answer extraction does not
        # pick up numbers from the question itself
        return generated[len(prompt):]

    def evaluate(self) -> Dict[str, float]:
        """Run GSM8K evaluation"""
        logger.info("Starting GSM8K evaluation...")

        problems = self.load_gsm8k()
        results = {
            "total": len(problems),
            "correct": 0,
            "incorrect": 0,
            "no_answer": 0
        }

        for problem in tqdm(problems, desc="Evaluating GSM8K"):
            question = problem["question"]
            correct_answer_text = problem["answer"]

            # Ground-truth answers end with "#### <number>"
            correct_answer = self.extract_answer(correct_answer_text)
            if correct_answer is None:
                continue

            solution = self.generate_solution(question)

            predicted_answer = self.extract_answer(solution)

            if predicted_answer is None:
                results["no_answer"] += 1
                results["incorrect"] += 1
            elif abs(predicted_answer - correct_answer) < 1e-5:
                results["correct"] += 1
            else:
                results["incorrect"] += 1

        results["accuracy"] = results["correct"] / results["total"]

        logger.info(f"GSM8K Results: {results}")
        return results


class ComprehensiveEvaluator:
    """Run comprehensive evaluation across all benchmarks"""

    def __init__(self, config: EvaluationConfig):
        self.config = config
        os.makedirs(config.output_dir, exist_ok=True)

    def run_all_evaluations(self) -> Dict[str, Any]:
        """Run all evaluation benchmarks"""
        logger.info("Starting comprehensive evaluation...")

        all_results = {}
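
        # Each evaluator below builds its own tokenizer/model instance, so a
        # full run loads the checkpoint up to three times; a failure in one
        # benchmark is recorded and does not stop the others.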

        try:
            logger.info("\n" + "="*80)
            logger.info("Running HumanEval Evaluation")
            logger.info("="*80)
            humaneval_evaluator = HumanEvalEvaluator(self.config)
            all_results["humaneval"] = humaneval_evaluator.evaluate()
        except Exception as e:
            logger.error(f"HumanEval evaluation failed: {e}")
            all_results["humaneval"] = {"error": str(e)}

        try:
            logger.info("\n" + "="*80)
            logger.info("Running MBPP Evaluation")
            logger.info("="*80)
            mbpp_evaluator = MBPPEvaluator(self.config)
            all_results["mbpp"] = mbpp_evaluator.evaluate()
        except Exception as e:
            logger.error(f"MBPP evaluation failed: {e}")
            all_results["mbpp"] = {"error": str(e)}

        try:
            logger.info("\n" + "="*80)
            logger.info("Running GSM8K Evaluation")
            logger.info("="*80)
            gsm8k_evaluator = GSM8KEvaluator(self.config)
            all_results["gsm8k"] = gsm8k_evaluator.evaluate()
        except Exception as e:
            logger.error(f"GSM8K evaluation failed: {e}")
            all_results["gsm8k"] = {"error": str(e)}

        self.save_results(all_results)
        self.print_summary(all_results)

        return all_results

    def save_results(self, results: Dict[str, Any]):
        """Save evaluation results to file"""
        output_file = os.path.join(self.config.output_dir, "evaluation_results.json")
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {output_file}")

    def print_summary(self, results: Dict[str, Any]):
        """Print evaluation summary"""
        logger.info("\n" + "="*80)
        logger.info("EVALUATION SUMMARY")
        logger.info("="*80)

        if "humaneval" in results and "pass@1" in results["humaneval"]:
            logger.info(f"HumanEval Pass@1: {results['humaneval']['pass@1']:.3f}")

        if "mbpp" in results and "pass@1" in results["mbpp"]:
            logger.info(f"MBPP Pass@1: {results['mbpp']['pass@1']:.3f}")

        if "gsm8k" in results and "accuracy" in results["gsm8k"]:
            logger.info(f"GSM8K Accuracy: {results['gsm8k']['accuracy']:.3f}")

        logger.info("="*80)


def main():
    """Main evaluation script"""
    import argparse

    parser = argparse.ArgumentParser(description="Evaluate Helion-OSC model")
    parser.add_argument("--model_name", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--batch_size", type=int, default=4)
    parser.add_argument("--max_length", type=int, default=2048)
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--top_p", type=float, default=0.95)
    parser.add_argument("--timeout", type=int, default=5)
    parser.add_argument("--output_dir", type=str, default="./evaluation_results")
    parser.add_argument("--benchmark", type=str, choices=["all", "humaneval", "mbpp", "gsm8k"], default="all")

    args = parser.parse_args()

    config = EvaluationConfig(
        model_name=args.model_name,
        device=args.device,
        batch_size=args.batch_size,
        max_length=args.max_length,
        temperature=args.temperature,
        top_p=args.top_p,
        timeout=args.timeout,
        output_dir=args.output_dir
    )

    if args.benchmark == "all":
        evaluator = ComprehensiveEvaluator(config)
        evaluator.run_all_evaluations()
    elif args.benchmark == "humaneval":
        evaluator = HumanEvalEvaluator(config)
        evaluator.evaluate()
    elif args.benchmark == "mbpp":
        evaluator = MBPPEvaluator(config)
        evaluator.evaluate()
    elif args.benchmark == "gsm8k":
        evaluator = GSM8KEvaluator(config)
        evaluator.evaluate()


if __name__ == "__main__":
    main()