"""
Helion-OSC Comprehensive Benchmark Suite

Performance benchmarking and comparison with other models.
"""
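# Example invocations (illustrative; the script filename is an assumption,
# the flags are those defined in main() below):
#
#   python benchmark.py --model DeepXR/Helion-OSC --plot
#   python benchmark.py --compare DeepXR/Helion-OSC another-org/another-model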
import torch
import time
import psutil
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
import json
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class BenchmarkResult:
    """Single benchmark result"""
    model_name: str
    task: str
    prompt_length: int
    generation_length: int
    temperature: float
    inference_time: float
    tokens_per_second: float
    memory_used_mb: float
    gpu_memory_mb: Optional[float]
    success: bool
    error: Optional[str] = None

@dataclass
class AggregatedResults:
    """Aggregated benchmark results"""
    model_name: str
    total_tests: int
    successful_tests: int
    failed_tests: int
    avg_inference_time: float
    avg_tokens_per_second: float
    avg_memory_mb: float
    min_inference_time: float
    max_inference_time: float
    std_inference_time: float

class PerformanceBenchmark:
    """Performance benchmarking utilities"""

    def __init__(self, model_name: str = "DeepXR/Helion-OSC"):
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        logger.info(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
            device_map="auto" if self.device == "cuda" else None
        )

        if self.device == "cpu":
            self.model = self.model.to(self.device)

        self.model.eval()
        self.results: List[BenchmarkResult] = []

    def get_memory_usage(self) -> tuple:
        """Return current memory usage as (ram_mb, gpu_mb); gpu_mb is None without CUDA"""
        process = psutil.Process()
        ram_mb = process.memory_info().rss / 1024 / 1024

        gpu_mb = None
        if torch.cuda.is_available():
            gpu_mb = torch.cuda.memory_allocated() / 1024 / 1024

        return ram_mb, gpu_mb

    def benchmark_inference(
        self,
        prompt: str,
        task: str,
        max_length: int = 512,
        temperature: float = 0.7,
        num_runs: int = 1
    ) -> List[BenchmarkResult]:
        """Benchmark inference performance"""
        run_results = []

        for run in range(num_runs):
            try:
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
                prompt_length = inputs.input_ids.shape[1]

                # Warm-up pass on the first GPU run so kernel compilation and
                # caching do not skew the timed measurement
                if run == 0 and self.device == "cuda":
                    with torch.no_grad():
                        _ = self.model.generate(**inputs, max_length=prompt_length + 10)
                    torch.cuda.synchronize()

                if self.device == "cuda":
                    torch.cuda.empty_cache()

                ram_before, gpu_before = self.get_memory_usage()

                start_time = time.time()

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_length=max_length,
                        temperature=temperature,
                        do_sample=temperature > 0,
                        pad_token_id=self.tokenizer.eos_token_id
                    )

                if self.device == "cuda":
                    torch.cuda.synchronize()

                end_time = time.time()

                ram_after, gpu_after = self.get_memory_usage()

                inference_time = end_time - start_time
                generation_length = outputs.shape[1] - prompt_length
                tokens_per_second = generation_length / inference_time if inference_time > 0 else 0
                memory_used = ram_after - ram_before
                # Compare against None explicitly so a legitimate 0 MB reading
                # is not silently dropped
                gpu_memory = (
                    gpu_after - gpu_before
                    if gpu_after is not None and gpu_before is not None
                    else None
                )

                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=prompt_length,
                    generation_length=generation_length,
                    temperature=temperature,
                    inference_time=inference_time,
                    tokens_per_second=tokens_per_second,
                    memory_used_mb=memory_used,
                    gpu_memory_mb=gpu_memory,
                    success=True
                )

                run_results.append(result)
                self.results.append(result)

            except Exception as e:
                logger.error(f"Benchmark failed: {e}")
                result = BenchmarkResult(
                    model_name=self.model_name,
                    task=task,
                    prompt_length=0,
                    generation_length=0,
                    temperature=temperature,
                    inference_time=0,
                    tokens_per_second=0,
                    memory_used_mb=0,
                    gpu_memory_mb=None,
                    success=False,
                    error=str(e)
                )
                run_results.append(result)
                self.results.append(result)

        return run_results

    def run_benchmark_suite(self) -> List[BenchmarkResult]:
        """Run comprehensive benchmark suite"""
        logger.info("Starting comprehensive benchmark suite...")

        test_cases = [
            {
                "prompt": "def fibonacci(n):",
                "task": "simple_function",
                "max_length": 256,
                "temperature": 0.7
            },
            {
                "prompt": "Write a Python class for a binary search tree with insert, search, and delete methods:",
                "task": "complex_class",
                "max_length": 1024,
                "temperature": 0.7
            },
            {
                "prompt": "Implement quicksort algorithm in Python with detailed comments:",
                "task": "algorithm",
                "max_length": 512,
                "temperature": 0.5
            },
            {
                "prompt": "Solve: What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
                "task": "math_simple",
                "max_length": 256,
                "temperature": 0.3
            },
            {
                "prompt": "Prove using mathematical induction that the sum of first n natural numbers is n(n+1)/2:",
                "task": "math_proof",
                "max_length": 1024,
                "temperature": 0.2
            },
            {
                "prompt": "Design a RESTful API for a todo list application with proper documentation:",
                "task": "system_design",
                "max_length": 2048,
                "temperature": 0.7
            },
        ]

        all_results = []

        for test_case in tqdm(test_cases, desc="Running benchmarks"):
            results = self.benchmark_inference(
                prompt=test_case["prompt"],
                task=test_case["task"],
                max_length=test_case["max_length"],
                temperature=test_case["temperature"],
                num_runs=3
            )
            all_results.extend(results)

        logger.info("Benchmark suite completed!")
        return all_results

    def aggregate_results(self) -> AggregatedResults:
        """Aggregate benchmark results"""
        if not self.results:
            raise ValueError("No benchmark results available")

        successful = [r for r in self.results if r.success]

        if not successful:
            raise ValueError("No successful benchmark runs")

        inference_times = [r.inference_time for r in successful]
        tokens_per_sec = [r.tokens_per_second for r in successful]
        memory_usage = [r.memory_used_mb for r in successful]

        # Cast numpy scalars to plain floats so the dataclass matches its
        # declared field types and stays JSON-serializable
        return AggregatedResults(
            model_name=self.model_name,
            total_tests=len(self.results),
            successful_tests=len(successful),
            failed_tests=len(self.results) - len(successful),
            avg_inference_time=float(np.mean(inference_times)),
            avg_tokens_per_second=float(np.mean(tokens_per_sec)),
            avg_memory_mb=float(np.mean(memory_usage)),
            min_inference_time=float(np.min(inference_times)),
            max_inference_time=float(np.max(inference_times)),
            std_inference_time=float(np.std(inference_times))
        )

    def save_results(self, output_file: str = "benchmark_results.json"):
        """Save benchmark results to file"""
        results_dict = [asdict(r) for r in self.results]

        with open(output_file, 'w') as f:
            json.dump(results_dict, f, indent=2)

        logger.info(f"Results saved to {output_file}")

    def generate_report(self, output_file: str = "benchmark_report.txt"):
        """Generate human-readable benchmark report"""
        agg = self.aggregate_results()

        report = f"""
{'='*80}
HELION-OSC BENCHMARK REPORT
{'='*80}

Model: {agg.model_name}
Device: {self.device}

OVERALL STATISTICS
{'='*80}
Total Tests: {agg.total_tests}
Successful: {agg.successful_tests}
Failed: {agg.failed_tests}
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

PERFORMANCE METRICS
{'='*80}
Average Inference Time: {agg.avg_inference_time:.4f} seconds
Min Inference Time: {agg.min_inference_time:.4f} seconds
Max Inference Time: {agg.max_inference_time:.4f} seconds
Std Inference Time: {agg.std_inference_time:.4f} seconds

Average Tokens/Second: {agg.avg_tokens_per_second:.2f}
Average Memory Usage: {agg.avg_memory_mb:.2f} MB

PER-TASK BREAKDOWN
{'='*80}
"""

        # Per-task statistics over successful runs only
        df = pd.DataFrame([asdict(r) for r in self.results if r.success])
        if not df.empty:
            task_stats = df.groupby('task').agg({
                'inference_time': ['mean', 'min', 'max'],
                'tokens_per_second': 'mean',
                'memory_used_mb': 'mean'
            })

            report += task_stats.to_string()

        report += f"\n\n{'='*80}\n"

        with open(output_file, 'w') as f:
            f.write(report)

        logger.info(f"Report saved to {output_file}")
        print(report)

    def plot_results(self, output_dir: str = "./benchmark_plots"):
        """Generate visualization plots"""
        import os
        os.makedirs(output_dir, exist_ok=True)

        df = pd.DataFrame([asdict(r) for r in self.results if r.success])

        if df.empty:
            logger.warning("No data to plot")
            return

        sns.set_style("whitegrid")

        # Inference time by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='inference_time')
        plt.xticks(rotation=45, ha='right')
        plt.title('Inference Time by Task')
        plt.ylabel('Time (seconds)')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/inference_time_by_task.png", dpi=300)
        plt.close()

        # Throughput by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='tokens_per_second')
        plt.xticks(rotation=45, ha='right')
        plt.title('Tokens Per Second by Task')
        plt.ylabel('Tokens/Second')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/tokens_per_second_by_task.png", dpi=300)
        plt.close()

        # Memory usage by task
        plt.figure(figsize=(12, 6))
        sns.barplot(data=df, x='task', y='memory_used_mb')
        plt.xticks(rotation=45, ha='right')
        plt.title('Memory Usage by Task')
        plt.ylabel('Memory (MB)')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/memory_usage_by_task.png", dpi=300)
        plt.close()

        # Generation length vs inference time
        plt.figure(figsize=(10, 6))
        sns.scatterplot(data=df, x='generation_length', y='inference_time', hue='task', s=100)
        plt.title('Generation Length vs Inference Time')
        plt.xlabel('Generation Length (tokens)')
        plt.ylabel('Inference Time (seconds)')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/length_vs_time.png", dpi=300)
        plt.close()

        logger.info(f"Plots saved to {output_dir}")

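# Minimal programmatic usage sketch for PerformanceBenchmark (illustrative;
# assumes the default model weights are available locally or via the Hub):
#
#   bench = PerformanceBenchmark("DeepXR/Helion-OSC")
#   bench.run_benchmark_suite()
#   bench.save_results("benchmark_results.json")
#   bench.generate_report("benchmark_report.txt")
#   bench.plot_results("./benchmark_plots")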
class ComparisonBenchmark:
    """Compare multiple models"""

    def __init__(self, model_names: List[str]):
        self.model_names = model_names
        self.benchmarks = {}

    def run_comparison(self):
        """Run benchmarks for all models"""
        for model_name in self.model_names:
            logger.info(f"\nBenchmarking {model_name}...")
            try:
                benchmark = PerformanceBenchmark(model_name)
                benchmark.run_benchmark_suite()
                self.benchmarks[model_name] = benchmark
            except Exception as e:
                logger.error(f"Failed to benchmark {model_name}: {e}")

    def generate_comparison_report(self, output_file: str = "comparison_report.txt"):
        """Generate comparison report"""
        report = f"""
{'='*80}
MODEL COMPARISON REPORT
{'='*80}

"""

        for model_name, benchmark in self.benchmarks.items():
            agg = benchmark.aggregate_results()
            report += f"""
Model: {model_name}
{'='*80}
Avg Inference Time: {agg.avg_inference_time:.4f}s
Avg Tokens/Second: {agg.avg_tokens_per_second:.2f}
Avg Memory Usage: {agg.avg_memory_mb:.2f} MB
Success Rate: {(agg.successful_tests/agg.total_tests)*100:.2f}%

"""

        with open(output_file, 'w') as f:
            f.write(report)

        print(report)
        logger.info(f"Comparison report saved to {output_file}")

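# Comparison usage sketch (illustrative; the second model ID is a placeholder):
#
#   cmp = ComparisonBenchmark(["DeepXR/Helion-OSC", "another-org/another-model"])
#   cmp.run_comparison()
#   cmp.generate_comparison_report("comparison_report.txt")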
def main():
    """Main benchmark script"""
    import argparse

    parser = argparse.ArgumentParser(description="Benchmark Helion-OSC model")
    parser.add_argument("--model", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--output-dir", type=str, default="./benchmark_results")
    parser.add_argument("--compare", nargs='+', help="List of models to compare")
    parser.add_argument("--plot", action="store_true", help="Generate plots")

    args = parser.parse_args()

    import os
    os.makedirs(args.output_dir, exist_ok=True)

    if args.compare:
        comparison = ComparisonBenchmark(args.compare)
        comparison.run_comparison()
        comparison.generate_comparison_report(
            os.path.join(args.output_dir, "comparison_report.txt")
        )
    else:
        benchmark = PerformanceBenchmark(args.model)
        benchmark.run_benchmark_suite()
        benchmark.save_results(os.path.join(args.output_dir, "benchmark_results.json"))
        benchmark.generate_report(os.path.join(args.output_dir, "benchmark_report.txt"))

        if args.plot:
            benchmark.plot_results(os.path.join(args.output_dir, "plots"))


if __name__ == "__main__":
    main()