Spaces:
Running
Running
| """ | |
| 性能监控模块 | |
| 提供系统性能监控功能,包括: | |
| - 处理时间记录 | |
| - 内存使用监控 | |
| - 并发数动态调整 | |
| - 性能指标统计 | |
| Requirements: 9.1, 9.3, 9.6 | |
| """ | |
| import os | |
| import time | |
| import asyncio | |
| import logging | |
| import psutil | |
| from typing import Dict, Any, Optional, List, Callable | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| from collections import deque | |
| from functools import wraps | |
| import threading | |
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetrics:
    """Timing and memory record for a single tracked operation.

    Attributes:
        operation: Name of the operation.
        start_time: When the operation started.
        end_time: When it finished; ``None`` until :meth:`complete` is called.
        duration_ms: Elapsed time in milliseconds; ``None`` until completed.
        success: Whether the operation succeeded.
        memory_before: Memory usage before the operation (MB).
        memory_after: Memory usage after the operation (MB).
        extra: Free-form additional information attached by the caller.
    """

    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    success: bool = True
    memory_before: Optional[float] = None
    memory_after: Optional[float] = None
    # default_factory gives every instance its own dict
    extra: Dict[str, Any] = field(default_factory=dict)

    def complete(self, success: bool = True, memory_after: Optional[float] = None) -> None:
        """Stamp the end time and derive the duration from ``start_time``.

        Args:
            success: Whether the operation succeeded.
            memory_after: Memory usage after the operation (MB), if known.
        """
        self.end_time = datetime.now()
        self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
        self.success = success
        self.memory_after = memory_after

    @staticmethod
    def _rounded(value: Optional[float]) -> Optional[float]:
        """Round to two decimals, passing ``None`` through unchanged."""
        # Compare against None explicitly: 0.0 is a valid measurement and
        # must not be reported as "missing".
        return None if value is None else round(value, 2)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the record.

        Timestamps are ISO-8601 strings and numeric values are rounded to two
        decimals. Fields that have not been set yet are ``None``.
        """
        return {
            "operation": self.operation,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time is not None else None,
            "duration_ms": self._rounded(self.duration_ms),
            "success": self.success,
            "memory_before_mb": self._rounded(self.memory_before),
            "memory_after_mb": self._rounded(self.memory_after),
            "extra": self.extra,
        }
@dataclass
class PerformanceThresholds:
    """Configurable performance limits used when evaluating recorded metrics.

    Every field has a default, so ``PerformanceThresholds()`` yields the
    standard limits and individual values can be overridden by keyword.

    Attributes:
        short_video_max_ms: Maximum processing time for short videos
            (1-2 minutes), in milliseconds.
        medium_video_max_ms: Maximum processing time for medium videos
            (5-10 minutes), in milliseconds.
        max_memory_mb: Maximum memory usage (MB).
        sync_tolerance_ms: Synchronisation tolerance, in milliseconds.
        success_rate_threshold: Minimum acceptable success rate (0-1).
    """

    short_video_max_ms: float = 30000.0  # 30 seconds
    medium_video_max_ms: float = 60000.0  # 60 seconds
    max_memory_mb: float = 2048.0  # 2 GB
    sync_tolerance_ms: float = 300.0  # 0.3 seconds
    success_rate_threshold: float = 0.95  # 95%
class PerformanceMonitor:
    """Collect per-operation performance metrics and expose aggregate statistics.

    Recorded metrics are kept in a bounded history, and running per-operation
    totals are maintained alongside it. Both are guarded by an internal lock.

    Example:
        monitor = PerformanceMonitor()

        # Track an operation
        with monitor.track_operation("speech_recognition") as metrics:
            result = transcribe(audio)
            metrics.extra["segments"] = len(result)

        # Read the statistics
        stats = monitor.get_statistics()
    """

    def __init__(
        self,
        thresholds: Optional[PerformanceThresholds] = None,
        history_size: int = 1000
    ):
        """Create a monitor.

        Args:
            thresholds: Performance limits; defaults are used when omitted.
            history_size: Maximum number of metric records kept in history.
        """
        self.thresholds = thresholds or PerformanceThresholds()
        self._history: deque = deque(maxlen=history_size)
        self._lock = threading.Lock()
        # Running per-operation aggregates, keyed by operation name
        self._operation_stats: Dict[str, Dict[str, Any]] = {}
        # Handle on the current process, used for memory/CPU sampling
        self._process = psutil.Process(os.getpid())
        logger.info("性能监控器初始化完成")

    def get_memory_usage(self) -> float:
        """Return the resident memory of this process in MB (0.0 on failure)."""
        try:
            memory_info = self._process.memory_info()
            return memory_info.rss / (1024 * 1024)  # bytes -> MB
        except Exception as e:
            # Best effort: monitoring must never break the monitored code.
            logger.warning(f"获取内存使用失败: {e}")
            return 0.0

    def get_cpu_usage(self) -> float:
        """Return this process's CPU usage in percent (0.0 on failure).

        The value is sampled over a 0.1 second interval.
        """
        try:
            return self._process.cpu_percent(interval=0.1)
        except Exception as e:
            logger.warning(f"获取CPU使用率失败: {e}")
            return 0.0

    def track_operation(self, operation: str) -> 'OperationTracker':
        """Return an ``OperationTracker`` context manager for ``operation``.

        Args:
            operation: Name of the operation being tracked.
        """
        return OperationTracker(self, operation)

    def record_metrics(self, metrics: PerformanceMetrics) -> None:
        """Store ``metrics`` in the history and fold it into the aggregates.

        After recording, the metrics are compared against the configured
        thresholds and a warning is logged for each limit that is exceeded.

        Args:
            metrics: The performance record to store.
        """
        with self._lock:
            self._history.append(metrics)
            self._update_operation_stats(metrics)
        # Threshold checks only read `metrics` and log, so no lock is needed.
        self._check_thresholds(metrics)

    def _update_operation_stats(self, metrics: PerformanceMetrics) -> None:
        """Fold one record into the running totals for its operation."""
        stats = self._operation_stats.setdefault(
            metrics.operation,
            {
                "count": 0,
                "success_count": 0,
                "total_duration_ms": 0,
                "min_duration_ms": float('inf'),
                "max_duration_ms": 0,
                "last_duration_ms": 0,
            },
        )
        stats["count"] += 1
        if metrics.success:
            stats["success_count"] += 1
        # Explicit None check: a 0.0 ms duration is a real measurement and
        # must still update min/last.
        if metrics.duration_ms is not None:
            stats["total_duration_ms"] += metrics.duration_ms
            stats["min_duration_ms"] = min(stats["min_duration_ms"], metrics.duration_ms)
            stats["max_duration_ms"] = max(stats["max_duration_ms"], metrics.duration_ms)
            stats["last_duration_ms"] = metrics.duration_ms

    def _check_thresholds(self, metrics: PerformanceMetrics) -> None:
        """Log a warning for each configured limit that ``metrics`` exceeds.

        The processing-time limit is chosen from
        ``metrics.extra["video_duration_seconds"]``: up to 120 s uses the
        short-video limit, up to 600 s the medium-video limit, and longer
        videos are not checked. A missing duration is treated as 0.
        """
        limits = self.thresholds

        # Memory check
        if metrics.memory_after and metrics.memory_after > limits.max_memory_mb:
            logger.warning(
                f"内存使用超过阈值: {metrics.memory_after:.1f}MB > "
                f"{self.thresholds.max_memory_mb:.1f}MB"
            )

        # Processing-time check
        if not metrics.duration_ms:
            return
        # `or 0` also covers an explicit None value in `extra`.
        video_duration = metrics.extra.get("video_duration_seconds") or 0
        if video_duration <= 120:  # short video (up to 2 minutes)
            if metrics.duration_ms > limits.short_video_max_ms:
                logger.warning(
                    f"短视频处理时间超过阈值: {metrics.duration_ms:.0f}ms > "
                    f"{self.thresholds.short_video_max_ms:.0f}ms"
                )
        elif video_duration <= 600:  # medium video (up to 10 minutes)
            if metrics.duration_ms > limits.medium_video_max_ms:
                logger.warning(
                    f"中等视频处理时间超过阈值: {metrics.duration_ms:.0f}ms > "
                    f"{self.thresholds.medium_video_max_ms:.0f}ms"
                )

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate statistics.

        Returns:
            A dict with overall counts and success rate (computed over the
            bounded history), current memory/CPU usage, per-operation
            statistics and the configured thresholds.
        """
        with self._lock:
            total_count = len(self._history)
            success_count = sum(1 for m in self._history if m.success)
            # Per-operation averages
            operation_averages = {
                op: {
                    "count": stats["count"],
                    "success_rate": stats["success_count"] / stats["count"],
                    "avg_duration_ms": stats["total_duration_ms"] / stats["count"],
                    "min_duration_ms": (
                        stats["min_duration_ms"]
                        if stats["min_duration_ms"] != float('inf')
                        else 0
                    ),
                    "max_duration_ms": stats["max_duration_ms"],
                    "last_duration_ms": stats["last_duration_ms"],
                }
                for op, stats in self._operation_stats.items()
                if stats["count"] > 0
            }

        # Sample resources outside the lock: the CPU reading waits for its
        # sampling interval and should not stall concurrent record_metrics().
        current_memory_mb = self.get_memory_usage()
        current_cpu_percent = self.get_cpu_usage()

        return {
            "total_operations": total_count,
            "success_count": success_count,
            "success_rate": success_count / total_count if total_count > 0 else 1.0,
            "current_memory_mb": current_memory_mb,
            "current_cpu_percent": current_cpu_percent,
            "operation_stats": operation_averages,
            "thresholds": {
                "short_video_max_ms": self.thresholds.short_video_max_ms,
                "medium_video_max_ms": self.thresholds.medium_video_max_ms,
                "max_memory_mb": self.thresholds.max_memory_mb,
                "success_rate_threshold": self.thresholds.success_rate_threshold,
            },
        }

    def get_recent_metrics(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return the most recent metric records as dicts, oldest first.

        Args:
            count: Maximum number of records to return. Zero or a negative
                value yields an empty list.
        """
        # Guard explicitly: `seq[-0:]` is the whole sequence, not an empty one.
        if count <= 0:
            return []
        with self._lock:
            recent = list(self._history)[-count:]
        return [m.to_dict() for m in recent]

    def clear_history(self) -> int:
        """Drop all recorded metrics and aggregates.

        Returns:
            The number of history records that were removed.
        """
        with self._lock:
            count = len(self._history)
            self._history.clear()
            self._operation_stats.clear()
        logger.info(f"清除了 {count} 条性能记录")
        return count

    def is_healthy(self) -> Dict[str, Any]:
        """Evaluate the current statistics against the thresholds.

        Returns:
            A dict with a ``healthy`` flag, the list of detected ``issues``
            and the current memory, CPU and success-rate readings. An issue
            is reported when the success rate is below its threshold or
            memory usage exceeds 90% of the configured maximum.
        """
        stats = self.get_statistics()
        issues = []

        # Success-rate check
        if stats["success_rate"] < self.thresholds.success_rate_threshold:
            issues.append(
                f"成功率低于阈值: {stats['success_rate']:.1%} < "
                f"{self.thresholds.success_rate_threshold:.1%}"
            )

        # Memory check (warn at 90% of the limit)
        if stats["current_memory_mb"] > self.thresholds.max_memory_mb * 0.9:
            issues.append(
                f"内存使用接近上限: {stats['current_memory_mb']:.1f}MB / "
                f"{self.thresholds.max_memory_mb:.1f}MB"
            )

        return {
            "healthy": len(issues) == 0,
            "issues": issues,
            "memory_mb": stats["current_memory_mb"],
            "cpu_percent": stats["current_cpu_percent"],
            "success_rate": stats["success_rate"],
        }
class OperationTracker:
    """Context manager that measures one operation for a ``PerformanceMonitor``.

    Works with both ``with`` and ``async with``. On entry a metrics record is
    started; on exit it is completed and handed to the monitor. Exceptions
    raised inside the block are never suppressed.
    """

    def __init__(self, monitor: PerformanceMonitor, operation: str):
        """Bind the tracker to a monitor and an operation name.

        Args:
            monitor: The monitor that receives the finished record.
            operation: Name of the operation being tracked.
        """
        self._monitor = monitor
        self._operation = operation
        self._metrics: Optional[PerformanceMetrics] = None

    def __enter__(self) -> PerformanceMetrics:
        """Start a metrics record and return it to the ``with`` body."""
        started_at = datetime.now()
        baseline_memory = self._monitor.get_memory_usage()
        record = PerformanceMetrics(
            operation=self._operation,
            start_time=started_at,
            memory_before=baseline_memory,
        )
        self._metrics = record
        return record

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Complete the record and pass it to the monitor; never swallow errors."""
        record = self._metrics
        if not record:
            return False
        # The operation counts as successful only when no exception escaped.
        succeeded = exc_type is None
        final_memory = self._monitor.get_memory_usage()
        record.complete(success=succeeded, memory_after=final_memory)
        self._monitor.record_metrics(record)
        # Returning False lets any in-flight exception propagate.
        return False

    async def __aenter__(self) -> PerformanceMetrics:
        """Async entry; delegates to the synchronous ``__enter__``."""
        record = self.__enter__()
        return record

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async exit; delegates to the synchronous ``__exit__``."""
        suppress = self.__exit__(exc_type, exc_val, exc_tb)
        return suppress
class AdaptiveConcurrencyController:
    """Adjust a recommended worker count according to system load.

    Each call to :meth:`get_recommended_workers` samples system memory and
    CPU usage and moves the current worker count by at most one step,
    staying within ``[min_workers, max_workers]``.

    Example:
        controller = AdaptiveConcurrencyController(
            min_workers=1,
            max_workers=5,
            target_memory_percent=70
        )

        # Get the currently recommended concurrency
        workers = controller.get_recommended_workers()
    """

    def __init__(
        self,
        min_workers: int = 1,
        max_workers: int = 5,
        target_memory_percent: float = 70.0,
        target_cpu_percent: float = 80.0
    ):
        """Create a controller that starts at ``min_workers``.

        Args:
            min_workers: Lower bound for the worker count.
            max_workers: Upper bound for the worker count.
            target_memory_percent: Target system memory usage (percent).
            target_cpu_percent: Target system CPU usage (percent).
        """
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.target_memory_percent = target_memory_percent
        self.target_cpu_percent = target_cpu_percent
        self._current_workers = min_workers
        self._process = psutil.Process(os.getpid())
        logger.info(
            f"自适应并发控制器初始化: "
            f"workers={min_workers}-{max_workers}, "
            f"target_memory={target_memory_percent}%, "
            f"target_cpu={target_cpu_percent}%"
        )

    def get_system_load(self) -> Dict[str, float]:
        """Sample system-wide memory and CPU load.

        Returns:
            A dict with ``memory_percent``, ``cpu_percent`` and
            ``memory_available_mb``. If sampling fails, neutral fallback
            values (50% / 50% / 1024 MB) are returned instead of raising.
        """
        try:
            memory = psutil.virtual_memory()
            cpu = psutil.cpu_percent(interval=0.1)
            return {
                "memory_percent": memory.percent,
                "cpu_percent": cpu,
                "memory_available_mb": memory.available / (1024 * 1024),
            }
        except Exception as e:
            logger.warning(f"获取系统负载失败: {e}")
            return {
                "memory_percent": 50.0,
                "cpu_percent": 50.0,
                "memory_available_mb": 1024.0,
            }

    @staticmethod
    def _scale_factor(usage_percent: float, target_percent: float) -> float:
        """Return the worker multiplier suggested by one resource reading.

        Above the target the factor is ``target / usage`` (< 1, scale down);
        below half of the target it is 1.2 (scale up); otherwise 1.0.
        """
        if usage_percent > target_percent:
            return target_percent / usage_percent
        if usage_percent < target_percent * 0.5:
            return 1.2
        return 1.0

    def get_recommended_workers(self) -> int:
        """Return the recommended worker count after one adjustment step.

        The more restrictive of the memory and CPU factors decides the
        direction. The stored worker count changes by at most 1 per call and
        is kept within ``[min_workers, max_workers]``.
        """
        load = self.get_system_load()

        # The tighter resource wins.
        factor = min(
            self._scale_factor(load["memory_percent"], self.target_memory_percent),
            self._scale_factor(load["cpu_percent"], self.target_cpu_percent),
        )

        recommended = int(self._current_workers * factor)
        if factor > 1.0:
            # int() truncates, so e.g. 1 * 1.2 -> 1 and small pools would
            # never grow. When load allows scaling up, ask for at least one
            # additional worker.
            recommended = max(recommended, self._current_workers + 1)

        # Clamp to the configured range.
        recommended = max(self.min_workers, min(self.max_workers, recommended))

        # Smooth the adjustment: move by at most one worker per call.
        if recommended > self._current_workers:
            self._current_workers = min(self._current_workers + 1, recommended)
        elif recommended < self._current_workers:
            self._current_workers = max(self._current_workers - 1, recommended)

        logger.debug(
            f"并发调整: workers={self._current_workers}, "
            f"memory={load['memory_percent']:.1f}%, "
            f"cpu={load['cpu_percent']:.1f}%"
        )
        return self._current_workers

    def reset(self) -> None:
        """Reset the worker count to ``min_workers``."""
        self._current_workers = self.min_workers
        logger.info(f"并发数重置为 {self.min_workers}")
# Process-wide monitor instance, created lazily on first access
_global_monitor: Optional[PerformanceMonitor] = None


def get_performance_monitor() -> PerformanceMonitor:
    """Return the shared ``PerformanceMonitor``, creating it on first use."""
    global _global_monitor
    existing = _global_monitor
    if existing is not None:
        return existing
    _global_monitor = PerformanceMonitor()
    return _global_monitor
def track_performance(operation: str):
    """Decorator that records each call of a function as a tracked operation.

    Coroutine functions are wrapped with an async wrapper and regular
    functions with a sync one. In both cases the call runs inside
    ``get_performance_monitor().track_operation(operation)``, and the
    wrapped function's return value and exceptions pass through unchanged.
    The wrapper keeps the original function's name and docstring.

    Args:
        operation: Name under which the calls are recorded.

    Example:
        @track_performance("speech_recognition")
        async def transcribe(audio_path: str):
            ...
    """
    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                monitor = get_performance_monitor()
                async with monitor.track_operation(operation):
                    return await func(*args, **kwargs)

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            monitor = get_performance_monitor()
            with monitor.track_operation(operation):
                return func(*args, **kwargs)

        return sync_wrapper

    return decorator