picam_cv.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. import os
  2. import re
  3. import json
  4. import subprocess
  5. import time
  6. import numpy as np
  7. import cv2
  8. import threading
  9. import requests
  10. from setting import WEBHOOK_URL
  11. from setting import RTMP_URL
  12. from setting import CAM_NAME
  13. # --- 配置参数 ---
  14. FONT_PATH = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc"
  15. cam_name = CAM_NAME
  16. push_url = RTMP_URL
  17. notify_interval = 30 # 告警间隔,设为60秒防止轰炸
  18. WIDTH, HEIGHT = 960, 540 # 沿用你跑通的分辨率
  19. FPS = 25
  20. y_size = WIDTH * HEIGHT
  21. frame_size = int(y_size * 1.5)
  22. notify = False
  23. def send_ding_msg(detect_time):
  24. if not notify:
  25. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] notify 参数为 False, 不发送通知")
  26. return
  27. current_hour = time.localtime().tm_hour
  28. if current_hour >= 20 or current_hour < 8:
  29. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 夜间免打扰模式, 不发送 webhook 通知")
  30. return
  31. """
  32. 异步发送钉钉通知
  33. """
  34. headers = {'Content-Type': 'application/json'}
  35. # 构造钉钉要求的 JSON 结构
  36. data = {
  37. "msgtype": "text",
  38. "text": {
  39. "content": (
  40. f"【cam-alert】\n"
  41. f"机位:{cam_name}\n"
  42. f"时间:{detect_time}\n"
  43. f"状态:检测到物体移动\n"
  44. f"请查看回放对齐画面 OSD 时间。"
  45. )
  46. },
  47. "at": {
  48. "isAtAll": False # 如果需要提醒所有人,设为 True
  49. }
  50. }
  51. try:
  52. response = requests.post(
  53. WEBHOOK_URL,
  54. data=json.dumps(data),
  55. headers=headers,
  56. timeout=5
  57. )
  58. res_json = response.json()
  59. if res_json.get("errcode") == 0:
  60. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉通知发送成功")
  61. else:
  62. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉发送失败: {res_json.get('errmsg')}")
  63. except Exception as e:
  64. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 网络异常(钉钉): {e}")
  65. def monitor_ffmpeg_log(stderr_pipe):
  66. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志监控线程已启动...")
  67. """
  68. 专门解析 FFmpeg stderr 日志的线程函数
  69. """
  70. # 正则表达式匹配:帧数、FPS、码率、速度
  71. # 示例: frame= 150 fps= 25 q=28.0 size= 1024kB time=00:00:06.00 bitrate=1365.3kbits/s speed=1.01x
  72. regex = re.compile(r"frame=\s*(\d+)\s+fps=\s*([\d.]+)\s+.*?bitrate=\s*([\d.]+kbits/s)\s+speed=\s*([\d.]+x)")
  73. while True:
  74. # FFmpeg 的进度信息是用 \r 更新的
  75. # 我们一次读一部分字节,并按 \r 或 \n 切分
  76. try:
  77. # 这里的 read(128) 能保证及时拿到数据
  78. chunk = stderr_pipe.read(128).decode('utf-8', errors='ignore')
  79. if not chunk:
  80. break
  81. # 只要包含 frame=,就说明是进度行
  82. if "frame=" in chunk:
  83. # 使用正则或简单的字符串查找提取 fps 和 speed
  84. # 简单打印出来看看是否有输出
  85. line_str = chunk.strip().replace('\r', '\n').split('\n')[-1]
  86. #print(f">>> FFmpeg 状态: {line_str}")
  87. # 查找统计信息行
  88. match = regex.search(line_str)
  89. if match:
  90. frame, fps, bitrate, speed = match.groups()
  91. # 这里你可以根据数值做逻辑处理
  92. # 例如:如果 speed < 0.9x,说明 CPU 跑不动了或者网络堵塞
  93. #print(f"--- 推流状态: FPS={fps}, 码率={bitrate}, 速度={speed} ---")
  94. # 如果速度太慢,可以触发一个标志位让主程序知道
  95. # if float(speed[:-1]) < 0.8: print("警告:推流速度过慢!")
  96. # 也可以捕捉特定的错误信息
  97. if "Broken pipe" in line_str or "Connection refused" in line_str:
  98. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 🚨 关键错误:推流连接断开!")
  99. except Exception as e:
  100. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志读取异常: {e}")
  101. break
  102. # --- FFmpeg 命令:整合了你的稳定参数 + 我们的管道输入 ---
  103. FFMPEG_CMD = [
  104. 'ffmpeg', '-y',
  105. '-hide_banner',
  106. '-f', 'rawvideo', '-pixel_format', 'yuv420p',
  107. '-video_size', f'{WIDTH}x{HEIGHT}',
  108. '-framerate', str(FPS),
  109. '-i', '-', # 从 Python 管道接收数据
  110. '-thread_queue_size', '8192',
  111. '-f', 'alsa', '-channels', '1', '-i', 'hw:1,0', # 加上音频防止断流
  112. '-vf', (
  113. # 将 cam_name 和 localtime 拼在一起,中间用 | 分隔
  114. f"drawtext=fontfile={FONT_PATH}:"
  115. f"text='{cam_name} | %{{localtime\\:%Y-%m-%d %H\\\\\\:%M\\\\\\:%S}}':"
  116. "x=20:y=20:fontsize=24:fontcolor=yellow:shadowcolor=black:shadowx=2:shadowy=2"
  117. ),
  118. '-vcodec', 'libx264', # 使用你验证成功的软编
  119. '-acodec', 'aac',
  120. '-b:v', '1M', '-b:a', '64K',
  121. '-preset:v', 'ultrafast',
  122. '-tune:v', 'zerolatency',
  123. '-g', '25', # 将 GOP 缩短到 25(即每秒一个关键帧),方便服务器切片
  124. '-keyint_min', '25',
  125. '-sc_threshold', '0', # 禁用场景切换检测,保证 GOP 长度固定
  126. '-f', 'flv',
  127. push_url
  128. ]
  129. def run_smart_cam():
  130. while True:
  131. # 清理残余进程,防止端口占用
  132. os.system("pkill -9 ffmpeg > /dev/null 2>&1")
  133. os.system("pkill -9 libcamera-vid > /dev/null 2>&1")
  134. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 休眠 5s 等待残余进程被彻底清理...")
  135. time.sleep(5)
  136. # 1. 启动 libcamera 采集原始数据(为了 OpenCV 必须拿原始流)
  137. # 如果你一定要用 v4l2,可以改用 ffmpeg 读取再转给 python
  138. input_cmd = [
  139. 'rpicam-vid', '-t', '0', '--width', str(WIDTH), '--height', str(HEIGHT),
  140. '--framerate', str(FPS), '--codec', 'yuv420', '--nopreview', '--flush', '-o', '-'
  141. ]
  142. in_proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=frame_size*3)
  143. #out_proc = subprocess.Popen(FFMPEG_CMD, stdin=subprocess.PIPE)
  144. out_proc = subprocess.Popen(
  145. FFMPEG_CMD,
  146. stdin=subprocess.PIPE,
  147. stderr=None, # 必须开启 stderr 捕获日志
  148. bufsize=0 # 禁用缓冲,确保实时获取日志
  149. )
  150. # 启动 ffmpeg 日志监控线程
  151. log_thread = threading.Thread(target=monitor_ffmpeg_log, args=(out_proc.stderr,))
  152. log_thread.daemon = True
  153. log_thread.start()
  154. fgbg = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=50)
  155. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 智能推流启动...")
  156. try:
  157. last_log_time = 0
  158. # --- 循环外定义变量 ---
  159. last_notify_time = 0
  160. while True:
  161. raw_data = in_proc.stdout.read(frame_size)
  162. if len(raw_data) < frame_size: break
  163. # --- OpenCV 移动检测 (抽样检测节省 CPU) ---
  164. # 直接在 Y 通道操作,不影响最终推流
  165. y_buf = np.frombuffer(raw_data, dtype=np.uint8, count=y_size).reshape((HEIGHT, WIDTH))
  166. # 每 5 帧做一次检测
  167. small_y = cv2.resize(y_buf, (212, 120))
  168. fgmask = fgbg.apply(small_y)
  169. is_moving = np.count_nonzero(fgmask > 200) > 50
  170. if is_moving:
  171. current_time_val = time.time()
  172. # 获取精确到秒的时间戳
  173. readable_time = time.strftime('%Y-%m-%d %Y-%m-%d %H:%M:%S')
  174. # 冷却时间 2 秒,避免频繁触发
  175. if current_time_val - last_log_time > 2:
  176. #print(f"视频回放定位点: 请寻找画面 OSD 显示为 {readable_time} 附近的片段")
  177. # 如果你有关联的抓拍,文件名也用这个时间
  178. # filename = f"{save_dir}/motion_{time.strftime('%H%M%S')}.jpg"
  179. # cv2.imwrite(filename, y_buf)
  180. last_log_time = current_time_val
  181. if current_time_val - last_notify_time > notify_interval:
  182. # 开启线程异步发送
  183. t = threading.Thread(target=send_ding_msg, args=(readable_time,))
  184. t.daemon = True
  185. t.start()
  186. last_notify_time = current_time_val
  187. # 将处理后的数据写入 FFmpeg
  188. out_proc.stdin.write(raw_data)
  189. if out_proc.poll() is not None: break
  190. except Exception as e:
  191. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 运行异常: {e}")
  192. finally:
  193. in_proc.terminate()
  194. out_proc.terminate()
  195. print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FFmpeg 进程已结束,5秒后重启...")
  196. time.sleep(5)
  197. if __name__ == "__main__":
  198. run_smart_cam()