| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import logging
- import time
- import re
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- import service.pyav as pyav
- import service.pygpu as pygpu
- import service.ai_asr as pyasr
- from setting import OUTPUT_DIR
logger = logging.getLogger(__name__)
# Single worker thread on purpose: GPU inference and video rendering must run
# strictly one job at a time to avoid exhausting GPU memory.
executor = ThreadPoolExecutor(max_workers=1)
# FIFO queue of (task_id, audio_path, srt_path, video_path) tuples, filled by
# put_task() and drained by gpu_worker().
task_queue = asyncio.Queue()
# Strong references to the background tasks. asyncio.create_task only keeps a
# weak reference to the task it schedules, so a task whose result is discarded
# can be garbage-collected before it finishes; the done-callback removes each
# entry once the task completes.
_background_tasks = set()


async def start_worker():
    """Launch the background jobs: FunASR model initialization and the GPU
    worker that consumes the task queue.

    Must be called from a running event loop (e.g. a FastAPI startup hook).
    """
    for coro in (pyasr.init_funasr(), gpu_worker()):
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
# --- Background worker coroutine (consumer) ---
async def gpu_worker():
    """Consume transcription tasks from `task_queue` forever, one at a time.

    For each task: run ASR on the audio, build and save an SRT subtitle file,
    write the raw transcript, render the final video, and release GPU memory.
    Errors are logged per task; the loop itself never exits.
    """
    logger.info("🏃 Worker 开始监听任务队列")
    while True:
        # Don't pull work until the ASR model has finished loading.
        if not pyasr.check_ready():
            logger.info("休眠 10s 等待模型加载完成...")
            await asyncio.sleep(10)
            continue
        # Block until a task is available.
        task = await task_queue.get()
        task_id, audio_path, srt_path, video_path = task
        logger.info(f"⚡ 开始处理任务 [{task_id}]: {audio_path}")
        try:
            start_t = time.time()
            # Run the synchronous recognizer in the thread pool so the event
            # loop stays responsive — FastAPI can keep accepting requests and
            # enqueueing new tasks while this one runs.
            # get_running_loop() is the correct call inside a coroutine;
            # get_event_loop() here is deprecated since Python 3.10.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                executor,
                pyasr.get_text,
                audio_path
            )
            text = result['text']
            timestamp_list = result['timestamps']
            # Split on CJK sentence punctuation; the capturing group keeps the
            # delimiters in the list so they can be matched to timestamps.
            text_list = re.split(r"([。!?;,])", text)
            srt_list = pyav.get_precise_srt(text_list, timestamp_list)
            pyav.save_srt_file(srt_list, srt_path)
            # Persist the raw transcript alongside the subtitles.
            with open(f"{OUTPUT_DIR}/{task_id}.txt", "w", encoding="utf-8") as f:
                f.write(text)
            # Video rendering is also blocking; keep it off the event loop.
            await loop.run_in_executor(
                executor,
                pyav.generate_video,
                audio_path,
                srt_path,
                video_path
            )
            logger.info(f"🎉 任务 [{task_id}] 完成,耗时: {time.time() - start_t:.2f}s")
        except Exception as e:
            # logger.exception records the full traceback, not just str(e).
            logger.exception(f"❌ 任务 [{task_id}] 失败: {str(e)}")
        finally:
            # Mark the queue item done and free GPU memory after every task,
            # whether it succeeded or failed.
            task_queue.task_done()
            pygpu.clear_gpu_memory()
async def put_task(task_id, save_path, srt_path, video_path):
    """Enqueue a transcription job for the GPU worker to pick up."""
    job = (task_id, save_path, srt_path, video_path)
    await task_queue.put(job)
def get_tasks():
    """Return the number of jobs currently waiting in the queue."""
    pending = task_queue.qsize()
    return pending
|