# universal-fast-dubbing / backend / modules / performance_monitor.py
# cytopa99's picture
# Upload 47 files
# 68e5689 verified
"""
性能监控模块
提供系统性能监控功能,包括:
- 处理时间记录
- 内存使用监控
- 并发数动态调整
- 性能指标统计
Requirements: 9.1, 9.3, 9.6
"""
import asyncio
import inspect
import logging
import os
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, Any, Optional, List, Callable

import psutil
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetrics:
    """Timing and memory measurements for a single tracked operation.

    Attributes:
        operation: Name of the operation being measured.
        start_time: Time at which the operation started.
        end_time: Time at which it finished; ``None`` until ``complete`` runs.
        duration_ms: Elapsed time in milliseconds; ``None`` until completed.
        success: Whether the operation succeeded.
        memory_before: Memory usage (MB) before the operation.
        memory_after: Memory usage (MB) after the operation.
        extra: Free-form additional information attached by the caller.
    """

    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    success: bool = True
    memory_before: Optional[float] = None
    memory_after: Optional[float] = None
    extra: Dict[str, Any] = field(default_factory=dict)

    def complete(self, success: bool = True, memory_after: Optional[float] = None) -> None:
        """Mark the operation as finished.

        Stamps ``end_time`` with the current time, derives ``duration_ms``
        from ``start_time`` and stores the outcome.

        Args:
            success: Whether the operation succeeded.
            memory_after: Memory usage (MB) sampled after the operation.
        """
        self.end_time = datetime.now()
        self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
        self.success = success
        self.memory_after = memory_after

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the metrics.

        Timestamps are rendered with ``isoformat()`` and numeric values are
        rounded to two decimals. A field is reported as ``None`` only when it
        has not been set; a genuine ``0.0`` reading is preserved.
        """

        def _round2(value: Optional[float]) -> Optional[float]:
            # Compare against None explicitly: 0.0 is a valid measurement
            # and must not be collapsed into "missing".
            return None if value is None else round(value, 2)

        return {
            "operation": self.operation,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time is not None else None,
            "duration_ms": _round2(self.duration_ms),
            "success": self.success,
            "memory_before_mb": _round2(self.memory_before),
            "memory_after_mb": _round2(self.memory_after),
            "extra": self.extra,
        }
@dataclass
class PerformanceThresholds:
    """Limits against which recorded performance is judged.

    Attributes:
        short_video_max_ms: Maximum processing time for short
            (1-2 minute) videos, in milliseconds.
        medium_video_max_ms: Maximum processing time for medium
            (5-10 minute) videos, in milliseconds.
        max_memory_mb: Maximum memory usage, in MB.
        sync_tolerance_ms: Synchronisation tolerance, in milliseconds.
        success_rate_threshold: Minimum acceptable success rate.
    """

    short_video_max_ms: float = 30_000.0  # 30 seconds
    medium_video_max_ms: float = 60_000.0  # 60 seconds
    max_memory_mb: float = 2_048.0  # 2 GB
    sync_tolerance_ms: float = 300.0  # 0.3 seconds
    success_rate_threshold: float = 0.95  # 95%
class PerformanceMonitor:
    """Collects per-operation performance metrics and summarises them.

    Example:
        monitor = PerformanceMonitor()

        # Record an operation
        with monitor.track_operation("transcribe") as metrics:
            result = transcribe(audio)
            metrics.extra["segments"] = len(result)

        # Read aggregated statistics
        stats = monitor.get_statistics()
    """

    def __init__(
        self,
        thresholds: Optional["PerformanceThresholds"] = None,
        history_size: int = 1000,
    ):
        """Create a monitor.

        Args:
            thresholds: Performance limits; defaults to ``PerformanceThresholds()``.
            history_size: Maximum number of metrics kept in the history.
        """
        self.thresholds = thresholds or PerformanceThresholds()
        # Bounded history: the oldest entries fall off once maxlen is reached.
        self._history: deque = deque(maxlen=history_size)
        self._lock = threading.Lock()
        # Running aggregates, keyed by operation name.
        self._operation_stats: Dict[str, Dict[str, Any]] = {}
        # Handle on the current process, used for memory/CPU sampling.
        self._process = psutil.Process(os.getpid())
        logger.info("性能监控器初始化完成")

    def get_memory_usage(self) -> float:
        """Return the resident memory of this process in MB (0.0 on failure)."""
        try:
            memory_info = self._process.memory_info()
            return memory_info.rss / (1024 * 1024)  # bytes -> MB
        except Exception as e:
            logger.warning(f"获取内存使用失败: {e}")
            return 0.0

    def get_cpu_usage(self) -> float:
        """Return the CPU usage of this process in percent (0.0 on failure).

        The sample is taken with ``interval=0.1``.
        """
        try:
            return self._process.cpu_percent(interval=0.1)
        except Exception as e:
            logger.warning(f"获取CPU使用率失败: {e}")
            return 0.0

    def track_operation(self, operation: str) -> 'OperationTracker':
        """Return an ``OperationTracker`` context manager for ``operation``."""
        return OperationTracker(self, operation)

    def record_metrics(self, metrics: "PerformanceMetrics") -> None:
        """Store ``metrics`` in the history and update the aggregates.

        Args:
            metrics: The completed metrics to record.
        """
        with self._lock:
            self._history.append(metrics)
            self._update_operation_stats(metrics)
        # Threshold checks only read the metrics and log, so they do not
        # need to run while the lock is held.
        self._check_thresholds(metrics)

    def _update_operation_stats(self, metrics: "PerformanceMetrics") -> None:
        """Fold ``metrics`` into the per-operation aggregates (lock held by caller)."""
        stats = self._operation_stats.setdefault(
            metrics.operation,
            {
                "count": 0,
                "success_count": 0,
                "total_duration_ms": 0,
                "min_duration_ms": float('inf'),
                "max_duration_ms": 0,
                "last_duration_ms": 0,
            },
        )
        stats["count"] += 1
        if metrics.success:
            stats["success_count"] += 1
        # Explicit None check so that a 0.0 ms duration is still counted.
        if metrics.duration_ms is not None:
            stats["total_duration_ms"] += metrics.duration_ms
            stats["min_duration_ms"] = min(stats["min_duration_ms"], metrics.duration_ms)
            stats["max_duration_ms"] = max(stats["max_duration_ms"], metrics.duration_ms)
            stats["last_duration_ms"] = metrics.duration_ms

    def _check_thresholds(self, metrics: "PerformanceMetrics") -> None:
        """Log a warning when ``metrics`` exceeds a configured threshold."""
        # Memory check
        if (
            metrics.memory_after is not None
            and metrics.memory_after > self.thresholds.max_memory_mb
        ):
            logger.warning(
                f"内存使用超过阈值: {metrics.memory_after:.1f}MB > "
                f"{self.thresholds.max_memory_mb:.1f}MB"
            )

        # Processing-time check
        if metrics.duration_ms is None:
            return
        # Metrics without "video_duration_seconds" default to 0 and are
        # therefore judged against the short-video limit.
        video_duration = metrics.extra.get("video_duration_seconds", 0)
        if video_duration <= 120:  # short video (up to 2 minutes)
            if metrics.duration_ms > self.thresholds.short_video_max_ms:
                logger.warning(
                    f"短视频处理时间超过阈值: {metrics.duration_ms:.0f}ms > "
                    f"{self.thresholds.short_video_max_ms:.0f}ms"
                )
        elif video_duration <= 600:  # medium video (up to 10 minutes)
            if metrics.duration_ms > self.thresholds.medium_video_max_ms:
                logger.warning(
                    f"中等视频处理时间超过阈值: {metrics.duration_ms:.0f}ms > "
                    f"{self.thresholds.medium_video_max_ms:.0f}ms"
                )

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregated statistics.

        Returns:
            A dict with overall counts and success rate (computed over the
            bounded history), the current memory/CPU readings, per-operation
            aggregates and the configured thresholds.
        """
        with self._lock:
            total_count = len(self._history)
            success_count = sum(1 for m in self._history if m.success)

            operation_averages = {
                op: {
                    "count": stats["count"],
                    "success_rate": stats["success_count"] / stats["count"],
                    "avg_duration_ms": stats["total_duration_ms"] / stats["count"],
                    # inf means no duration was ever recorded; report 0 instead.
                    "min_duration_ms": (
                        stats["min_duration_ms"]
                        if stats["min_duration_ms"] != float('inf')
                        else 0
                    ),
                    "max_duration_ms": stats["max_duration_ms"],
                    "last_duration_ms": stats["last_duration_ms"],
                }
                for op, stats in self._operation_stats.items()
                if stats["count"] > 0
            }

        # Sample the process after releasing the lock: the CPU reading uses
        # interval=0.1 and would otherwise stall concurrent record_metrics().
        current_memory_mb = self.get_memory_usage()
        current_cpu_percent = self.get_cpu_usage()

        return {
            "total_operations": total_count,
            "success_count": success_count,
            "success_rate": success_count / total_count if total_count > 0 else 1.0,
            "current_memory_mb": current_memory_mb,
            "current_cpu_percent": current_cpu_percent,
            "operation_stats": operation_averages,
            "thresholds": {
                "short_video_max_ms": self.thresholds.short_video_max_ms,
                "medium_video_max_ms": self.thresholds.medium_video_max_ms,
                "max_memory_mb": self.thresholds.max_memory_mb,
                "success_rate_threshold": self.thresholds.success_rate_threshold,
            },
        }

    def get_recent_metrics(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return the most recent metrics as dicts, oldest first.

        Args:
            count: Maximum number of entries to return. Values <= 0 yield
                an empty list.
        """
        # Guard explicitly: a slice of [-0:] would return the whole history.
        if count <= 0:
            return []
        with self._lock:
            recent = list(self._history)[-count:]
        return [m.to_dict() for m in recent]

    def clear_history(self) -> int:
        """Drop all recorded metrics and aggregates.

        Returns:
            The number of history entries that were removed.
        """
        with self._lock:
            count = len(self._history)
            self._history.clear()
            self._operation_stats.clear()
        logger.info(f"清除了 {count} 条性能记录")
        return count

    def is_healthy(self) -> Dict[str, Any]:
        """Evaluate the current statistics against the thresholds.

        Returns:
            A dict with a ``healthy`` flag, the list of detected ``issues``
            and the memory, CPU and success-rate readings used.
        """
        stats = self.get_statistics()
        issues = []

        # Success rate below the configured minimum
        if stats["success_rate"] < self.thresholds.success_rate_threshold:
            issues.append(
                f"成功率低于阈值: {stats['success_rate']:.1%} < "
                f"{self.thresholds.success_rate_threshold:.1%}"
            )

        # Memory above 90% of the configured maximum
        if stats["current_memory_mb"] > self.thresholds.max_memory_mb * 0.9:
            issues.append(
                f"内存使用接近上限: {stats['current_memory_mb']:.1f}MB / "
                f"{self.thresholds.max_memory_mb:.1f}MB"
            )

        return {
            "healthy": len(issues) == 0,
            "issues": issues,
            "memory_mb": stats["current_memory_mb"],
            "cpu_percent": stats["current_cpu_percent"],
            "success_rate": stats["success_rate"],
        }
class OperationTracker:
    """Context manager that measures one operation and reports it to a monitor.

    Supports both ``with`` and ``async with``; the async variants simply
    delegate to the synchronous ones.
    """

    def __init__(self, monitor: "PerformanceMonitor", operation: str):
        """Bind the tracker to a monitor and an operation name.

        Args:
            monitor: Monitor that receives the finished metrics.
            operation: Name of the operation being tracked.
        """
        self._monitor = monitor
        self._operation = operation
        self._metrics: Optional[PerformanceMetrics] = None

    def __enter__(self) -> "PerformanceMetrics":
        """Start measuring and return the metrics object for the caller to annotate."""
        started_at = datetime.now()
        baseline_mb = self._monitor.get_memory_usage()
        self._metrics = PerformanceMetrics(
            operation=self._operation,
            start_time=started_at,
            memory_before=baseline_mb,
        )
        return self._metrics

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish measuring and hand the metrics to the monitor.

        The operation counts as successful when no exception is in flight.
        Always returns ``False`` so exceptions are never suppressed.
        """
        current = self._metrics
        if not current:
            return False

        final_mb = self._monitor.get_memory_usage()
        current.complete(success=exc_type is None, memory_after=final_mb)
        self._monitor.record_metrics(current)
        return False

    async def __aenter__(self) -> "PerformanceMetrics":
        """Async entry point; same behaviour as ``__enter__``."""
        return self.__enter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async exit point; same behaviour as ``__exit__``."""
        return self.__exit__(exc_type, exc_val, exc_tb)
class AdaptiveConcurrencyController:
    """Adjusts a recommended worker count according to system load.

    Example:
        controller = AdaptiveConcurrencyController(
            min_workers=1,
            max_workers=5,
            target_memory_percent=70
        )

        # Get the currently recommended concurrency
        workers = controller.get_recommended_workers()
    """

    def __init__(
        self,
        min_workers: int = 1,
        max_workers: int = 5,
        target_memory_percent: float = 70.0,
        target_cpu_percent: float = 80.0,
    ):
        """Create a controller that starts at ``min_workers``.

        Args:
            min_workers: Lower bound for the recommendation.
            max_workers: Upper bound for the recommendation.
            target_memory_percent: Target memory utilisation, in percent.
            target_cpu_percent: Target CPU utilisation, in percent.
        """
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.target_memory_percent = target_memory_percent
        self.target_cpu_percent = target_cpu_percent

        self._current_workers = min_workers
        self._process = psutil.Process(os.getpid())

        logger.info(
            f"自适应并发控制器初始化: "
            f"workers={min_workers}-{max_workers}, "
            f"target_memory={target_memory_percent}%, "
            f"target_cpu={target_cpu_percent}%"
        )

    def get_system_load(self) -> Dict[str, float]:
        """Sample system-wide memory and CPU utilisation.

        Returns:
            A dict with ``memory_percent``, ``cpu_percent`` and
            ``memory_available_mb``. If sampling fails, fixed fallback
            values (50%, 50%, 1024 MB) are returned instead.
        """
        try:
            memory = psutil.virtual_memory()
            cpu = psutil.cpu_percent(interval=0.1)
            return {
                "memory_percent": memory.percent,
                "cpu_percent": cpu,
                "memory_available_mb": memory.available / (1024 * 1024),
            }
        except Exception as e:
            logger.warning(f"获取系统负载失败: {e}")
            return {
                "memory_percent": 50.0,
                "cpu_percent": 50.0,
                "memory_available_mb": 1024.0,
            }

    @staticmethod
    def _load_factor(usage_percent: float, target_percent: float) -> float:
        """Return the scaling factor implied by one resource's utilisation."""
        if usage_percent > target_percent:
            # Over target: shrink proportionally to the overshoot.
            return target_percent / usage_percent
        if usage_percent < target_percent * 0.5:
            # Under half the target: there is room to grow.
            return 1.2
        return 1.0

    def get_recommended_workers(self) -> int:
        """Return the recommended worker count, updating internal state.

        Each call moves the current value by at most one step towards the
        load-derived recommendation and keeps it within
        ``[min_workers, max_workers]``.
        """
        load = self.get_system_load()

        memory_factor = self._load_factor(load["memory_percent"], self.target_memory_percent)
        cpu_factor = self._load_factor(load["cpu_percent"], self.target_cpu_percent)

        # The more constrained resource decides.
        factor = min(memory_factor, cpu_factor)
        recommended = int(self._current_workers * factor)
        if factor > 1.0:
            # int() truncation maps e.g. 1 * 1.2 back to 1, which would pin
            # small pools at their current size forever; a growth factor
            # must ask for at least one more worker.
            recommended = max(recommended, self._current_workers + 1)

        # Clamp to the configured range
        recommended = max(self.min_workers, min(self.max_workers, recommended))

        # Smooth the adjustment: change by at most 1 per call
        if recommended > self._current_workers:
            self._current_workers = min(self._current_workers + 1, recommended)
        elif recommended < self._current_workers:
            self._current_workers = max(self._current_workers - 1, recommended)

        logger.debug(
            f"并发调整: workers={self._current_workers}, "
            f"memory={load['memory_percent']:.1f}%, "
            f"cpu={load['cpu_percent']:.1f}%"
        )

        return self._current_workers

    def reset(self) -> None:
        """Reset the current worker count to ``min_workers``."""
        self._current_workers = self.min_workers
        logger.info(f"并发数重置为 {self.min_workers}")
# Process-wide monitor instance, created on first use
_global_monitor: Optional["PerformanceMonitor"] = None


def get_performance_monitor() -> "PerformanceMonitor":
    """Return the shared performance monitor, creating it on the first call.

    Returns:
        The module-level PerformanceMonitor instance.
    """
    global _global_monitor
    if _global_monitor is not None:
        return _global_monitor
    _global_monitor = PerformanceMonitor()
    return _global_monitor
def track_performance(operation: str):
    """Decorator factory that records each call of the wrapped function.

    Every call is tracked on the global performance monitor under the name
    ``operation``. Coroutine functions get an async wrapper; all other
    callables get a synchronous one.

    Args:
        operation: Name under which the calls are recorded.

    Returns:
        A decorator that wraps the target function.

    Example:
        @track_performance("transcribe")
        async def transcribe(audio_path: str):
            ...
    """

    def decorator(func):
        # inspect.iscoroutinefunction replaces the deprecated
        # asyncio.iscoroutinefunction.
        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # Look the monitor up per call, not at decoration time.
                async with get_performance_monitor().track_operation(operation):
                    return await func(*args, **kwargs)

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            with get_performance_monitor().track_operation(operation):
                return func(*args, **kwargs)

        return sync_wrapper

    return decorator