"""
Helion-OSC Comprehensive Benchmark Suite
Performance benchmarking and comparison with other models
"""

import argparse
import gc
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
import seaborn as sns
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Divisor used to convert byte counts to mebibytes.
_BYTES_PER_MB = 1024 * 1024


@dataclass
class BenchmarkResult:
    """Single benchmark result.

    One instance is produced per generation run, successful or not.
    Failed runs carry zeroed metrics, ``success=False`` and the
    stringified exception in ``error``.
    """

    model_name: str
    task: str
    prompt_length: int  # number of prompt tokens
    generation_length: int  # output tokens minus prompt tokens
    temperature: float
    inference_time: float  # wall-clock seconds spent in generate()
    tokens_per_second: float
    memory_used_mb: float  # process RSS delta across the generate() call
    gpu_memory_mb: Optional[float]  # peak GPU allocation above baseline; None on CPU
    success: bool
    error: Optional[str] = None


@dataclass
class AggregatedResults:
    """Aggregated benchmark results.

    Counts cover every recorded run; the timing, throughput and memory
    statistics are computed over successful runs only.
    """

    model_name: str
    total_tests: int
    successful_tests: int
    failed_tests: int
    avg_inference_time: float
    avg_tokens_per_second: float
    avg_memory_mb: float
    min_inference_time: float
    max_inference_time: float
    std_inference_time: float


class PerformanceBenchmark:
    """Performance benchmarking utilities for a single causal language model."""

    def __init__(self, model_name: str = "DeepXR/Helion-OSC"):
        """Load the tokenizer and model for ``model_name``.

        Uses CUDA with bfloat16 and ``device_map="auto"`` when a GPU is
        available, otherwise float32 on the CPU.

        Args:
            model_name: Identifier passed to ``from_pretrained``.
        """
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        logger.info(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
            device_map="auto" if self.device == "cuda" else None,
        )

        if self.device == "cpu":
            self.model = self.model.to(self.device)

        self.model.eval()
        self.results: List[BenchmarkResult] = []

    def release_model(self) -> None:
        """Drop the model and tokenizer references and free cached GPU memory.

        Recorded results stay available, so aggregation and reporting
        still work afterwards; running further benchmarks does not.
        """
        self.model = None
        self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def get_memory_usage(self) -> Tuple[float, Optional[float]]:
        """Get current memory usage.

        Returns:
            ``(ram_mb, gpu_mb)``: the process resident set size and the
            currently allocated CUDA memory, both in MB. ``gpu_mb`` is
            ``None`` when CUDA is unavailable.
        """
        process = psutil.Process()
        ram_mb = process.memory_info().rss / _BYTES_PER_MB

        gpu_mb = None
        if torch.cuda.is_available():
            gpu_mb = torch.cuda.memory_allocated() / _BYTES_PER_MB

        return ram_mb, gpu_mb

    def benchmark_inference(
        self,
        prompt: str,
        task: str,
        max_length: int = 512,
        temperature: float = 0.7,
        num_runs: int = 1
    ) -> List[BenchmarkResult]:
        """Benchmark inference performance.

        Each run is timed independently. A run that raises is logged and
        recorded as a failed ``BenchmarkResult`` instead of propagating.

        Args:
            prompt: Text to feed the model.
            task: Label stored on each result, used for grouping.
            max_length: ``max_length`` passed to ``generate`` (prompt
                tokens included).
            temperature: Sampling temperature; sampling is enabled only
                when it is greater than zero.
            num_runs: Number of timed repetitions.

        Returns:
            The results of this call, one per run. They are also
            appended to ``self.results``.
        """
        run_results: List[BenchmarkResult] = []
        on_gpu = self.device == "cuda"
        sampling = temperature > 0

        for run in range(num_runs):
            try:
                # Tokenize
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
                prompt_length = inputs.input_ids.shape[1]

                generation_kwargs = {
                    "max_length": max_length,
                    "do_sample": sampling,
                    "pad_token_id": self.tokenizer.eos_token_id,
                }
                # Temperature only matters when sampling; leave it out for
                # greedy decoding so a value of 0 never reaches generate().
                if sampling:
                    generation_kwargs["temperature"] = temperature

                # Warm up GPU so one-time setup cost is not timed
                if run == 0 and on_gpu:
                    with torch.no_grad():
                        _ = self.model.generate(**inputs, max_length=prompt_length + 10)
                    torch.cuda.synchronize()

                # Clear cache
                if on_gpu:
                    torch.cuda.empty_cache()

                # Measure memory before
                ram_before, gpu_before = self.get_memory_usage()
                if on_gpu:
                    # Restart peak tracking so the peak read after generation
                    # belongs to this run only.
                    torch.cuda.reset_peak_memory_stats()

                # Generate; perf_counter is monotonic and high resolution
                start_time = time.perf_counter()
                with torch.no_grad():
                    outputs = self.model.generate(**inputs, **generation_kwargs)

                if on_gpu:
                    # Wait for queued kernels so the timing covers all work
                    torch.cuda.synchronize()

                inference_time = time.perf_counter() - start_time

                # Measure memory after
                ram_after, _ = self.get_memory_usage()

                # Calculate metrics
                generation_length = outputs.shape[1] - prompt_length
                tokens_per_second = (
                    generation_length / inference_time if inference_time > 0 else 0
                )
                memory_used = ram_after - ram_before

                # Memory allocated during generation is largely released by
                # the time generate() returns, so report the peak above the
                # pre-generation baseline rather than an after/before delta.
                gpu_memory = None
                if on_gpu and gpu_before is not None:
                    gpu_peak = torch.cuda.max_memory_allocated() / _BYTES_PER_MB
                    gpu_memory = gpu_peak - gpu_before

                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=prompt_length,
                    generation_length=generation_length,
                    temperature=temperature,
                    inference_time=inference_time,
                    tokens_per_second=tokens_per_second,
                    memory_used_mb=memory_used,
                    gpu_memory_mb=gpu_memory,
                    success=True
                )

            except Exception as e:
                # Deliberate best-effort: record the failure and carry on
                # with the remaining runs.
                logger.exception("Benchmark failed: %s", e)
                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=0,
                    generation_length=0,
                    temperature=temperature,
                    inference_time=0,
                    tokens_per_second=0,
                    memory_used_mb=0,
                    gpu_memory_mb=None,
                    success=False,
                    error=str(e)
                )

            run_results.append(result)
            self.results.append(result)

        return run_results

    def run_benchmark_suite(self) -> List[BenchmarkResult]:
        """Run comprehensive benchmark suite.

        Executes every built-in test case three times.

        Returns:
            All results produced by this call, in execution order.
        """
        logger.info("Starting comprehensive benchmark suite...")

        test_cases = [
            {
                "prompt": "def fibonacci(n):",
                "task": "simple_function",
                "max_length": 256,
                "temperature": 0.7
            },
            {
                "prompt": "Write a Python class for a binary search tree with insert, search, and delete methods:",
                "task": "complex_class",
                "max_length": 1024,
                "temperature": 0.7
            },
            {
                "prompt": "Implement quicksort algorithm in Python with detailed comments:",
                "task": "algorithm",
                "max_length": 512,
                "temperature": 0.5
            },
            {
                "prompt": "Solve: What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
                "task": "math_simple",
                "max_length": 256,
                "temperature": 0.3
            },
            {
                "prompt": "Prove using mathematical induction that the sum of first n natural numbers is n(n+1)/2:",
                "task": "math_proof",
                "max_length": 1024,
                "temperature": 0.2
            },
            {
                "prompt": "Design a RESTful API for a todo list application with proper documentation:",
                "task": "system_design",
                "max_length": 2048,
                "temperature": 0.7
            },
        ]

        all_results: List[BenchmarkResult] = []
        for test_case in tqdm(test_cases, desc="Running benchmarks"):
            results = self.benchmark_inference(
                prompt=test_case["prompt"],
                task=test_case["task"],
                max_length=test_case["max_length"],
                temperature=test_case["temperature"],
                num_runs=3
            )
            all_results.extend(results)

        logger.info("Benchmark suite completed!")
        return all_results

    def aggregate_results(self) -> AggregatedResults:
        """Aggregate benchmark results.

        Returns:
            Summary statistics over ``self.results``.

        Raises:
            ValueError: If no results were recorded, or none succeeded.
        """
        if not self.results:
            raise ValueError("No benchmark results available")

        successful = [r for r in self.results if r.success]
        if not successful:
            raise ValueError("No successful benchmark runs")

        inference_times = [r.inference_time for r in successful]
        tokens_per_sec = [r.tokens_per_second for r in successful]
        memory_usage = [r.memory_used_mb for r in successful]

        # Convert numpy scalars to plain floats so the dataclass holds
        # ordinary Python values.
        return AggregatedResults(
            model_name=self.model_name,
            total_tests=len(self.results),
            successful_tests=len(successful),
            failed_tests=len(self.results) - len(successful),
            avg_inference_time=float(np.mean(inference_times)),
            avg_tokens_per_second=float(np.mean(tokens_per_sec)),
            avg_memory_mb=float(np.mean(memory_usage)),
            min_inference_time=float(np.min(inference_times)),
            max_inference_time=float(np.max(inference_times)),
            std_inference_time=float(np.std(inference_times))
        )

    def save_results(self, output_file: str = "benchmark_results.json"):
        """Save benchmark results to file.

        Args:
            output_file: Path of the JSON file to write; it receives a
                list with one object per recorded run.
        """
        results_dict = [asdict(r) for r in self.results]

        with open(output_file, 'w', encoding="utf-8") as f:
            json.dump(results_dict, f, indent=2)

        logger.info(f"Results saved to {output_file}")

    def generate_report(self, output_file: str = "benchmark_report.txt"):
        """Generate human-readable benchmark report.

        The report is written to ``output_file`` and printed to stdout.

        Args:
            output_file: Path of the text report to write.

        Raises:
            ValueError: Propagated from ``aggregate_results`` when there
                are no successful runs to report on.
        """
        agg = self.aggregate_results()

        report = f"""
{'='*80}
HELION-OSC BENCHMARK REPORT
{'='*80}

Model: {agg.model_name}
Device: {self.device}

OVERALL STATISTICS
{'='*80}
Total Tests: {agg.total_tests}
Successful: {agg.successful_tests}
Failed: {agg.failed_tests}
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

PERFORMANCE METRICS
{'='*80}
Average Inference Time: {agg.avg_inference_time:.4f} seconds
Min Inference Time: {agg.min_inference_time:.4f} seconds
Max Inference Time: {agg.max_inference_time:.4f} seconds
Std Inference Time: {agg.std_inference_time:.4f} seconds

Average Tokens/Second: {agg.avg_tokens_per_second:.2f}
Average Memory Usage: {agg.avg_memory_mb:.2f} MB

PER-TASK BREAKDOWN
{'='*80}
"""

        # Group by task
        df = pd.DataFrame([asdict(r) for r in self.results if r.success])

        if not df.empty:
            task_stats = df.groupby('task').agg({
                'inference_time': ['mean', 'min', 'max'],
                'tokens_per_second': 'mean',
                'memory_used_mb': 'mean'
            })
            report += task_stats.to_string()

        report += f"\n\n{'='*80}\n"

        with open(output_file, 'w', encoding="utf-8") as f:
            f.write(report)

        logger.info(f"Report saved to {output_file}")
        print(report)

    def plot_results(self, output_dir: str = "./benchmark_plots"):
        """Generate visualization plots.

        Writes three per-task bar charts and one scatter plot as PNG
        files into ``output_dir``, creating the directory if needed.
        Only a warning is logged when there are no successful runs.

        Args:
            output_dir: Directory that receives the PNG files.
        """
        os.makedirs(output_dir, exist_ok=True)

        df = pd.DataFrame([asdict(r) for r in self.results if r.success])

        if df.empty:
            logger.warning("No data to plot")
            return

        # Set style
        sns.set_style("whitegrid")

        # Plots 1-3: one bar chart per metric, grouped by task.
        # (column, title, y-axis label, output filename)
        bar_plots = [
            ("inference_time", "Inference Time by Task",
             "Time (seconds)", "inference_time_by_task.png"),
            ("tokens_per_second", "Tokens Per Second by Task",
             "Tokens/Second", "tokens_per_second_by_task.png"),
            ("memory_used_mb", "Memory Usage by Task",
             "Memory (MB)", "memory_usage_by_task.png"),
        ]
        for column, title, ylabel, filename in bar_plots:
            plt.figure(figsize=(12, 6))
            sns.barplot(data=df, x='task', y=column)
            plt.xticks(rotation=45, ha='right')
            plt.title(title)
            plt.ylabel(ylabel)
            plt.tight_layout()
            plt.savefig(os.path.join(output_dir, filename), dpi=300)
            plt.close()

        # Plot 4: Scatter plot - generation length vs inference time
        plt.figure(figsize=(10, 6))
        sns.scatterplot(data=df, x='generation_length', y='inference_time', hue='task', s=100)
        plt.title('Generation Length vs Inference Time')
        plt.xlabel('Generation Length (tokens)')
        plt.ylabel('Inference Time (seconds)')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, "length_vs_time.png"), dpi=300)
        plt.close()

        logger.info(f"Plots saved to {output_dir}")


class ComparisonBenchmark:
    """Compare multiple models"""

    def __init__(self, model_names: List[str]):
        """Store the models to compare.

        Args:
            model_names: Model identifiers, benchmarked in this order.
        """
        self.model_names = model_names
        self.benchmarks = {}

    def run_comparison(self, release_models: bool = False):
        """Run benchmarks for all models.

        A model that fails to load or benchmark is logged and skipped;
        the remaining models are still processed.

        Args:
            release_models: When true, each model and tokenizer is
                released once its suite finishes so that models do not
                accumulate in memory. Results are kept either way. The
                default of ``False`` leaves the loaded model on each
                stored benchmark.
        """
        for model_name in self.model_names:
            logger.info(f"\nBenchmarking {model_name}...")
            benchmark = None
            try:
                benchmark = PerformanceBenchmark(model_name)
                benchmark.run_benchmark_suite()
                self.benchmarks[model_name] = benchmark
            except Exception as e:
                logger.error(f"Failed to benchmark {model_name}: {e}")
            finally:
                if release_models and benchmark is not None:
                    benchmark.release_model()

    def generate_comparison_report(self, output_file: str = "comparison_report.txt"):
        """Generate comparison report.

        The report is written to ``output_file`` and printed to stdout.
        A model without any successful run is listed as unavailable
        instead of aborting the report.

        Args:
            output_file: Path of the text report to write.
        """
        sections = [f"""
{'='*80}
MODEL COMPARISON REPORT
{'='*80}

"""]

        for model_name, benchmark in self.benchmarks.items():
            try:
                agg = benchmark.aggregate_results()
            except ValueError as e:
                # aggregate_results raises when there is nothing to
                # summarise; keep reporting on the other models.
                logger.warning(f"No aggregate results for {model_name}: {e}")
                sections.append(f"""
Model: {model_name}
{'='*80}
Results unavailable: {e}

""")
                continue

            sections.append(f"""
Model: {model_name}
{'='*80}
Avg Inference Time: {agg.avg_inference_time:.4f}s
Avg Tokens/Second: {agg.avg_tokens_per_second:.2f}
Avg Memory Usage: {agg.avg_memory_mb:.2f} MB
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

""")

        report = "".join(sections)

        with open(output_file, 'w', encoding="utf-8") as f:
            f.write(report)

        print(report)
        logger.info(f"Comparison report saved to {output_file}")


def main():
    """Main benchmark script.

    Parses the command line, then either compares several models
    (``--compare``) or benchmarks a single one (``--model``), writing
    all output files into ``--output-dir``.
    """
    parser = argparse.ArgumentParser(description="Benchmark Helion-OSC model")
    parser.add_argument("--model", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--output-dir", type=str, default="./benchmark_results")
    parser.add_argument("--compare", nargs='+', help="List of models to compare")
    parser.add_argument("--plot", action="store_true", help="Generate plots")

    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    if args.compare:
        # Comparison mode: only the recorded results are needed for the
        # report, so release each model before loading the next one.
        comparison = ComparisonBenchmark(args.compare)
        comparison.run_comparison(release_models=True)
        comparison.generate_comparison_report(
            os.path.join(args.output_dir, "comparison_report.txt")
        )
    else:
        # Single model benchmark
        benchmark = PerformanceBenchmark(args.model)
        benchmark.run_benchmark_suite()
        benchmark.save_results(os.path.join(args.output_dir, "benchmark_results.json"))
        benchmark.generate_report(os.path.join(args.output_dir, "benchmark_report.txt"))

        if args.plot:
            benchmark.plot_results(os.path.join(args.output_dir, "plots"))


if __name__ == "__main__":
    main()