ai_task.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import logging
  2. import time
  3. import re
  4. import asyncio
  5. from concurrent.futures import ThreadPoolExecutor
  6. import service.pyav as pyav
  7. import service.pygpu as pygpu
  8. import service.ai_asr as pyasr
  9. from setting import OUTPUT_DIR
  10. logger = logging.getLogger(__name__)
  11. executor = ThreadPoolExecutor(max_workers=1)
  12. task_queue = asyncio.Queue()
  13. async def start_worker():
  14. # 开启 funasr 初始化任务
  15. asyncio.create_task(pyasr.init_funasr())
  16. # 开启 Worker
  17. asyncio.create_task(gpu_worker())
  18. # --- 后台工作进程 (Consumer) ---
  19. async def gpu_worker():
  20. logger.info("🏃 Worker 开始监听任务队列")
  21. while True:
  22. if not pyasr.check_ready():
  23. logger.info("休眠 10s 等待模型加载完成...")
  24. await asyncio.sleep(10)
  25. continue
  26. # 获取任务
  27. task = await task_queue.get()
  28. task_id, audio_path, srt_path, video_path = task
  29. logger.info(f"⚡ 开始处理任务 [{task_id}]: {audio_path}")
  30. try:
  31. start_t = time.time()
  32. # 2. 推理 (在线程池中运行同步识别,防止阻塞事件循环)
  33. # 使用 run_in_executor 将同步函数包装成异步,不再阻塞 Event Loop
  34. # 这样在处理视频的同时,FastAPI 依然可以接收新请求并 put 到队列中
  35. loop = asyncio.get_event_loop()
  36. result = await loop.run_in_executor(
  37. executor,
  38. pyasr.get_text,
  39. audio_path
  40. )
  41. text = result['text']
  42. timestamp_list = result['timestamps']
  43. text_list = re.split(r"([。!?;,])", text)
  44. srt_list = pyav.get_precise_srt(text_list, timestamp_list)
  45. pyav.save_srt_file(srt_list, srt_path)
  46. with open(f"{OUTPUT_DIR}/{task_id}.txt", "w", encoding="utf-8") as f:
  47. f.write(text)
  48. await loop.run_in_executor(
  49. executor,
  50. pyav.generate_video,
  51. audio_path,
  52. srt_path,
  53. video_path
  54. )
  55. logger.info(f"🎉 任务 [{task_id}] 完成,耗时: {time.time() - start_t:.2f}s")
  56. except Exception as e:
  57. logger.error(f"❌ 任务 [{task_id}] 失败: {str(e)}")
  58. finally:
  59. # 4. 定时/按需清理 GPU 显存
  60. task_queue.task_done()
  61. pygpu.clear_gpu_memory()
  62. async def put_task(task_id, save_path, srt_path, video_path):
  63. await task_queue.put((task_id, save_path, srt_path, video_path))
  64. def get_tasks():
  65. return task_queue.qsize()