""" 音频同步引擎模块 提供配音音频与原视频的精确同步功能,支持: - 以Whisper时间戳为基准进行对齐 - 智能变速处理(最大1.4倍) - 静音填充 - 音频片段合并 """ import os import time import logging from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass # 配置日志 logger = logging.getLogger(__name__) class AudioSyncError(Exception): """音频同步异常基类""" pass class AudioLoadError(AudioSyncError): """音频加载异常""" def __init__(self, path: str, reason: str): self.path = path self.reason = reason self.message = f"音频加载失败 [{path}]: {reason}" super().__init__(self.message) class AudioAlignError(AudioSyncError): """音频对齐异常""" def __init__(self, reason: str): self.reason = reason self.message = f"音频对齐失败: {reason}" super().__init__(self.message) @dataclass class SyncConfig: """ 音频同步配置 属性: max_speed_ratio: 最大变速比例,默认1.4倍 sync_tolerance: 同步容差(秒),默认0.3秒 silence_padding: 是否启用静音填充,默认True output_format: 输出音频格式,默认wav temp_dir: 临时文件目录 """ max_speed_ratio: float = 3.0 # 放宽到 3 倍,适应更大的时长差异 sync_tolerance: float = 0.3 silence_padding: bool = True output_format: str = "wav" temp_dir: str = "temp/sync" class AudioSyncEngine: """ 音频同步引擎 将TTS生成的配音音频与原视频时间轴精确对齐。 使用示例: engine = AudioSyncEngine() # 对齐单个片段 synced_path = await engine.align_segment( tts_audio_path="segment.wav", target_start=10.5, target_end=15.2 ) # 批量对齐并合并 final_path = await engine.align( tts_audio_paths=["seg1.wav", "seg2.wav"], original_segments=[ {"start": 0, "end": 5}, {"start": 5.5, "end": 10} ], target_duration=10.0 ) """ def __init__(self, config: Optional[SyncConfig] = None): """ 初始化音频同步引擎 参数: config: 同步配置,如果为None则使用默认配置 """ self.config = config or SyncConfig() # 确保临时目录存在 os.makedirs(self.config.temp_dir, exist_ok=True) # 跟踪临时文件 self._temp_files: List[str] = [] logger.info( f"音频同步引擎初始化: 最大变速={self.config.max_speed_ratio}x, " f"同步容差={self.config.sync_tolerance}s" ) async def align( self, tts_audio_paths: List[str], original_segments: List[Dict[str, Any]], target_duration: float, client_config: Optional[Dict[str, Any]] = None ) -> str: """ 音频对齐和同步 将多个TTS音频片段对齐到原始时间戳,并合并为最终配音文件。 参数: tts_audio_paths: TTS音频文件路径列表 original_segments: 原始片段信息列表,每个包含: - start: float - 开始时间(秒) - end: float - 结束时间(秒) target_duration: 目标总时长(秒) client_config: 客户端配置,包含: - syncOffset: int - 同步偏移量(毫秒) 返回: 最终配音文件路径 异常: AudioSyncError: 同步处理失败 """ from pydub import AudioSegment if not tts_audio_paths or not original_segments: raise AudioAlignError("输入为空") # 处理客户端配置中的同步偏移 sync_offset_ms = 0 # 默认无偏移 if client_config and 'syncOffset' in client_config: sync_offset_ms = int(client_config['syncOffset']) logger.info(f"使用客户端同步偏移: {sync_offset_ms}ms") logger.info( f"开始音频同步: {len(tts_audio_paths)} 个片段, " f"目标时长={target_duration:.1f}s, " f"同步偏移={sync_offset_ms}ms" ) # 1. 加载并对齐每个TTS片段 aligned_segments = [] for i, (tts_path, orig_seg) in enumerate( zip(tts_audio_paths, original_segments) ): if tts_path is None or not os.path.exists(tts_path): logger.warning(f"片段 {i} 音频文件不存在,跳过") aligned_segments.append(None) continue try: # 加载TTS音频 tts_audio = AudioSegment.from_file(tts_path) # 计算目标时长 target_seg_duration = orig_seg['end'] - orig_seg['start'] # 应用同步偏移(将毫秒转换为秒) sync_offset_seconds = sync_offset_ms / 1000.0 adjusted_start = orig_seg['start'] + sync_offset_seconds adjusted_end = orig_seg['end'] + sync_offset_seconds # 对齐音频 aligned_audio = self._align_single_segment( tts_audio, target_seg_duration ) aligned_segments.append({ 'audio': aligned_audio, 'start': adjusted_start, # 使用调整后的时间戳 'end': adjusted_end # 使用调整后的时间戳 }) except Exception as e: logger.error(f"片段 {i} 对齐失败: {e}") aligned_segments.append(None) # 2. 合并所有片段 final_audio = self._merge_segments(aligned_segments, target_duration) # 3. 保存最终音频 output_path = os.path.join( self.config.temp_dir, f"final_dubbing_{int(time.time())}.{self.config.output_format}" ) final_audio.export(output_path, format=self.config.output_format) self._temp_files.append(output_path) logger.info(f"音频同步完成: {output_path}") return output_path def _align_single_segment( self, tts_audio, target_duration: float ): """ 对齐单个音频片段 根据目标时长调整TTS音频,支持变速和静音填充。 参数: tts_audio: TTS音频对象 (AudioSegment) target_duration: 目标时长(秒) 返回: 对齐后的音频对象 """ from pydub import AudioSegment current_duration = len(tts_audio) / 1000.0 # 转换为秒 # 计算需要的变速比例 if target_duration <= 0: return tts_audio speed_ratio = current_duration / target_duration logger.debug( f"片段对齐: 当前={current_duration:.2f}s, " f"目标={target_duration:.2f}s, 比例={speed_ratio:.2f}" ) # 情况1: 需要加速(TTS太长) if speed_ratio > 1: if speed_ratio > self.config.max_speed_ratio: # 超过最大变速限制,截断 logger.warning( f"变速比例 {speed_ratio:.2f} 超过限制 " f"{self.config.max_speed_ratio},进行截断" ) adjusted_audio = tts_audio[:int(target_duration * 1000)] else: # 正常加速 adjusted_audio = self._change_speed(tts_audio, speed_ratio) # 情况2: 需要减速或填充(TTS太短) elif speed_ratio < 1: min_ratio = 1 / self.config.max_speed_ratio if speed_ratio < min_ratio: # 超过最大减速限制,添加静音填充 if self.config.silence_padding: silence_duration = (target_duration - current_duration) * 1000 silence = AudioSegment.silent(duration=int(silence_duration)) adjusted_audio = tts_audio + silence logger.debug(f"添加静音填充: {silence_duration:.0f}ms") else: adjusted_audio = self._change_speed(tts_audio, min_ratio) else: # 正常减速 adjusted_audio = self._change_speed(tts_audio, speed_ratio) # 情况3: 时长匹配 else: adjusted_audio = tts_audio return adjusted_audio def _change_speed(self, audio, speed_ratio: float): """ 改变音频播放速度 参数: audio: 音频对象 (AudioSegment) speed_ratio: 变速比例(>1加速,<1减速) 返回: 变速后的音频对象 """ from pydub import AudioSegment if abs(speed_ratio - 1.0) < 0.01: return audio try: if speed_ratio > 1: # 加速:使用 speedup 方法 # pydub 的 speedup 需要整数倍,我们用帧率调整 new_frame_rate = int(audio.frame_rate * speed_ratio) adjusted = audio._spawn( audio.raw_data, overrides={"frame_rate": new_frame_rate} ).set_frame_rate(audio.frame_rate) else: # 减速:降低帧率然后恢复 new_frame_rate = int(audio.frame_rate * speed_ratio) adjusted = audio._spawn( audio.raw_data, overrides={"frame_rate": new_frame_rate} ).set_frame_rate(audio.frame_rate) return adjusted except Exception as e: logger.warning(f"变速处理失败: {e},返回原始音频") return audio def _merge_segments( self, aligned_segments: List[Optional[Dict[str, Any]]], total_duration: float ): """ 根据时间信息合并音频片段 参数: aligned_segments: 对齐后的片段列表 total_duration: 目标总时长 返回: 合并后的音频对象 """ from pydub import AudioSegment # 创建空白音频作为基础 final_audio = AudioSegment.silent(duration=int(total_duration * 1000)) # 将每个片段放置到正确的时间位置 for seg_info in aligned_segments: if seg_info is None: continue audio = seg_info['audio'] start_ms = int(seg_info['start'] * 1000) # 使用 overlay 将音频放置到指定位置 final_audio = final_audio.overlay(audio, position=start_ms) return final_audio async def align_segment( self, tts_audio_path: str, target_start: float, target_end: float, output_path: Optional[str] = None ) -> str: """ 对齐单个TTS音频片段 参数: tts_audio_path: TTS音频文件路径 target_start: 目标开始时间(秒) target_end: 目标结束时间(秒) output_path: 输出文件路径(可选) 返回: 对齐后的音频文件路径 """ from pydub import AudioSegment if not os.path.exists(tts_audio_path): raise AudioLoadError(tts_audio_path, "文件不存在") # 加载音频 tts_audio = AudioSegment.from_file(tts_audio_path) # 计算目标时长 target_duration = target_end - target_start # 对齐 aligned_audio = self._align_single_segment(tts_audio, target_duration) # 保存 if output_path is None: output_path = os.path.join( self.config.temp_dir, f"aligned_{int(time.time() * 1000)}.{self.config.output_format}" ) aligned_audio.export(output_path, format=self.config.output_format) self._temp_files.append(output_path) return output_path def check_sync_drift( self, current_position: float, expected_position: float ) -> Tuple[bool, float]: """ 检查音视频同步偏差 参数: current_position: 当前播放位置(秒) expected_position: 期望播放位置(秒) 返回: (是否需要校正, 偏差值) """ drift = abs(current_position - expected_position) needs_correction = drift > self.config.sync_tolerance if needs_correction: logger.warning(f"检测到同步偏差: {drift:.3f}s") return needs_correction, drift def cleanup(self) -> int: """ 清理临时文件 返回: 清理的文件数量 """ cleaned = 0 for path in self._temp_files: try: if os.path.exists(path): os.remove(path) cleaned += 1 except Exception as e: logger.warning(f"清理临时文件失败 {path}: {e}") self._temp_files.clear() logger.info(f"清理了 {cleaned} 个临时文件") return cleaned @property def temp_files(self) -> List[str]: """获取当前跟踪的临时文件列表""" return self._temp_files.copy()