Spaces:
Running
Running
| """ | |
| 流式异步处理模块 | |
| 实现完整的流式配音处理流程: | |
| 扩展上传音频 → 语音识别 (Whisper V3 带时间戳) → 带时间戳分段异步处理 | |
| → 翻译 + 角色识别 (Llama 3) → 语音合成 (Edge-TTS / SiliconFlow) | |
| → 音频同步对齐 (时间戳匹配) → 配音音频分段输出 | |
| 支持: | |
| - 流式 SSE 输出 | |
| - 分段并行处理 | |
| - 第一段完成即可开始播放 | |
| """ | |
| import os | |
| import asyncio | |
| import logging | |
| import time | |
| import uuid | |
| import base64 | |
| from typing import Dict, Any, Optional, List, AsyncGenerator | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from .groq_client import GroqClient, GroqConfig, GroqError | |
| from .siliconflow_client import SiliconFlowClient, SiliconFlowConfig | |
| from .tts_generator import TTSGenerator, TTSConfig | |
| from .audio_sync import AudioSyncEngine, SyncConfig | |
| from .logging_config import get_component_logger, Component | |
| # 使用统一的日志配置 | |
| logger = get_component_logger(Component.STREAM) | |
class ProcessingStage(Enum):
    """Named stages of the dubbing pipeline.

    Each member's value is the lowercase string identifier of the stage.
    """

    # Job set-up, before any processing step has run.
    INIT = "init"
    # Speech recognition.
    ASR = "asr"
    # Translation.
    TRANSLATE = "translate"
    # Speech synthesis.
    TTS = "tts"
    # Audio synchronisation / alignment.
    SYNC = "sync"
    # Terminal states.
    COMPLETE = "complete"
    ERROR = "error"
@dataclass
class StreamConfig:
    """
    Configuration for the streaming processor.

    Declared as a dataclass so that individual settings can be overridden
    through keyword arguments, e.g. ``StreamConfig(temp_dir="/tmp/x")``.

    Attributes:
        temp_dir: Directory for temporary files.
        max_segment_duration: Maximum duration of one output group (seconds).
        first_segment_target: Target duration of the first group (seconds);
            kept short so playback can start quickly.
        parallel_tts: Whether TTS is generated in parallel.
        max_tts_workers: Maximum number of concurrent TTS requests.
        parallel_groups: Whether several groups are processed in parallel.
        max_group_workers: Maximum number of concurrently processed groups.
        audio_chunk_duration: Length of one audio chunk (seconds) when the
            input is split for ASR.
        enable_audio_chunking: Whether large inputs are split into chunks.
        audio_chunk_threshold_mb: File size (MB) above which chunking is used.
        segment_processing_delay: Interval (seconds) between the start of
            consecutive groups, used to stay below API rate limits.
        first_segment_priority: Whether the first group is processed first,
            without any delay.
    """

    temp_dir: str = "temp/stream"
    max_segment_duration: float = 60.0  # shortened to one minute for faster output
    first_segment_target: float = 10.0  # 10 s first group for a very quick start
    parallel_tts: bool = True
    max_tts_workers: int = 8  # Edge-TTS is not rate limited, so allow more concurrency
    parallel_groups: bool = True  # process groups in parallel
    max_group_workers: int = 1  # lowered to 1 to avoid Groq LLM rate limiting (6000 TPM)
    audio_chunk_duration: float = 120.0  # two minutes per chunk
    enable_audio_chunking: bool = True  # enable audio chunking
    audio_chunk_threshold_mb: float = 5.0  # chunk automatically above 5 MB
    segment_processing_delay: float = 10.0  # gap between groups (s) to avoid API rate limits
    first_segment_priority: bool = True  # first group is handled first, without delay
@dataclass
class SegmentData:
    """
    Data describing one segment.

    Declared as a dataclass so that the required fields (``index``,
    ``start_time``, ``end_time``) are actually accepted by the constructor
    and stored on the instance.

    Attributes:
        index: Segment index.
        start_time: Start time in seconds.
        end_time: End time in seconds.
        original_text: Source-language text.
        translated_text: Translated text.
        role: Role label.
        audio_path: Path of the generated audio file, if any.
        audio_data: Raw bytes of the generated audio, if any.
    """

    index: int
    start_time: float
    end_time: float
    original_text: str = ""
    translated_text: str = ""
    role: str = "MALE"
    audio_path: Optional[str] = None
    audio_data: Optional[bytes] = None
class StreamProcessor:
    """
    Streaming asynchronous dubbing processor.

    Runs speech recognition, translation, speech synthesis and timestamp
    alignment, and yields the dubbed audio group by group so that playback
    can begin before the whole input has been processed.

    Example:
        processor = StreamProcessor()
        await processor.initialize()
        async for update in processor.process_stream(audio_path, config):
            if update['type'] == 'segment_ready':
                # play the audio of this group
                play_audio(update['audio_data'])
    """

    def __init__(
        self,
        config: Optional["StreamConfig"] = None,
        groq_config: Optional["GroqConfig"] = None,
        siliconflow_config: Optional["SiliconFlowConfig"] = None
    ):
        """
        Store the configuration and make sure the temp directory exists.

        No client is created here; ``initialize()`` must be awaited before
        any processing method is used.

        Args:
            config: Streaming configuration; a default ``StreamConfig`` is
                used when omitted.
            groq_config: Groq client configuration (optional).
            siliconflow_config: SiliconFlow client configuration (optional).
        """
        self.config = config or StreamConfig()
        self._groq_config = groq_config
        self._siliconflow_config = siliconflow_config

        # Sub-modules; populated by initialize().
        self.groq_client: Optional[GroqClient] = None
        self.siliconflow_client: Optional[SiliconFlowClient] = None
        self.tts_generator: Optional[TTSGenerator] = None
        self.audio_sync: Optional[AudioSyncEngine] = None

        self._initialized = False
        # Paths of temp files created by this processor, removed by cleanup().
        self._temp_files: List[str] = []

        os.makedirs(self.config.temp_dir, exist_ok=True)
        logger.info("流式处理器配置完成")

    async def initialize(self) -> None:
        """Create and initialise all sub-modules (no-op when already done)."""
        if self._initialized:
            return

        logger.info("初始化流式处理器...")

        # Groq client (ASR + LLM), only when a configuration was supplied.
        if self._groq_config:
            self.groq_client = GroqClient(self._groq_config)
            await self.groq_client.initialize()

        # SiliconFlow client (optional ASR + TTS).
        if self._siliconflow_config:
            self.siliconflow_client = SiliconFlowClient(self._siliconflow_config)
            await self.siliconflow_client.initialize()

        # TTS generator and audio sync engine each get their own sub-directory.
        tts_config = TTSConfig(temp_dir=os.path.join(self.config.temp_dir, "tts"))
        self.tts_generator = TTSGenerator(tts_config)

        sync_config = SyncConfig(temp_dir=os.path.join(self.config.temp_dir, "sync"))
        self.audio_sync = AudioSyncEngine(sync_config)

        self._initialized = True
        logger.info("流式处理器初始化完成")

    def _ensure_initialized(self) -> None:
        """Raise ``RuntimeError`` unless ``initialize()`` has completed."""
        if not self._initialized:
            raise RuntimeError("流式处理器未初始化,请先调用 initialize()")

    async def _split_audio_by_time(
        self,
        audio_path: str,
        chunk_duration: float = 120.0
    ) -> List[Dict[str, Any]]:
        """
        Split an audio file into fixed-length chunks with ffmpeg.

        ffprobe/ffmpeg are run in the default executor so the event loop is
        not blocked while they work.

        Args:
            audio_path: Path of the original audio file.
            chunk_duration: Length of every chunk in seconds (default 120).

        Returns:
            A list of chunk descriptors, each containing:
                - path: chunk file path
                - start_time: offset of the chunk in the original audio
                - duration: chunk duration
                - is_original: True when the entry refers to the unsplit
                  input file (duration unknown, no split needed, or every
                  split failed)
                - index: chunk number (only on real chunks)
        """
        import math
        import subprocess

        loop = asyncio.get_running_loop()

        def run_in_thread(cmd: List[str], **kwargs: Any):
            # subprocess.run blocks until the child exits; running it in the
            # executor keeps other coroutines responsive meanwhile.
            return loop.run_in_executor(
                None, lambda: subprocess.run(cmd, **kwargs)
            )

        def whole_file(duration: float) -> List[Dict[str, Any]]:
            return [{
                'path': audio_path,
                'start_time': 0,
                'duration': duration,
                'is_original': True
            }]

        # Determine the total duration of the input.
        try:
            probe = await run_in_thread(
                [
                    'ffprobe', '-v', 'error',
                    '-show_entries', 'format=duration',
                    '-of', 'default=noprint_wrappers=1:nokey=1',
                    audio_path
                ],
                capture_output=True,
                text=True,
                timeout=30
            )
            total_duration = float(probe.stdout.strip())
            logger.info(f"音频总时长: {total_duration:.1f}秒")
        except Exception as e:
            logger.warning(f"无法获取音频时长,使用默认处理: {e}")
            # Without a duration the file cannot be split; hand it back as is.
            return whole_file(0)

        # A non-positive chunk length would divide by zero / never terminate.
        if chunk_duration <= 0:
            logger.warning(f"切片时长无效 ({chunk_duration}),不进行切分")
            return whole_file(total_duration)

        if total_duration <= chunk_duration:
            logger.info(f"音频时长 {total_duration:.1f}s <= 切片时长 {chunk_duration}s,无需切分")
            return whole_file(total_duration)

        num_chunks = math.ceil(total_duration / chunk_duration)
        logger.info(f"将音频切分为 {num_chunks} 个片段,每段 {chunk_duration}秒")

        chunks: List[Dict[str, Any]] = []
        chunk_dir = os.path.join(self.config.temp_dir, "chunks")
        os.makedirs(chunk_dir, exist_ok=True)

        for i in range(num_chunks):
            start_time = i * chunk_duration
            # The last chunk may be shorter than the others.
            actual_duration = min(chunk_duration, total_duration - start_time)
            if actual_duration <= 0:
                break

            # NOTE(review): '-c copy' into an .m4a container presumably
            # requires an AAC-compatible input stream - confirm for other
            # upload formats.
            chunk_path = os.path.join(chunk_dir, f"chunk_{uuid.uuid4().hex[:8]}_{i}.m4a")
            cmd = [
                'ffmpeg', '-y',
                '-i', audio_path,
                '-ss', str(start_time),
                '-t', str(actual_duration),
                '-c', 'copy',  # stream copy, no re-encoding, fast
                '-loglevel', 'error',
                chunk_path
            ]
            try:
                await run_in_thread(cmd, check=True, timeout=60)
            except subprocess.TimeoutExpired:
                logger.error(f"切片 {i} 超时")
                continue
            except subprocess.CalledProcessError as e:
                logger.error(f"切片 {i} 失败: {e}")
                continue

            chunks.append({
                'path': chunk_path,
                'start_time': start_time,
                'duration': actual_duration,
                'index': i,
                'is_original': False
            })
            self._temp_files.append(chunk_path)
            logger.info(f"切片 {i+1}/{num_chunks}: {start_time:.1f}s - {start_time + actual_duration:.1f}s")

        if not chunks:
            logger.warning("音频切分失败,使用原文件")
            return whole_file(total_duration)

        logger.info(f"音频切分完成: {len(chunks)} 个切片")
        return chunks

    async def _do_asr_with_chunking(
        self,
        audio_path: str,
        client_config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Run ASR, splitting large files into chunks first.

        Files above ``config.audio_chunk_threshold_mb`` (when chunking is
        enabled) are split, the chunks are recognised with at most two
        concurrent requests, and the per-chunk results are merged. Segment
        timestamps are shifted by the start offset of their chunk. Chunks
        whose ASR fails are logged and left out of the merged result.

        Args:
            audio_path: Path of the audio file.
            client_config: Client configuration forwarded to ``_do_asr``.

        Returns:
            Dict with ``text``, ``language``, ``segments`` and ``duration``
            when chunks were merged; otherwise whatever ``_do_asr`` returns.
        """
        file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)

        need_chunking = (
            self.config.enable_audio_chunking and
            file_size_mb > self.config.audio_chunk_threshold_mb
        )
        if not need_chunking:
            logger.info(f"文件大小 {file_size_mb:.2f}MB <= {self.config.audio_chunk_threshold_mb}MB,直接处理")
            return await self._do_asr(audio_path, client_config)

        logger.info(f"文件大小 {file_size_mb:.2f}MB > {self.config.audio_chunk_threshold_mb}MB,启用切片处理")

        chunks = await self._split_audio_by_time(
            audio_path,
            self.config.audio_chunk_duration
        )
        if len(chunks) == 1 and chunks[0].get('is_original'):
            # Nothing was split (not needed, or splitting failed).
            return await self._do_asr(audio_path, client_config)

        logger.info(f"开始并行 ASR 处理 {len(chunks)} 个切片...")

        async def process_chunk(chunk: Dict) -> Dict[str, Any]:
            """Recognise one chunk and shift its timestamps."""
            try:
                result = await self._do_asr(chunk['path'], client_config)
                offset = chunk['start_time']
                for seg in result.get('segments', []):
                    seg['start'] = seg.get('start', 0) + offset
                    seg['end'] = seg.get('end', 0) + offset
                return {'success': True, 'index': chunk['index'], 'result': result}
            except Exception as e:
                logger.error(f"切片 {chunk['index']} ASR 失败: {e}")
                return {'success': False, 'index': chunk['index'], 'error': str(e)}

        # At most two chunks in flight, to avoid Groq rate limiting.
        semaphore = asyncio.Semaphore(2)

        async def limited_process(chunk: Dict) -> Dict[str, Any]:
            async with semaphore:
                return await process_chunk(chunk)

        results = await asyncio.gather(*(limited_process(chunk) for chunk in chunks))

        # Merge successful chunks in chunk order.
        successful = sorted(
            (r for r in results if r['success']),
            key=lambda r: r['index']
        )
        all_segments: List[Dict[str, Any]] = []
        detected_language = 'unknown'
        total_duration = 0
        for entry in successful:
            result = entry['result']
            all_segments.extend(result.get('segments', []))
            if detected_language == 'unknown':
                detected_language = result.get('language', 'unknown')
            total_duration = max(total_duration, result.get('duration', 0))

        # Per-chunk durations are relative to the chunk; the end of the last
        # (shifted) segment is the duration in terms of the original file.
        if all_segments:
            total_duration = all_segments[-1].get('end', 0)

        logger.info(f"ASR 切片处理完成: {len(all_segments)} 个片段, 总时长 {total_duration:.1f}s")
        return {
            'text': ' '.join(seg.get('text', '') for seg in all_segments),
            'language': detected_language,
            'segments': all_segments,
            'duration': total_duration
        }

    async def process_stream(
        self,
        audio_path: str,
        client_config: Optional[Dict[str, Any]] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Process an audio file and stream the dubbed result.

        Steps:
            1. ASR (with automatic chunking of large files).
            2. Group the ASR segments (short first group).
            3. Start one task per group: group 0 immediately, group ``i``
               after ``i * config.segment_processing_delay`` seconds. Each
               task translates, synthesises and aligns its group.
            4. Yield groups in completion order, not in index order.

        Args:
            audio_path: Path of the audio file.
            client_config: Client configuration forwarded to the ASR,
                translation, TTS and sync steps.

        Yields:
            Dicts with a ``type`` key:
                - ``progress``: ``progress`` (number) and ``message``.
                - ``segment_ready``: ``index``, ``start_time``, ``end_time``,
                  ``duration``, ``progress``, base64 ``audio_data`` and the
                  translated ``segments``.
                - ``error``: ``message``; the stream ends afterwards.
                - ``complete``: ``total_segments``, ``total_duration``,
                  ``processing_time`` and ``source_language``.

        Raises:
            RuntimeError: When ``initialize()`` has not been called.
        """
        self._ensure_initialized()

        session_id = str(uuid.uuid4())[:8]
        start_time = time.time()
        logger.info(f"[{session_id}] 开始流式处理(并行优化): {audio_path}")
        logger.info(f"[{session_id}] 客户端配置: {client_config}")

        # Declared outside the try block so the finally clause can always
        # cancel whatever has been started.
        tasks: List[asyncio.Task] = []
        try:
            # Check the input file.
            logger.info(f"[{session_id}] 检查音频文件: {audio_path}")
            if not os.path.exists(audio_path):
                logger.error(f"[{session_id}] 音频文件不存在: {audio_path}")
                yield {'type': 'error', 'message': f'音频文件不存在: {audio_path}'}
                return

            file_size = os.path.getsize(audio_path) / (1024 * 1024)
            logger.info(f"[{session_id}] 音频文件检查通过: {file_size:.2f}MB")

            # 1. Speech recognition.
            logger.info(f"[{session_id}] 开始 ASR 步骤")
            yield {'type': 'progress', 'progress': 5, 'message': '语音识别中...'}
            try:
                asr_result = await self._do_asr_with_chunking(audio_path, client_config)
            except Exception as e:
                logger.error(f"[{session_id}] ASR 失败: {e}")
                yield {'type': 'error', 'message': f'语音识别失败: {str(e)}'}
                return

            if not asr_result.get('segments'):
                yield {'type': 'error', 'message': '语音识别结果为空'}
                return

            source_language = asr_result.get('language', 'unknown')
            total_duration = asr_result.get('duration', 0)
            if total_duration == 0 and asr_result['segments']:
                total_duration = asr_result['segments'][-1].get('end', 0)

            logger.info(f"[{session_id}] ASR完成: {len(asr_result['segments'])} 片段, {total_duration:.1f}s")
            logger.info(f"[{session_id}] ASR 样本: {asr_result['segments'][0]}")

            yield {
                'type': 'progress',
                'progress': 20,
                'message': '识别完成,开始并行处理...'
            }

            # 2. Grouping.
            segment_groups = self._group_segments_for_streaming(
                asr_result['segments'],
                total_duration
            )
            if not segment_groups:
                yield {'type': 'error', 'message': '音频分段失败'}
                return

            total_groups = len(segment_groups)
            logger.info(f"[{session_id}] 分组完成: {total_groups} 组,开始智能流式处理")

            # 3. One task per group; results are collected through a queue
            #    so they can be emitted in completion order.
            result_queue: asyncio.Queue = asyncio.Queue()
            completed_count = 0

            async def process_group_with_delay(group_index: int, group: List[Dict]) -> None:
                """Translate, synthesise and align one group after its delay."""
                nonlocal completed_count
                group_start = group[0].get('start', 0) if group else 0
                group_end = group[-1].get('end', 0) if group else 0

                # Staggered start: the first group runs immediately, later
                # groups wait proportionally to their index.
                if group_index == 0:
                    logger.info(f"[{session_id}] 🚀 首段优先处理,无延迟")
                else:
                    delay = group_index * self.config.segment_processing_delay
                    logger.info(f"[{session_id}] 分组 {group_index} 延迟 {delay}s 后开始处理")
                    await asyncio.sleep(delay)

                try:
                    logger.info(f"[{session_id}] 开始处理分组 {group_index} ({group_start:.1f}s-{group_end:.1f}s)")
                    translated = await self._translate_segments(group, source_language, client_config)
                    tts_results = await self._generate_tts_parallel(translated, client_config)
                    synced_audio = await self._sync_audio(
                        tts_results, translated,
                        group_end - group_start, client_config
                    )

                    audio_data = None
                    if synced_audio and os.path.exists(synced_audio):
                        with open(synced_audio, 'rb') as f:
                            audio_data = f.read()

                    await result_queue.put({
                        'success': True,
                        'index': group_index,
                        'start_time': group_start,
                        'end_time': group_end,
                        'audio_data': audio_data,
                        'segments': translated
                    })
                    logger.info(f"[{session_id}] ✅ 分组 {group_index} 处理完成")
                except Exception as e:
                    logger.error(f"[{session_id}] 分组 {group_index} 失败: {e}")
                    await result_queue.put({
                        'success': False,
                        'index': group_index,
                        'error': str(e)
                    })
                completed_count += 1

            # No semaphore here: the staggered delays are what limits load.
            for i, group in enumerate(segment_groups):
                tasks.append(asyncio.create_task(process_group_with_delay(i, group)))

            # 4. Emit results in completion order.
            processed_groups = 0
            while processed_groups < total_groups:
                try:
                    # Wake up every second so progress can be reported.
                    result = await asyncio.wait_for(result_queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    yield {
                        'type': 'progress',
                        'progress': 20 + (completed_count / total_groups) * 75,
                        'message': f'智能流式处理中 {completed_count}/{total_groups}...'
                    }
                    continue

                # Failed or empty groups count as processed too, otherwise
                # the loop would never terminate.
                processed_groups += 1
                if result['success'] and result.get('audio_data'):
                    yield {
                        'type': 'segment_ready',
                        'index': result['index'],
                        'start_time': result['start_time'],
                        'end_time': result['end_time'],
                        'duration': result['end_time'] - result['start_time'],
                        'progress': 20 + (processed_groups / total_groups) * 75,
                        'audio_data': base64.b64encode(result['audio_data']).decode('utf-8'),
                        'segments': [
                            {
                                'original': seg.get('text', ''),
                                'translated': seg.get('cn', ''),
                                'role': seg.get('role', 'MALE'),
                                'start': seg.get('start', 0),
                                'end': seg.get('end', 0)
                            }
                            for seg in result['segments']
                        ]
                    }
                    logger.info(f"[{session_id}] 🎵 分组 {result['index']} 音频已输出 ({processed_groups}/{total_groups})")
                else:
                    logger.warning(f"[{session_id}] 分组 {result['index']} 无输出")

            await asyncio.gather(*tasks, return_exceptions=True)

            # 5. Done.
            processing_time = time.time() - start_time
            yield {
                'type': 'complete',
                'total_segments': processed_groups,
                'total_duration': total_duration,
                'processing_time': processing_time,
                'source_language': source_language
            }
            logger.info(f"[{session_id}] 处理完成: {processed_groups} 段, 耗时 {processing_time:.1f}s")

        except Exception as e:
            logger.error(f"[{session_id}] 处理失败: {e}", exc_info=True)
            yield {'type': 'error', 'message': str(e)}
        finally:
            # When the consumer stops iterating early (or an error occurred)
            # group tasks may still be sleeping or calling APIs; cancel them
            # so they do not keep running in the background.
            for task in tasks:
                if not task.done():
                    task.cancel()

    async def _generate_tts_parallel(
        self,
        segments: List[Dict[str, Any]],
        client_config: Optional[Dict[str, Any]] = None
    ) -> List[Optional[str]]:
        """
        Generate TTS audio for all segments concurrently.

        At most ``config.max_tts_workers`` requests run at the same time.

        Args:
            segments: Translated segments; ``cn`` is the text to speak and
                ``role`` the voice role.
            client_config: Client configuration; ``ttsProvider`` selects
                ``'siliconflow'`` or (default) ``'edge-tts'``.

        Returns:
            One entry per input segment, in input order: the generated file
            path, or ``None`` when the text was empty or generation failed.
        """
        tts_provider = 'edge-tts'
        if client_config:
            tts_provider = client_config.get('ttsProvider', 'edge-tts')

        semaphore = asyncio.Semaphore(self.config.max_tts_workers)

        async def generate_single(seg: Dict, idx: int) -> Optional[str]:
            """Synthesise one segment; never raises."""
            text = seg.get('cn', '')
            if not text:
                return None

            role = seg.get('role', 'MALE')
            output_path = os.path.join(
                self.config.temp_dir,
                f"tts_{uuid.uuid4().hex[:8]}.mp3"
            )
            # Registered before synthesis so that a partially written file
            # from a failed request is still removed by cleanup().
            self._temp_files.append(output_path)

            async with semaphore:
                try:
                    if tts_provider == 'siliconflow' and self.siliconflow_client:
                        await self.siliconflow_client.synthesize(text, role, output_path)
                    else:
                        # Edge-TTS
                        await self.tts_generator.generate_single(text, role, output_path, client_config)
                    return output_path
                except Exception as e:
                    logger.warning(f"TTS 生成失败 [{idx}]: {e}")
                    return None

        # gather() returns results in the order of its arguments, so the
        # output already lines up with the input segments.
        results = await asyncio.gather(
            *(generate_single(seg, i) for i, seg in enumerate(segments))
        )
        return list(results)

    async def _do_asr(
        self,
        audio_path: str,
        client_config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Run speech recognition on one file.

        ``client_config['asrProvider']`` selects the provider (default
        ``'groq'``). When the selected provider fails and the other client
        is available, the other client is tried; otherwise the original
        error is re-raised.

        Raises:
            RuntimeError: When neither a Groq nor a SiliconFlow client is
                available.
        """
        asr_provider = 'groq'
        if client_config:
            asr_provider = client_config.get('asrProvider', 'groq')

        logger.info(f"[ASR] 配置: provider={asr_provider}, groq_client={self.groq_client is not None}, siliconflow_client={self.siliconflow_client is not None}")

        # Explicitly requested SiliconFlow, with Groq as fallback.
        if asr_provider == 'siliconflow' and self.siliconflow_client:
            try:
                logger.info("[ASR] 使用 SiliconFlow SenseVoice 进行语音识别")
                return await self.siliconflow_client.transcribe(audio_path)
            except Exception as e:
                logger.warning(f"[ASR] SiliconFlow ASR 失败: {e},尝试回退到 Groq")
                if self.groq_client:
                    return await self.groq_client.transcribe(audio_path)
                raise

        # Groq, with SiliconFlow as fallback.
        if self.groq_client:
            try:
                logger.info("[ASR] ✓ 使用 Groq Whisper V3 进行语音识别(带时间戳)")
                result = await self.groq_client.transcribe(audio_path)
                if result.get('segments'):
                    first_seg = result['segments'][0]
                    logger.info(f"[ASR] 结果: {len(result['segments'])} 片段, 首段时间戳: {first_seg.get('start', 'N/A')}-{first_seg.get('end', 'N/A')}")
                return result
            except Exception as e:
                logger.warning(f"[ASR] Groq ASR 失败: {e},尝试回退到 SiliconFlow")
                if self.siliconflow_client:
                    return await self.siliconflow_client.transcribe(audio_path)
                raise

        # Only SiliconFlow is configured.
        if self.siliconflow_client:
            logger.warning("[ASR] ⚠ 使用 SiliconFlow SenseVoice(无时间戳,可能影响分组)")
            return await self.siliconflow_client.transcribe(audio_path)

        raise RuntimeError(
            "没有可用的 ASR 服务。请配置 GROQ_API_KEY 或 SILICONFLOW_API_KEY 环境变量。"
        )

    def _group_segments_for_streaming(
        self,
        segments: List[Dict[str, Any]],
        total_duration: float
    ) -> List[List[Dict[str, Any]]]:
        """
        Group ASR segments for streaming output.

        The first group is filled up to ``config.first_segment_target``
        seconds so playback can start quickly; every following group is
        filled up to ``config.max_segment_duration`` seconds. A group is
        closed when adding the next segment would exceed its target, so
        groups always break on segment boundaries and a single over-long
        segment forms a group of its own. Durations are the sum of
        ``end - start`` of the segments; gaps between segments are not
        counted.

        Args:
            segments: ASR segments with ``start`` / ``end`` in seconds.
            total_duration: Total audio duration (currently unused).

        Returns:
            List of groups, each a non-empty list of the original segment
            dicts; an empty list for empty input.
        """
        if not segments:
            return []

        groups: List[List[Dict[str, Any]]] = []
        current_group: List[Dict[str, Any]] = []
        current_duration = 0
        # Only the first group uses the short target.
        target_duration = self.config.first_segment_target

        for seg in segments:
            seg_duration = seg.get('end', 0) - seg.get('start', 0)

            if current_group and current_duration + seg_duration > target_duration:
                groups.append(current_group)
                current_group = []
                current_duration = 0
                target_duration = self.config.max_segment_duration

            current_group.append(seg)
            current_duration += seg_duration

        if current_group:
            groups.append(current_group)
        return groups

    async def _translate_segments(
        self,
        segments: List[Dict[str, Any]],
        source_language: str,
        client_config: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Translate segments and attach a role label to each.

        Without a Groq client, or when the joined text is blank, the source
        text is used as the "translation" and every role is ``'MALE'``.
        Otherwise the Groq result is matched to the input by position; a
        missing, empty or ``None`` translation falls back to the source
        text and a missing role to ``'MALE'``.

        Args:
            segments: ASR segments (``text``, ``start``, ``end`` ...).
            source_language: Language reported by ASR.
            client_config: Accepted for signature symmetry; not used here.

        Returns:
            One dict per input segment: the original keys plus ``cn``
            (text to speak) and ``role``.
        """
        logger.info(f"🔥 _translate_segments 被调用: {len(segments)} 个片段, 源语言: {source_language}")
        logger.info(f"翻译输入: {len(segments)} 个片段")
        for i, seg in enumerate(segments[:3]):  # first three only
            logger.debug(f"  片段 {i}: {(seg.get('text') or '')[:50]}...")

        def untranslated() -> List[Dict[str, Any]]:
            # Pass-through result: source text doubles as the translation.
            return [
                {**seg, 'cn': seg.get('text') or '', 'role': 'MALE'}
                for seg in segments
            ]

        if not self.groq_client:
            logger.warning("Groq 客户端未初始化,使用原文")
            return untranslated()

        # The whole group is translated in a single request.
        text = ' '.join(seg.get('text') or '' for seg in segments)
        if not text.strip():
            logger.warning("翻译输入为空,使用原文")
            return untranslated()

        logger.info(f"调用 Groq 翻译: {len(text)} 字符")
        translation_result = await self.groq_client.translate(
            text,
            source_language,
            segments
        )

        translated_segments = (translation_result or {}).get('segments') or []
        logger.info(f"翻译结果: {len(translated_segments)} 个片段")

        def cn_of(item: Any) -> str:
            # Tolerates malformed entries (non-dict, missing or None 'cn').
            if not isinstance(item, dict):
                return ''
            return (item.get('cn') or '').strip()

        empty_count = sum(1 for item in translated_segments if not cn_of(item))
        if empty_count > 0:
            logger.warning(f"翻译结果中有 {empty_count} 个片段的中文为空")
            for i, item in enumerate(translated_segments[:5]):  # first five only
                original = (item.get('original') or '') if isinstance(item, dict) else ''
                logger.debug(f"  片段 {i}: cn='{cn_of(item)}', original='{original[:30]}'")

        result = []
        for i, seg in enumerate(segments):
            translated = translated_segments[i] if i < len(translated_segments) else {}
            if not isinstance(translated, dict):
                translated = {}

            cn_text = cn_of(translated)
            if not cn_text:
                # Empty translation: speak the source text instead.
                cn_text = (seg.get('text') or '').strip()
                if cn_text:
                    logger.warning(f"片段 {i} 翻译为空,使用原文: {cn_text[:30]}")

            result.append({
                **seg,
                'cn': cn_text,
                'role': translated.get('role') or 'MALE'
            })
        return result

    async def _generate_tts(
        self,
        segments: List[Dict[str, Any]],
        client_config: Optional[Dict[str, Any]] = None
    ) -> List[Optional[str]]:
        """
        Generate TTS audio for the segments (non-parallel variant).

        ``client_config['ttsProvider']`` selects ``'siliconflow'`` (segments
        are synthesised one after another) or, by default, Edge-TTS (the
        whole batch is delegated to the TTS generator).

        Returns:
            For SiliconFlow, one entry per segment: a file path, or ``None``
            for segments without text. For Edge-TTS, whatever
            ``tts_generator.generate`` returns.
        """
        tts_provider = 'edge-tts'
        if client_config:
            tts_provider = client_config.get('ttsProvider', 'edge-tts')

        # Only the text and the role are needed for synthesis.
        tts_segments = [
            {'cn': seg.get('cn', ''), 'role': seg.get('role', 'MALE')}
            for seg in segments
        ]

        if tts_provider == 'siliconflow' and self.siliconflow_client:
            logger.info("使用 SiliconFlow TTS 生成配音")
            results: List[Optional[str]] = []
            for seg in tts_segments:
                if not seg['cn']:
                    results.append(None)
                    continue
                output_path = os.path.join(
                    self.config.temp_dir,
                    f"tts_{uuid.uuid4().hex[:8]}.mp3"
                )
                await self.siliconflow_client.synthesize(
                    seg['cn'],
                    seg['role'],
                    output_path
                )
                results.append(output_path)
                self._temp_files.append(output_path)
            return results

        logger.info("使用 Edge-TTS 生成配音")
        return await self.tts_generator.generate(tts_segments, client_config)

    async def _sync_audio(
        self,
        tts_paths: List[Optional[str]],
        segments: List[Dict[str, Any]],
        target_duration: float,
        client_config: Optional[Dict[str, Any]] = None
    ) -> Optional[str]:
        """
        Align the generated TTS clips to the segment timestamps.

        Clips that are ``None`` or whose file does not exist are skipped
        together with their segment.

        Returns:
            The result of ``audio_sync.align``, or ``None`` when no usable
            clip exists.
        """
        valid_paths: List[str] = []
        valid_segments: List[Dict[str, Any]] = []
        for path, seg in zip(tts_paths, segments):
            if path and os.path.exists(path):
                valid_paths.append(path)
                valid_segments.append({
                    'start': seg.get('start', 0),
                    'end': seg.get('end', 0)
                })

        if not valid_paths:
            logger.warning("没有有效的 TTS 音频")
            return None

        return await self.audio_sync.align(
            valid_paths,
            valid_segments,
            target_duration,
            client_config
        )

    def cleanup(self) -> int:
        """
        Remove temporary files created by this processor and its sub-modules.

        Failures to delete a file are logged and skipped.

        Returns:
            Number of files removed, including the counts reported by the
            TTS generator and the audio sync engine.
        """
        cleaned = 0
        for path in self._temp_files:
            try:
                if os.path.exists(path):
                    os.remove(path)
                    cleaned += 1
            except Exception as e:
                logger.warning(f"清理临时文件失败 {path}: {e}")
        self._temp_files.clear()

        if self.tts_generator:
            cleaned += self.tts_generator.cleanup()
        if self.audio_sync:
            cleaned += self.audio_sync.cleanup()

        logger.info(f"流式处理器清理完成: {cleaned} 个文件")
        return cleaned

    def is_initialized(self) -> bool:
        """Return whether ``initialize()`` has completed."""
        return self._initialized