"""Single-consumer background worker that turns queued audio into subtitles and video.

Producers enqueue ``(task_id, audio_path, srt_path, video_path)`` tuples with
:func:`put_task`; :func:`gpu_worker` takes them off ``task_queue`` one at a
time, runs speech recognition, writes the SRT and transcript files, renders
the video, and then asks ``pygpu`` to clear GPU memory.
"""
import logging
import time
import re
import asyncio
from concurrent.futures import ThreadPoolExecutor

import service.pyav as pyav
import service.pygpu as pygpu
import service.ai_text_ollama as ai_text
import service.ai_asr as pyasr
from setting import OUTPUT_DIR

logger = logging.getLogger(__name__)

# One worker thread: the blocking recognition / rendering calls are serialized.
executor = ThreadPoolExecutor(max_workers=1)

# NOTE(review): on Python < 3.10 an asyncio.Queue binds to the event loop that
# is current when it is constructed -- confirm this module is imported under
# the same loop that later runs the worker.
task_queue = asyncio.Queue()

# Strong references to running worker tasks. The event loop itself only keeps
# weak references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_background_tasks = set()

# The capture group makes ``split`` keep each punctuation mark as its own item.
_SENTENCE_BREAKS = re.compile(r"([。!?;,])")


async def start_worker() -> None:
    """Launch :func:`gpu_worker` as a background task on the running loop.

    Each call starts one more worker task; the task is kept referenced until
    it finishes so it cannot disappear mid-execution.
    """
    worker = asyncio.create_task(gpu_worker())
    _background_tasks.add(worker)
    worker.add_done_callback(_background_tasks.discard)


async def _process_task(loop, task_id, audio_path, srt_path, video_path) -> None:
    """Run the recognition -> SRT -> transcript -> video pipeline for one task.

    The two blocking calls (``pyasr.get_text`` and ``pyav.generate_video``)
    are dispatched to ``executor`` so the event loop stays free to accept new
    tasks while they run. Any exception is propagated to the caller.
    """
    # Recognition result is indexed by the 'text' and 'timestamps' keys.
    result = await loop.run_in_executor(executor, pyasr.get_text, audio_path)
    text = result["text"]
    timestamp_list = result["timestamps"]

    # Split the transcript on punctuation and build the subtitle file.
    text_list = _SENTENCE_BREAKS.split(text)
    srt_list = pyav.get_precise_srt(text_list, timestamp_list)
    pyav.save_srt_file(srt_list, srt_path)

    # Store the full transcript next to the other outputs.
    with open(f"{OUTPUT_DIR}/{task_id}.txt", "w", encoding="utf-8") as f:
        f.write(text)

    await loop.run_in_executor(
        executor, pyav.generate_video, audio_path, srt_path, video_path
    )


# --- Background worker (consumer) ---
async def gpu_worker() -> None:
    """Consume ``task_queue`` forever, processing one task at a time.

    While ``pyasr.check_ready()`` is false the worker sleeps 10 seconds and
    retries without taking a task. A failing task is logged with its
    traceback and does not stop the worker. After every task, successful or
    not, the queue item is marked done and GPU memory is cleared.
    """
    logger.info("🏃 Worker 开始监听任务队列")
    # We are inside a coroutine, so the running loop is the correct one to use.
    loop = asyncio.get_running_loop()

    while True:
        if not pyasr.check_ready():
            logger.info("休眠 10s 等待模型加载完成...")
            await asyncio.sleep(10)
            continue

        # Fetch the next task (waits while the queue is empty).
        task_id, audio_path, srt_path, video_path = await task_queue.get()
        logger.info(f"⚡ 开始处理任务 [{task_id}]: {audio_path}")

        # Monotonic clock: elapsed time is unaffected by system clock changes.
        started = time.perf_counter()
        try:
            await _process_task(loop, task_id, audio_path, srt_path, video_path)
            logger.info(
                f"🎉 任务 [{task_id}] 完成,耗时: {time.perf_counter() - started:.2f}s"
            )
        except Exception as e:
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception(f"❌ 任务 [{task_id}] 失败: {e}")
        finally:
            task_queue.task_done()
            # Clear GPU memory after every task. A failure here must not
            # terminate the only consumer, so it is logged and the loop goes on.
            try:
                pygpu.clear_gpu_memory()
            except Exception:
                logger.exception(f"⚠️ 任务 [{task_id}] 清理 GPU 显存失败")


async def put_task(task_id, save_path, srt_path, video_path) -> None:
    """Enqueue one task; ``save_path`` is the audio path the worker will read."""
    await task_queue.put((task_id, save_path, srt_path, video_path))


def get_tasks() -> int:
    """Return the number of tasks currently waiting in the queue."""
    return task_queue.qsize()


def translate_to_zh(text):
    """Return ``ai_text.translate2zh(text)`` unchanged."""
    return ai_text.translate2zh(text)