ai_task.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import logging
  2. import time
  3. import re
  4. import asyncio
  5. from concurrent.futures import ThreadPoolExecutor
  6. import service.pyav as pyav
  7. import service.pygpu as pygpu
  8. import service.ai_text_ollama as ai_text
  9. import service.ai_asr as pyasr
  10. from setting import OUTPUT_DIR
  11. logger = logging.getLogger(__name__)
  12. executor = ThreadPoolExecutor(max_workers=1)
  13. task_queue = asyncio.Queue()
  14. async def start_worker():
  15. asyncio.create_task(gpu_worker()) # 开启 Worker
  16. # --- 后台工作进程 (Consumer) ---
  17. async def gpu_worker():
  18. logger.info("🏃 Worker 开始监听任务队列")
  19. while True:
  20. if not pyasr.check_ready():
  21. logger.info("休眠 10s 等待模型加载完成...")
  22. await asyncio.sleep(10)
  23. continue
  24. # 获取任务
  25. task = await task_queue.get()
  26. task_id, audio_path, srt_path, video_path = task
  27. logger.info(f"⚡ 开始处理任务 [{task_id}]: {audio_path}")
  28. try:
  29. start_t = time.time()
  30. # 2. 推理 (在线程池中运行同步识别,防止阻塞事件循环)
  31. # 使用 run_in_executor 将同步函数包装成异步,不再阻塞 Event Loop
  32. # 这样在处理视频的同时,FastAPI 依然可以接收新请求并 put 到队列中
  33. loop = asyncio.get_event_loop()
  34. result = await loop.run_in_executor(
  35. executor,
  36. pyasr.get_text,
  37. audio_path
  38. )
  39. text = result['text']
  40. timestamp_list = result['timestamps']
  41. text_list = re.split(r"([。!?;,])", text)
  42. srt_list = pyav.get_precise_srt(text_list, timestamp_list)
  43. pyav.save_srt_file(srt_list, srt_path)
  44. with open(f"{OUTPUT_DIR}/{task_id}.txt", "w", encoding="utf-8") as f:
  45. f.write(text)
  46. await loop.run_in_executor(
  47. executor,
  48. pyav.generate_video,
  49. audio_path,
  50. srt_path,
  51. video_path
  52. )
  53. logger.info(f"🎉 任务 [{task_id}] 完成,耗时: {time.time() - start_t:.2f}s")
  54. except Exception as e:
  55. logger.error(f"❌ 任务 [{task_id}] 失败: {str(e)}")
  56. finally:
  57. # 4. 定时/按需清理 GPU 显存
  58. task_queue.task_done()
  59. pygpu.clear_gpu_memory()
  60. async def put_task(task_id, save_path, srt_path, video_path):
  61. await task_queue.put((task_id, save_path, srt_path, video_path))
  62. def get_tasks():
  63. return task_queue.qsize()
  64. def translate_to_zh(text):
  65. result = ai_text.translate2zh(text)
  66. return result