"""
Streaming async processing module.
Implements the full streaming dubbing pipeline:
uploaded audio → speech recognition (Whisper V3, with timestamps) → timestamped segment-wise async processing
→ translation + speaker-role detection (Llama 3) → speech synthesis (Edge-TTS / SiliconFlow)
→ audio alignment against timestamps → dubbed audio emitted segment by segment
Supports:
- streaming SSE output
- parallel processing of segment groups
- playback can start as soon as the first segment is ready
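Example: forwarding updates as SSE events. A minimal sketch; the FastAPI
wiring, route, and variable names here are illustrative assumptions, not
part of this module:

    import json
    from fastapi.responses import StreamingResponse

    async def sse_events(audio_path: str):
        processor = StreamProcessor()
        await processor.initialize()
        async for update in processor.process_stream(audio_path):
            yield f"data: {json.dumps(update)}\n\n"

    # return StreamingResponse(sse_events(path), media_type="text/event-stream")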
"""
import os
import asyncio
import logging
import time
import uuid
import base64
from typing import Dict, Any, Optional, List, AsyncGenerator
from dataclasses import dataclass, field
from enum import Enum
from .groq_client import GroqClient, GroqConfig, GroqError
from .siliconflow_client import SiliconFlowClient, SiliconFlowConfig
from .tts_generator import TTSGenerator, TTSConfig
from .audio_sync import AudioSyncEngine, SyncConfig
from .logging_config import get_component_logger, Component
# Use the unified logging configuration
logger = get_component_logger(Component.STREAM)
class ProcessingStage(Enum):
"""处理阶段枚举"""
INIT = "init"
ASR = "asr"
TRANSLATE = "translate"
TTS = "tts"
SYNC = "sync"
COMPLETE = "complete"
ERROR = "error"
@dataclass
class StreamConfig:
"""
Streaming processing configuration.
Attributes:
temp_dir: temporary file directory
max_segment_duration: maximum duration per segment group (seconds)
first_segment_target: target duration of the first group (seconds), for a fast playback start
parallel_tts: whether to generate TTS in parallel
max_tts_workers: maximum TTS concurrency
parallel_groups: whether to process multiple groups in parallel
max_group_workers: maximum group concurrency
audio_chunk_duration: audio chunk duration (seconds) for chunked ASR
enable_audio_chunking: whether to enable audio chunking (large files are chunked automatically)
audio_chunk_threshold_mb: file-size threshold (MB) above which chunking is enabled
segment_processing_delay: delay between starting consecutive groups (seconds), to avoid API rate limits
first_segment_priority: whether the first group is processed immediately, with no delay
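Example (illustrative values, not recommended defaults):

    config = StreamConfig(
        first_segment_target=10.0,  # start playback after ~10s of source audio
        max_tts_workers=8,
        segment_processing_delay=10.0,
    )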
"""
temp_dir: str = "temp/stream"
max_segment_duration: float = 60.0  # shortened to 1 minute for faster output
first_segment_target: float = 10.0  # 10-second first group for the fastest possible start
parallel_tts: bool = True
max_tts_workers: int = 8  # Edge-TTS has no rate limit, so concurrency can be high
parallel_groups: bool = True  # process groups in parallel
max_group_workers: int = 1  # kept at 1 to avoid Groq LLM rate limiting (6000 TPM)
audio_chunk_duration: float = 120.0  # 2 minutes per chunk
enable_audio_chunking: bool = True  # enable audio chunking
audio_chunk_threshold_mb: float = 5.0  # chunk automatically above 5 MB
segment_processing_delay: float = 10.0  # delay between segment groups (seconds), to avoid API rate limits
first_segment_priority: bool = True  # process the first group immediately, no delay
@dataclass
class SegmentData:
"""
Segment data.
Attributes:
index: segment index
start_time: start time (seconds)
end_time: end time (seconds)
original_text: source text
translated_text: translated text
role: speaker-role label
audio_path: path of the generated audio
audio_data: raw audio bytes
"""
index: int
start_time: float
end_time: float
original_text: str = ""
translated_text: str = ""
role: str = "MALE"
audio_path: Optional[str] = None
audio_data: Optional[bytes] = None
class StreamProcessor:
"""
Streaming async processor.
Implements the full dubbing pipeline with streaming output.
Usage example:
processor = StreamProcessor()
await processor.initialize()
async for update in processor.process_stream(audio_path, config):
if update['type'] == 'segment_ready':
# play the segment's audio
play_audio(update['audio_data'])
"""
def __init__(
self,
config: Optional[StreamConfig] = None,
groq_config: Optional[GroqConfig] = None,
siliconflow_config: Optional[SiliconFlowConfig] = None
):
"""
Initialize the streaming processor.
Parameters:
config: streaming processing configuration
groq_config: Groq client configuration
siliconflow_config: SiliconFlow client configuration
"""
self.config = config or StreamConfig()
self._groq_config = groq_config
self._siliconflow_config = siliconflow_config
# Sub-modules
self.groq_client: Optional[GroqClient] = None
self.siliconflow_client: Optional[SiliconFlowClient] = None
self.tts_generator: Optional[TTSGenerator] = None
self.audio_sync: Optional[AudioSyncEngine] = None
self._initialized = False
self._temp_files: List[str] = []
# Make sure the temp directory exists
os.makedirs(self.config.temp_dir, exist_ok=True)
logger.info("流式处理器配置完成")
async def initialize(self) -> None:
"""初始化所有子模块"""
if self._initialized:
return
logger.info("初始化流式处理器...")
# 初始化 Groq 客户端(ASR + LLM)
if self._groq_config:
self.groq_client = GroqClient(self._groq_config)
await self.groq_client.initialize()
# Initialize the SiliconFlow client (optional ASR + TTS)
if self._siliconflow_config:
self.siliconflow_client = SiliconFlowClient(self._siliconflow_config)
await self.siliconflow_client.initialize()
# Initialize the TTS generator
tts_config = TTSConfig(temp_dir=os.path.join(self.config.temp_dir, "tts"))
self.tts_generator = TTSGenerator(tts_config)
# Initialize the audio sync engine
sync_config = SyncConfig(temp_dir=os.path.join(self.config.temp_dir, "sync"))
self.audio_sync = AudioSyncEngine(sync_config)
self._initialized = True
logger.info("流式处理器初始化完成")
def _ensure_initialized(self) -> None:
"""确保处理器已初始化"""
if not self._initialized:
raise RuntimeError("流式处理器未初始化,请先调用 initialize()")
async def _split_audio_by_time(
self,
audio_path: str,
chunk_duration: float = 120.0
) -> List[Dict[str, Any]]:
"""
Split an audio file by time.
Uses ffmpeg to cut a large audio file into several small chunks;
each chunk is then transcribed independently and the results merged.
Parameters:
audio_path: path of the original audio file
chunk_duration: duration of each chunk (seconds), default 120
Returns:
A list of chunk descriptors, each containing:
- path: chunk file path
- start_time: chunk start time within the original audio
- duration: chunk duration
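Example return value (illustrative paths and numbers):

    [{'path': 'temp/stream/chunks/chunk_ab12cd34_0.m4a',
      'start_time': 0.0, 'duration': 120.0, 'index': 0, 'is_original': False},
     {'path': 'temp/stream/chunks/chunk_ef56ab78_1.m4a',
      'start_time': 120.0, 'duration': 35.2, 'index': 1, 'is_original': False}]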
"""
import subprocess
# Get the total audio duration via ffprobe
try:
result = subprocess.run(
[
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
audio_path
],
capture_output=True,
text=True,
timeout=30
)
total_duration = float(result.stdout.strip())
logger.info(f"音频总时长: {total_duration:.1f}秒")
except Exception as e:
logger.warning(f"无法获取音频时长,使用默认处理: {e}")
# 如果无法获取时长,返回原文件
return [{'path': audio_path, 'start_time': 0, 'duration': 0, 'is_original': True}]
# If the audio is no longer than one chunk, no splitting is needed
if total_duration <= chunk_duration:
logger.info(f"Audio duration {total_duration:.1f}s <= chunk duration {chunk_duration}s, no splitting needed")
return [{'path': audio_path, 'start_time': 0, 'duration': total_duration, 'is_original': True}]
# Compute the number of chunks
num_chunks = int(total_duration / chunk_duration) + 1
logger.info(f"Splitting audio into {num_chunks} chunks of {chunk_duration}s each")
chunks = []
chunk_dir = os.path.join(self.config.temp_dir, "chunks")
os.makedirs(chunk_dir, exist_ok=True)
for i in range(num_chunks):
start_time = i * chunk_duration
# The last chunk may be shorter
actual_duration = min(chunk_duration, total_duration - start_time)
if actual_duration <= 0:
break
chunk_path = os.path.join(chunk_dir, f"chunk_{uuid.uuid4().hex[:8]}_{i}.m4a")
try:
# Cut the audio with ffmpeg
cmd = [
'ffmpeg', '-y',
'-i', audio_path,
'-ss', str(start_time),
'-t', str(actual_duration),
'-c', 'copy',  # stream copy, no re-encoding, fast
'-loglevel', 'error',
chunk_path
]
subprocess.run(cmd, check=True, timeout=60)
chunks.append({
'path': chunk_path,
'start_time': start_time,
'duration': actual_duration,
'index': i,
'is_original': False
})
self._temp_files.append(chunk_path)
logger.info(f"切片 {i+1}/{num_chunks}: {start_time:.1f}s - {start_time + actual_duration:.1f}s")
except subprocess.TimeoutExpired:
logger.error(f"切片 {i} 超时")
continue
except subprocess.CalledProcessError as e:
logger.error(f"切片 {i} 失败: {e}")
continue
if not chunks:
logger.warning("音频切分失败,使用原文件")
return [{'path': audio_path, 'start_time': 0, 'duration': total_duration, 'is_original': True}]
logger.info(f"音频切分完成: {len(chunks)} 个切片")
return chunks
async def _do_asr_with_chunking(
self,
audio_path: str,
client_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Chunked ASR processing.
Splits large files into chunks, runs ASR on each chunk in parallel,
then merges the results. Timestamps are shifted by each chunk's start offset.
Parameters:
audio_path: audio file path
client_config: client configuration
Returns:
The merged ASR result
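Worked example of the timestamp shift (illustrative numbers): a segment
recognized at 0.0-3.2s inside the chunk starting at 120.0s ends up at
120.0-123.2s in the merged result:

    seg = {'start': 0.0, 'end': 3.2}
    chunk = {'start_time': 120.0}
    seg['start'] += chunk['start_time']  # 120.0
    seg['end'] += chunk['start_time']    # 123.2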
"""
file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
# Decide whether chunking is needed
need_chunking = (
self.config.enable_audio_chunking and
file_size_mb > self.config.audio_chunk_threshold_mb
)
if not need_chunking:
logger.info(f"File size {file_size_mb:.2f}MB <= {self.config.audio_chunk_threshold_mb}MB, processing directly")
return await self._do_asr(audio_path, client_config)
logger.info(f"File size {file_size_mb:.2f}MB > {self.config.audio_chunk_threshold_mb}MB, enabling chunked processing")
# Split the audio
chunks = await self._split_audio_by_time(
audio_path,
self.config.audio_chunk_duration
)
if len(chunks) == 1 and chunks[0].get('is_original'):
# No chunking needed, or splitting failed; process the original file directly
return await self._do_asr(audio_path, client_config)
# Run ASR on all chunks in parallel
logger.info(f"Starting parallel ASR over {len(chunks)} chunks...")
async def process_chunk(chunk: Dict) -> Dict[str, Any]:
"""Process a single chunk."""
try:
result = await self._do_asr(chunk['path'], client_config)
# Shift timestamps by the chunk's start offset
for seg in result.get('segments', []):
seg['start'] = seg.get('start', 0) + chunk['start_time']
seg['end'] = seg.get('end', 0) + chunk['start_time']
return {
'success': True,
'index': chunk['index'],
'result': result
}
except Exception as e:
logger.error(f"切片 {chunk['index']} ASR 失败: {e}")
return {
'success': False,
'index': chunk['index'],
'error': str(e)
}
# Run in parallel (concurrency capped at 2 to avoid Groq rate limits)
semaphore = asyncio.Semaphore(2)
async def limited_process(chunk: Dict) -> Dict[str, Any]:
async with semaphore:
return await process_chunk(chunk)
tasks = [limited_process(chunk) for chunk in chunks]
results = await asyncio.gather(*tasks)
# Merge the results
all_segments = []
detected_language = 'unknown'
total_duration = 0
# Sort results by chunk index
sorted_results = sorted(
[r for r in results if r['success']],
key=lambda x: x['index']
)
for r in sorted_results:
result = r['result']
all_segments.extend(result.get('segments', []))
if detected_language == 'unknown':
detected_language = result.get('language', 'unknown')
total_duration = max(total_duration, result.get('duration', 0))
# Recompute the total duration from the last segment
if all_segments:
total_duration = all_segments[-1].get('end', 0)
logger.info(f"ASR 切片处理完成: {len(all_segments)} 个片段, 总时长 {total_duration:.1f}s")
return {
'text': ' '.join(seg.get('text', '') for seg in all_segments),
'language': detected_language,
'segments': all_segments,
'duration': total_duration
}
async def process_stream(
self,
audio_path: str,
client_config: Optional[Dict[str, Any]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Process an audio file as a stream (aggressively parallel version).
Optimization strategy:
1. Start processing the first group as soon as ASR finishes
2. Process multiple groups in parallel (translate + TTS + sync)
3. Stream results out in completion order rather than input order
4. The first group gets priority so playback can start within ~15 seconds
Parameters:
audio_path: audio file path
client_config: client configuration
Yields:
Processing-update dictionaries (see the shapes below)
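Update shapes as yielded below (field values are illustrative):

    {'type': 'progress', 'progress': 20, 'message': '...'}
    {'type': 'segment_ready', 'index': 0, 'start_time': 0.0, 'end_time': 9.8,
     'duration': 9.8, 'audio_data': '<base64 audio>', 'segments': [...]}
    {'type': 'complete', 'total_segments': 5, 'total_duration': 300.0,
     'processing_time': 42.0, 'source_language': 'en'}
    {'type': 'error', 'message': '...'}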
"""
self._ensure_initialized()
session_id = str(uuid.uuid4())[:8]
start_time = time.time()
logger.info(f"[{session_id}] Starting stream processing (parallel-optimized): {audio_path}")
logger.info(f"[{session_id}] Client config: {client_config}")
try:
logger.error(f"🚨 [{session_id}] 进入 try 块")
# 检查音频文件
logger.info(f"[{session_id}] 检查音频文件: {audio_path}")
if not os.path.exists(audio_path):
logger.error(f"[{session_id}] 音频文件不存在: {audio_path}")
yield {'type': 'error', 'message': f'音频文件不存在: {audio_path}'}
return
file_size = os.path.getsize(audio_path) / (1024 * 1024)
logger.info(f"[{session_id}] 音频文件检查通过: {file_size:.2f}MB")
# 1. ASR 语音识别
logger.info(f"[{session_id}] 开始 ASR 步骤")
yield {'type': 'progress', 'progress': 5, 'message': '语音识别中...'}
try:
# Chunked ASR (large files are split automatically)
asr_result = await self._do_asr_with_chunking(audio_path, client_config)
except Exception as e:
logger.error(f"[{session_id}] ASR failed: {e}")
yield {'type': 'error', 'message': f'Speech recognition failed: {str(e)}'}
return
if not asr_result.get('segments'):
yield {'type': 'error', 'message': 'Speech recognition returned no results'}
return
source_language = asr_result.get('language', 'unknown')
total_duration = asr_result.get('duration', 0)
if total_duration == 0 and asr_result['segments']:
total_duration = asr_result['segments'][-1].get('end', 0)
logger.info(f"[{session_id}] ASR完成: {len(asr_result['segments'])} 片段, {total_duration:.1f}s")
# 调试:打印 ASR 结果样本
if asr_result['segments']:
sample = asr_result['segments'][0]
logger.info(f"[{session_id}] ASR 样本: {sample}")
yield {
'type': 'progress',
'progress': 20,
'message': 'Recognition complete, starting parallel processing...'
}
# 2. Smart grouping
segment_groups = self._group_segments_for_streaming(
asr_result['segments'],
total_duration
)
if not segment_groups:
yield {'type': 'error', 'message': 'Audio segmentation failed'}
return
total_groups = len(segment_groups)
logger.info(f"[{session_id}] 分组完成: {total_groups} 组,开始智能流式处理")
# 3. 智能流式处理:首段优先,后续段间隔处理
# 使用 asyncio.Queue 收集结果,按完成顺序输出
result_queue = asyncio.Queue()
completed_count = [0] # 使用列表以便在闭包中修改
async def process_group_with_delay(group_index: int, group: List[Dict]) -> None:
"""处理单个分组(带智能延迟)"""
group_start = group[0].get('start', 0) if group else 0
group_end = group[-1].get('end', 0) if group else 0
# 智能延迟策略:首段立即处理,后续段间隔处理
if group_index == 0:
logger.info(f"[{session_id}] 🚀 首段优先处理,无延迟")
else:
delay = group_index * self.config.segment_processing_delay
logger.info(f"[{session_id}] 分组 {group_index} 延迟 {delay}s 后开始处理")
await asyncio.sleep(delay)
try:
logger.info(f"[{session_id}] 开始处理分组 {group_index} ({group_start:.1f}s-{group_end:.1f}s)")
# 翻译
translated = await self._translate_segments(group, source_language, client_config)
# TTS(并行生成所有片段)
tts_results = await self._generate_tts_parallel(translated, client_config)
# 同步
synced_audio = await self._sync_audio(
tts_results, translated,
group_end - group_start, client_config
)
# 读取音频
audio_data = None
if synced_audio and os.path.exists(synced_audio):
with open(synced_audio, 'rb') as f:
audio_data = f.read()
await result_queue.put({
'success': True,
'index': group_index,
'start_time': group_start,
'end_time': group_end,
'audio_data': audio_data,
'segments': translated
})
logger.info(f"[{session_id}] ✅ 分组 {group_index} 处理完成")
except Exception as e:
logger.error(f"[{session_id}] 分组 {group_index} 失败: {e}")
await result_queue.put({
'success': False,
'index': group_index,
'error': str(e)
})
completed_count[0] += 1
# Kick off smart streaming for all groups (no semaphore; the staggered delays provide the throttling)
tasks = []
for i, group in enumerate(segment_groups):
task = asyncio.create_task(process_group_with_delay(i, group))
tasks.append(task)
# 4. Stream results out (in completion order)
processed_groups = 0
while processed_groups < total_groups:
try:
# Wait for any group to finish; time out after 1s to report progress
result = await asyncio.wait_for(result_queue.get(), timeout=1.0)
if result['success'] and result.get('audio_data'):
processed_groups += 1
progress = 20 + (processed_groups / total_groups) * 75
yield {
'type': 'segment_ready',
'progress': progress,
'index': result['index'],
'start_time': result['start_time'],
'end_time': result['end_time'],
'duration': result['end_time'] - result['start_time'],
'audio_data': base64.b64encode(result['audio_data']).decode('utf-8'),
'segments': [
{
'original': seg.get('text', ''),
'translated': seg.get('cn', ''),
'role': seg.get('role', 'MALE'),
'start': seg.get('start', 0),
'end': seg.get('end', 0)
}
for seg in result['segments']
]
}
logger.info(f"[{session_id}] 🎵 分组 {result['index']} 音频已输出 ({processed_groups}/{total_groups})")
else:
processed_groups += 1
logger.warning(f"[{session_id}] 分组 {result['index']} 无输出")
except asyncio.TimeoutError:
# Send a progress update
progress = 20 + (completed_count[0] / total_groups) * 75
yield {
'type': 'progress',
'progress': progress,
'message': f'Smart streaming in progress {completed_count[0]}/{total_groups}...'
}
# Wait for all tasks to finish
await asyncio.gather(*tasks, return_exceptions=True)
# 5. Done
processing_time = time.time() - start_time
yield {
'type': 'complete',
'total_segments': processed_groups,
'total_duration': total_duration,
'processing_time': processing_time,
'source_language': source_language
}
logger.info(f"[{session_id}] 处理完成: {processed_groups} 段, 耗时 {processing_time:.1f}s")
except Exception as e:
logger.error(f"[{session_id}] 处理失败: {e}", exc_info=True)
yield {'type': 'error', 'message': str(e)}
async def _generate_tts_parallel(
self,
segments: List[Dict[str, Any]],
client_config: Optional[Dict[str, Any]] = None
) -> List[Optional[str]]:
"""
Generate TTS audio in parallel (aggressive optimization).
All segments are processed concurrently; output order is restored afterwards.
"""
tts_provider = 'edge-tts'
if client_config:
tts_provider = client_config.get('ttsProvider', 'edge-tts')
async def generate_single(seg: Dict, idx: int) -> tuple:
"""生成单个片段的TTS"""
text = seg.get('cn', '')
if not text:
return (idx, None)
role = seg.get('role', 'MALE')
output_path = os.path.join(
self.config.temp_dir,
f"tts_{uuid.uuid4().hex[:8]}.mp3"
)
try:
if tts_provider == 'siliconflow' and self.siliconflow_client:
await self.siliconflow_client.synthesize(text, role, output_path)
else:
# Edge-TTS
await self.tts_generator.generate_single(text, role, output_path, client_config)
self._temp_files.append(output_path)
return (idx, output_path)
except Exception as e:
logger.warning(f"TTS 生成失败 [{idx}]: {e}")
return (idx, None)
# 并行生成所有片段
semaphore = asyncio.Semaphore(self.config.max_tts_workers)
async def limited_generate(seg: Dict, idx: int) -> tuple:
async with semaphore:
return await generate_single(seg, idx)
tasks = [limited_generate(seg, i) for i, seg in enumerate(segments)]
results = await asyncio.gather(*tasks)
# Restore input order by index
sorted_results = sorted(results, key=lambda x: x[0])
return [r[1] for r in sorted_results]
async def _do_asr(
self,
audio_path: str,
client_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Run speech recognition.
Chooses Groq Whisper or SiliconFlow SenseVoice based on configuration,
and raises an error if neither is available.
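The only key read here is 'asrProvider' (example value shown; other keys
are ignored by this method):

    client_config = {'asrProvider': 'siliconflow'}  # or 'groq' (the default)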
"""
# Read the ASR provider from config
asr_provider = 'groq'  # Groq by default
if client_config:
asr_provider = client_config.get('asrProvider', 'groq')
# Debug: log client availability
logger.info(f"[ASR] Config: provider={asr_provider}, groq_client={self.groq_client is not None}, siliconflow_client={self.siliconflow_client is not None}")
# Try the configured provider first
if asr_provider == 'siliconflow' and self.siliconflow_client:
try:
logger.info("[ASR] 使用 SiliconFlow SenseVoice 进行语音识别")
return await self.siliconflow_client.transcribe(audio_path)
except Exception as e:
logger.warning(f"[ASR] SiliconFlow ASR 失败: {e},尝试回退到 Groq")
if self.groq_client:
return await self.groq_client.transcribe(audio_path)
raise
if self.groq_client:
try:
logger.info("[ASR] ✓ 使用 Groq Whisper V3 进行语音识别(带时间戳)")
result = await self.groq_client.transcribe(audio_path)
# 调试:打印 ASR 结果摘要
if result.get('segments'):
first_seg = result['segments'][0]
logger.info(f"[ASR] 结果: {len(result['segments'])} 片段, 首段时间戳: {first_seg.get('start', 'N/A')}-{first_seg.get('end', 'N/A')}")
return result
except Exception as e:
logger.warning(f"[ASR] Groq ASR 失败: {e},尝试回退到 SiliconFlow")
if self.siliconflow_client:
return await self.siliconflow_client.transcribe(audio_path)
raise
if self.siliconflow_client:
logger.warning("[ASR] ⚠ 使用 SiliconFlow SenseVoice(无时间戳,可能影响分组)")
return await self.siliconflow_client.transcribe(audio_path)
raise RuntimeError(
"没有可用的 ASR 服务。请配置 GROQ_API_KEY 或 SILICONFLOW_API_KEY 环境变量。"
)
def _group_segments_for_streaming(
self,
segments: List[Dict[str, Any]],
total_duration: float
) -> List[List[Dict[str, Any]]]:
"""
Group ASR segments for streaming.
Strategy:
1. Keep the first group short (first_segment_target, 10s by default) so playback starts fast
2. Group later segments up to max_segment_duration
3. Split only at ASR segment boundaries, which roughly follow sentences
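Worked example (illustrative durations, with the 10s/60s defaults):
segments of 6s, 5s, 40s and 30s group as [6s], [5s, 40s], [30s].
The 5s segment opens a new group because 6+5 exceeds the 10s first-group
target, and the 30s segment opens another because 5+40+30 exceeds 60s.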
"""
if not segments:
return []
groups = []
current_group = []
current_duration = 0
is_first_group = True
# Target duration for the first group (later groups switch to max_segment_duration)
target_duration = self.config.first_segment_target
for seg in segments:
seg_start = seg.get('start', 0)
seg_end = seg.get('end', 0)
seg_duration = seg_end - seg_start
# Check whether a new group should start
if current_group and current_duration + seg_duration > target_duration:
groups.append(current_group)
current_group = []
current_duration = 0
is_first_group = False
target_duration = self.config.max_segment_duration
current_group.append(seg)
current_duration += seg_duration
# Append the final group
if current_group:
groups.append(current_group)
return groups
async def _translate_segments(
self,
segments: List[Dict[str, Any]],
source_language: str,
client_config: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
Translate segments and detect speaker roles.
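Input/output shape (illustrative values; 'cn' carries the Chinese
translation, 'role' the speaker label used for voice selection):

    in:  {'start': 0.0, 'end': 2.1, 'text': 'Hello there.'}
    out: {'start': 0.0, 'end': 2.1, 'text': 'Hello there.',
          'cn': '你好。', 'role': 'MALE'}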
"""
logger.info(f"🔥 _translate_segments 被调用: {len(segments)} 个片段, 源语言: {source_language}")
# 调试:打印输入片段
logger.info(f"翻译输入: {len(segments)} 个片段")
for i, seg in enumerate(segments[:3]): # 只打印前3个
logger.debug(f" 片段 {i}: {seg.get('text', '')[:50]}...")
if not self.groq_client:
# Without a Groq client, fall back to the source text as the translation
logger.warning("Groq client not initialized, using source text")
return [
{
**seg,
'cn': seg.get('text', ''),
'role': 'MALE'
}
for seg in segments
]
# Concatenate the text for translation
text = ' '.join(seg.get('text', '') for seg in segments)
if not text.strip():
logger.warning("Translation input is empty, using source text")
return [
{
**seg,
'cn': seg.get('text', ''),
'role': 'MALE'
}
for seg in segments
]
logger.info(f"调用 Groq 翻译: {len(text)} 字符")
translation_result = await self.groq_client.translate(
text,
source_language,
segments
)
# Merge the translation results back into the original segments
translated_segments = translation_result.get('segments', [])
logger.info(f"Translation result: {len(translated_segments)} segments")
# Debug: check for empty translations
empty_count = sum(1 for seg in translated_segments if not seg.get('cn', '').strip())
if empty_count > 0:
logger.warning(f"{empty_count} segments in the translation result have empty Chinese text")
for i, seg in enumerate(translated_segments[:5]):  # only the first 5
logger.debug(f"  Segment {i}: cn='{seg.get('cn', '')}', original='{seg.get('original', '')[:30]}'")
result = []
for i, seg in enumerate(segments):
translated = translated_segments[i] if i < len(translated_segments) else {}
cn_text = translated.get('cn', seg.get('text', '')).strip()
# If the translation is empty, fall back to the source text
if not cn_text:
cn_text = seg.get('text', '').strip()
if cn_text:
logger.warning(f"Segment {i} translation empty, using source text: {cn_text[:30]}")
result.append({
**seg,
'cn': cn_text,
'role': translated.get('role', 'MALE')
})
return result
async def _generate_tts(
self,
segments: List[Dict[str, Any]],
client_config: Optional[Dict[str, Any]] = None
) -> List[Optional[str]]:
"""
Generate TTS audio.
Chooses Edge-TTS or SiliconFlow TTS based on configuration.
"""
# Read the TTS provider from config
tts_provider = 'edge-tts'  # Edge-TTS by default
if client_config:
tts_provider = client_config.get('ttsProvider', 'edge-tts')
# Prepare the TTS input
tts_segments = [
{
'cn': seg.get('cn', ''),
'role': seg.get('role', 'MALE')
}
for seg in segments
]
if tts_provider == 'siliconflow' and self.siliconflow_client:
logger.info("使用 SiliconFlow TTS 生成配音")
# 使用 SiliconFlow TTS
results = []
for seg in tts_segments:
if seg['cn']:
output_path = os.path.join(
self.config.temp_dir,
f"tts_{uuid.uuid4().hex[:8]}.mp3"
)
await self.siliconflow_client.synthesize(
seg['cn'],
seg['role'],
output_path
)
results.append(output_path)
self._temp_files.append(output_path)
else:
results.append(None)
return results
else:
logger.info("使用 Edge-TTS 生成配音")
# 使用 Edge-TTS
return await self.tts_generator.generate(tts_segments, client_config)
async def _sync_audio(
self,
tts_paths: List[Optional[str]],
segments: List[Dict[str, Any]],
target_duration: float,
client_config: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""
Align the generated audio to the source timestamps.
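Expected inputs (illustrative values): tts_paths and segments are parallel
lists; entries whose TTS path is missing or failed are filtered out before
the sync engine is called:

    tts_paths = ['temp/stream/tts_ab12cd34.mp3', None]
    segments  = [{'start': 0.0, 'end': 2.1}, {'start': 2.1, 'end': 4.0}]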
"""
# Keep only valid TTS paths
valid_paths = []
valid_segments = []
for path, seg in zip(tts_paths, segments):
if path and os.path.exists(path):
valid_paths.append(path)
valid_segments.append({
'start': seg.get('start', 0),
'end': seg.get('end', 0)
})
if not valid_paths:
logger.warning("没有有效的 TTS 音频")
return None
# 调用音频同步引擎
return await self.audio_sync.align(
valid_paths,
valid_segments,
target_duration,
client_config
)
def cleanup(self) -> int:
"""清理临时文件"""
cleaned = 0
for path in self._temp_files:
try:
if os.path.exists(path):
os.remove(path)
cleaned += 1
except Exception as e:
logger.warning(f"清理临时文件失败 {path}: {e}")
self._temp_files.clear()
if self.tts_generator:
cleaned += self.tts_generator.cleanup()
if self.audio_sync:
cleaned += self.audio_sync.cleanup()
logger.info(f"流式处理器清理完成: {cleaned} 个文件")
return cleaned
@property
def is_initialized(self) -> bool:
"""检查处理器是否已初始化"""
return self._initialized