Helion-OSC / benchmark.py
Trouter-Library's picture
Create benchmark.py
788f4ae verified
"""
Helion-OSC Comprehensive Benchmark Suite
Performance benchmarking and comparison with other models
"""
import torch
import time
import psutil
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
import json
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class BenchmarkResult:
"""Single benchmark result"""
model_name: str
task: str
prompt_length: int
generation_length: int
temperature: float
inference_time: float
tokens_per_second: float
memory_used_mb: float
gpu_memory_mb: Optional[float]
success: bool
error: Optional[str] = None
@dataclass
class AggregatedResults:
"""Aggregated benchmark results"""
model_name: str
total_tests: int
successful_tests: int
failed_tests: int
avg_inference_time: float
avg_tokens_per_second: float
avg_memory_mb: float
min_inference_time: float
max_inference_time: float
std_inference_time: float
class PerformanceBenchmark:
"""Performance benchmarking utilities"""
def __init__(self, model_name: str = "DeepXR/Helion-OSC"):
self.model_name = model_name
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Loading model: {model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
device_map="auto" if self.device == "cuda" else None
)
if self.device == "cpu":
self.model = self.model.to(self.device)
self.model.eval()
self.results: List[BenchmarkResult] = []
def get_memory_usage(self) -> tuple:
"""Get current memory usage"""
process = psutil.Process()
ram_mb = process.memory_info().rss / 1024 / 1024
gpu_mb = None
if torch.cuda.is_available():
gpu_mb = torch.cuda.memory_allocated() / 1024 / 1024
return ram_mb, gpu_mb
def benchmark_inference(
self,
prompt: str,
task: str,
max_length: int = 512,
temperature: float = 0.7,
num_runs: int = 1
) -> List[BenchmarkResult]:
"""Benchmark inference performance"""
run_results = []
for run in range(num_runs):
try:
# Tokenize
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
prompt_length = inputs.input_ids.shape[1]
# Warm up GPU
if run == 0 and self.device == "cuda":
with torch.no_grad():
_ = self.model.generate(**inputs, max_length=prompt_length + 10)
torch.cuda.synchronize()
# Clear cache
if self.device == "cuda":
torch.cuda.empty_cache()
# Measure memory before
ram_before, gpu_before = self.get_memory_usage()
# Generate
start_time = time.time()
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=max_length,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=self.tokenizer.eos_token_id
)
if self.device == "cuda":
torch.cuda.synchronize()
end_time = time.time()
# Measure memory after
ram_after, gpu_after = self.get_memory_usage()
# Calculate metrics
inference_time = end_time - start_time
generation_length = outputs.shape[1] - prompt_length
tokens_per_second = generation_length / inference_time if inference_time > 0 else 0
memory_used = ram_after - ram_before
gpu_memory = (gpu_after - gpu_before) if gpu_after and gpu_before else None
result = BenchmarkResult(
model_name=self.model_name,
task=task,
prompt_length=prompt_length,
generation_length=generation_length,
temperature=temperature,
inference_time=inference_time,
tokens_per_second=tokens_per_second,
memory_used_mb=memory_used,
gpu_memory_mb=gpu_memory,
success=True
)
run_results.append(result)
self.results.append(result)
except Exception as e:
logger.error(f"Benchmark failed: {e}")
result = BenchmarkResult(
model_name=self.model_name,
task=task,
prompt_length=0,
generation_length=0,
temperature=temperature,
inference_time=0,
tokens_per_second=0,
memory_used_mb=0,
gpu_memory_mb=None,
success=False,
error=str(e)
)
run_results.append(result)
self.results.append(result)
return run_results
def run_benchmark_suite(self) -> List[BenchmarkResult]:
"""Run comprehensive benchmark suite"""
logger.info("Starting comprehensive benchmark suite...")
test_cases = [
{
"prompt": "def fibonacci(n):",
"task": "simple_function",
"max_length": 256,
"temperature": 0.7
},
{
"prompt": "Write a Python class for a binary search tree with insert, search, and delete methods:",
"task": "complex_class",
"max_length": 1024,
"temperature": 0.7
},
{
"prompt": "Implement quicksort algorithm in Python with detailed comments:",
"task": "algorithm",
"max_length": 512,
"temperature": 0.5
},
{
"prompt": "Solve: What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
"task": "math_simple",
"max_length": 256,
"temperature": 0.3
},
{
"prompt": "Prove using mathematical induction that the sum of first n natural numbers is n(n+1)/2:",
"task": "math_proof",
"max_length": 1024,
"temperature": 0.2
},
{
"prompt": "Design a RESTful API for a todo list application with proper documentation:",
"task": "system_design",
"max_length": 2048,
"temperature": 0.7
},
]
all_results = []
for test_case in tqdm(test_cases, desc="Running benchmarks"):
results = self.benchmark_inference(
prompt=test_case["prompt"],
task=test_case["task"],
max_length=test_case["max_length"],
temperature=test_case["temperature"],
num_runs=3
)
all_results.extend(results)
logger.info("Benchmark suite completed!")
return all_results
def aggregate_results(self) -> AggregatedResults:
"""Aggregate benchmark results"""
if not self.results:
raise ValueError("No benchmark results available")
successful = [r for r in self.results if r.success]
if not successful:
raise ValueError("No successful benchmark runs")
inference_times = [r.inference_time for r in successful]
tokens_per_sec = [r.tokens_per_second for r in successful]
memory_usage = [r.memory_used_mb for r in successful]
return AggregatedResults(
model_name=self.model_name,
total_tests=len(self.results),
successful_tests=len(successful),
failed_tests=len(self.results) - len(successful),
avg_inference_time=np.mean(inference_times),
avg_tokens_per_second=np.mean(tokens_per_sec),
avg_memory_mb=np.mean(memory_usage),
min_inference_time=np.min(inference_times),
max_inference_time=np.max(inference_times),
std_inference_time=np.std(inference_times)
)
def save_results(self, output_file: str = "benchmark_results.json"):
"""Save benchmark results to file"""
results_dict = [asdict(r) for r in self.results]
with open(output_file, 'w') as f:
json.dump(results_dict, f, indent=2)
logger.info(f"Results saved to {output_file}")
def generate_report(self, output_file: str = "benchmark_report.txt"):
"""Generate human-readable benchmark report"""
agg = self.aggregate_results()
report = f"""
{'='*80}
HELION-OSC BENCHMARK REPORT
{'='*80}
Model: {agg.model_name}
Device: {self.device}
OVERALL STATISTICS
{'='*80}
Total Tests: {agg.total_tests}
Successful: {agg.successful_tests}
Failed: {agg.failed_tests}
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%
PERFORMANCE METRICS
{'='*80}
Average Inference Time: {agg.avg_inference_time:.4f} seconds
Min Inference Time: {agg.min_inference_time:.4f} seconds
Max Inference Time: {agg.max_inference_time:.4f} seconds
Std Inference Time: {agg.std_inference_time:.4f} seconds
Average Tokens/Second: {agg.avg_tokens_per_second:.2f}
Average Memory Usage: {agg.avg_memory_mb:.2f} MB
PER-TASK BREAKDOWN
{'='*80}
"""
# Group by task
df = pd.DataFrame([asdict(r) for r in self.results if r.success])
if not df.empty:
task_stats = df.groupby('task').agg({
'inference_time': ['mean', 'min', 'max'],
'tokens_per_second': 'mean',
'memory_used_mb': 'mean'
})
report += task_stats.to_string()
report += f"\n\n{'='*80}\n"
with open(output_file, 'w') as f:
f.write(report)
logger.info(f"Report saved to {output_file}")
print(report)
def plot_results(self, output_dir: str = "./benchmark_plots"):
"""Generate visualization plots"""
import os
os.makedirs(output_dir, exist_ok=True)
df = pd.DataFrame([asdict(r) for r in self.results if r.success])
if df.empty:
logger.warning("No data to plot")
return
# Set style
sns.set_style("whitegrid")
# Plot 1: Inference time by task
plt.figure(figsize=(12, 6))
sns.barplot(data=df, x='task', y='inference_time')
plt.xticks(rotation=45, ha='right')
plt.title('Inference Time by Task')
plt.ylabel('Time (seconds)')
plt.tight_layout()
plt.savefig(f"{output_dir}/inference_time_by_task.png", dpi=300)
plt.close()
# Plot 2: Tokens per second by task
plt.figure(figsize=(12, 6))
sns.barplot(data=df, x='task', y='tokens_per_second')
plt.xticks(rotation=45, ha='right')
plt.title('Tokens Per Second by Task')
plt.ylabel('Tokens/Second')
plt.tight_layout()
plt.savefig(f"{output_dir}/tokens_per_second_by_task.png", dpi=300)
plt.close()
# Plot 3: Memory usage by task
plt.figure(figsize=(12, 6))
sns.barplot(data=df, x='task', y='memory_used_mb')
plt.xticks(rotation=45, ha='right')
plt.title('Memory Usage by Task')
plt.ylabel('Memory (MB)')
plt.tight_layout()
plt.savefig(f"{output_dir}/memory_usage_by_task.png", dpi=300)
plt.close()
# Plot 4: Scatter plot - generation length vs inference time
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='generation_length', y='inference_time', hue='task', s=100)
plt.title('Generation Length vs Inference Time')
plt.xlabel('Generation Length (tokens)')
plt.ylabel('Inference Time (seconds)')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.savefig(f"{output_dir}/length_vs_time.png", dpi=300)
plt.close()
logger.info(f"Plots saved to {output_dir}")
class ComparisonBenchmark:
"""Compare multiple models"""
def __init__(self, model_names: List[str]):
self.model_names = model_names
self.benchmarks = {}
def run_comparison(self):
"""Run benchmarks for all models"""
for model_name in self.model_names:
logger.info(f"\nBenchmarking {model_name}...")
try:
benchmark = PerformanceBenchmark(model_name)
benchmark.run_benchmark_suite()
self.benchmarks[model_name] = benchmark
except Exception as e:
logger.error(f"Failed to benchmark {model_name}: {e}")
def generate_comparison_report(self, output_file: str = "comparison_report.txt"):
"""Generate comparison report"""
report = f"""
{'='*80}
MODEL COMPARISON REPORT
{'='*80}
"""
for model_name, benchmark in self.benchmarks.items():
agg = benchmark.aggregate_results()
report += f"""
Model: {model_name}
{'='*80}
Avg Inference Time: {agg.avg_inference_time:.4f}s
Avg Tokens/Second: {agg.avg_tokens_per_second:.2f}
Avg Memory Usage: {agg.avg_memory_mb:.2f} MB
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%
"""
with open(output_file, 'w') as f:
f.write(report)
print(report)
logger.info(f"Comparison report saved to {output_file}")
def main():
"""Main benchmark script"""
import argparse
parser = argparse.ArgumentParser(description="Benchmark Helion-OSC model")
parser.add_argument("--model", type=str, default="DeepXR/Helion-OSC")
parser.add_argument("--output-dir", type=str, default="./benchmark_results")
parser.add_argument("--compare", nargs='+', help="List of models to compare")
parser.add_argument("--plot", action="store_true", help="Generate plots")
args = parser.parse_args()
import os
os.makedirs(args.output_dir, exist_ok=True)
if args.compare:
# Comparison mode
comparison = ComparisonBenchmark(args.compare)
comparison.run_comparison()
comparison.generate_comparison_report(
os.path.join(args.output_dir, "comparison_report.txt")
)
else:
# Single model benchmark
benchmark = PerformanceBenchmark(args.model)
benchmark.run_benchmark_suite()
benchmark.save_results(os.path.join(args.output_dir, "benchmark_results.json"))
benchmark.generate_report(os.path.join(args.output_dir, "benchmark_report.txt"))
if args.plot:
benchmark.plot_results(os.path.join(args.output_dir, "plots"))
if __name__ == "__main__":
main()