import os import re import json import subprocess import time import numpy as np import cv2 import threading import requests from setting import WEBHOOK_URL from setting import RTMP_URL from setting import CAM_NAME # --- 配置参数 --- FONT_PATH = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc" cam_name = CAM_NAME push_url = RTMP_URL notify_interval = 30 # 告警间隔,设为60秒防止轰炸 WIDTH, HEIGHT = 960, 540 # 沿用你跑通的分辨率 FPS = 25 y_size = WIDTH * HEIGHT frame_size = int(y_size * 1.5) notify = False def send_ding_msg(detect_time): if not notify: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] notify 参数为 False, 不发送通知") return current_hour = time.localtime().tm_hour if current_hour >= 20 or current_hour < 8: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 夜间免打扰模式, 不发送 webhook 通知") return """ 异步发送钉钉通知 """ headers = {'Content-Type': 'application/json'} # 构造钉钉要求的 JSON 结构 data = { "msgtype": "text", "text": { "content": ( f"【cam-alert】\n" f"机位:{cam_name}\n" f"时间:{detect_time}\n" f"状态:检测到物体移动\n" f"请查看回放对齐画面 OSD 时间。" ) }, "at": { "isAtAll": False # 如果需要提醒所有人,设为 True } } try: response = requests.post( WEBHOOK_URL, data=json.dumps(data), headers=headers, timeout=5 ) res_json = response.json() if res_json.get("errcode") == 0: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉通知发送成功") else: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉发送失败: {res_json.get('errmsg')}") except Exception as e: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 网络异常(钉钉): {e}") def monitor_ffmpeg_log(stderr_pipe): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志监控线程已启动...") """ 专门解析 FFmpeg stderr 日志的线程函数 """ # 正则表达式匹配:帧数、FPS、码率、速度 # 示例: frame= 150 fps= 25 q=28.0 size= 1024kB time=00:00:06.00 bitrate=1365.3kbits/s speed=1.01x regex = re.compile(r"frame=\s*(\d+)\s+fps=\s*([\d.]+)\s+.*?bitrate=\s*([\d.]+kbits/s)\s+speed=\s*([\d.]+x)") while True: # FFmpeg 的进度信息是用 \r 更新的 # 我们一次读一部分字节,并按 \r 或 \n 切分 try: # 这里的 read(128) 能保证及时拿到数据 chunk = stderr_pipe.read(128).decode('utf-8', errors='ignore') if not chunk: break # 只要包含 frame=,就说明是进度行 if "frame=" in chunk: # 使用正则或简单的字符串查找提取 fps 和 speed # 简单打印出来看看是否有输出 line_str = chunk.strip().replace('\r', '\n').split('\n')[-1] #print(f">>> FFmpeg 状态: {line_str}") # 查找统计信息行 match = regex.search(line_str) if match: frame, fps, bitrate, speed = match.groups() # 这里你可以根据数值做逻辑处理 # 例如:如果 speed < 0.9x,说明 CPU 跑不动了或者网络堵塞 #print(f"--- 推流状态: FPS={fps}, 码率={bitrate}, 速度={speed} ---") # 如果速度太慢,可以触发一个标志位让主程序知道 # if float(speed[:-1]) < 0.8: print("警告:推流速度过慢!") # 也可以捕捉特定的错误信息 if "Broken pipe" in line_str or "Connection refused" in line_str: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 🚨 关键错误:推流连接断开!") except Exception as e: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志读取异常: {e}") break # --- FFmpeg 命令:整合了你的稳定参数 + 我们的管道输入 --- FFMPEG_CMD = [ 'ffmpeg', '-y', '-hide_banner', '-f', 'rawvideo', '-pixel_format', 'yuv420p', '-video_size', f'{WIDTH}x{HEIGHT}', '-framerate', str(FPS), '-i', '-', # 从 Python 管道接收数据 '-thread_queue_size', '8192', '-f', 'alsa', '-channels', '1', '-i', 'hw:1,0', # 加上音频防止断流 '-vf', ( # 将 cam_name 和 localtime 拼在一起,中间用 | 分隔 f"drawtext=fontfile={FONT_PATH}:" f"text='{cam_name} | %{{localtime\\:%Y-%m-%d %H\\\\\\:%M\\\\\\:%S}}':" "x=20:y=20:fontsize=24:fontcolor=yellow:shadowcolor=black:shadowx=2:shadowy=2" ), '-vcodec', 'libx264', # 使用你验证成功的软编 '-acodec', 'aac', '-b:v', '1M', '-b:a', '64K', '-preset:v', 'ultrafast', '-tune:v', 'zerolatency', '-g', '25', # 将 GOP 缩短到 25(即每秒一个关键帧),方便服务器切片 '-keyint_min', '25', '-sc_threshold', '0', # 禁用场景切换检测,保证 GOP 长度固定 '-f', 'flv', push_url ] def run_smart_cam(): while True: # 清理残余进程,防止端口占用 os.system("pkill -9 ffmpeg > /dev/null 2>&1") os.system("pkill -9 libcamera-vid > /dev/null 2>&1") print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 休眠 5s 等待残余进程被彻底清理...") time.sleep(5) # 1. 启动 libcamera 采集原始数据(为了 OpenCV 必须拿原始流) # 如果你一定要用 v4l2,可以改用 ffmpeg 读取再转给 python input_cmd = [ 'rpicam-vid', '-t', '0', '--width', str(WIDTH), '--height', str(HEIGHT), '--framerate', str(FPS), '--codec', 'yuv420', '--nopreview', '--flush', '-o', '-' ] in_proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=frame_size*3) #out_proc = subprocess.Popen(FFMPEG_CMD, stdin=subprocess.PIPE) out_proc = subprocess.Popen( FFMPEG_CMD, stdin=subprocess.PIPE, stderr=None, # 必须开启 stderr 捕获日志 bufsize=0 # 禁用缓冲,确保实时获取日志 ) # 启动 ffmpeg 日志监控线程 log_thread = threading.Thread(target=monitor_ffmpeg_log, args=(out_proc.stderr,)) log_thread.daemon = True log_thread.start() fgbg = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=50) print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 智能推流启动...") try: last_log_time = 0 # --- 循环外定义变量 --- last_notify_time = 0 while True: raw_data = in_proc.stdout.read(frame_size) if len(raw_data) < frame_size: break # --- OpenCV 移动检测 (抽样检测节省 CPU) --- # 直接在 Y 通道操作,不影响最终推流 y_buf = np.frombuffer(raw_data, dtype=np.uint8, count=y_size).reshape((HEIGHT, WIDTH)) # 每 5 帧做一次检测 small_y = cv2.resize(y_buf, (212, 120)) fgmask = fgbg.apply(small_y) is_moving = np.count_nonzero(fgmask > 200) > 50 if is_moving: current_time_val = time.time() # 获取精确到秒的时间戳 readable_time = time.strftime('%Y-%m-%d %Y-%m-%d %H:%M:%S') # 冷却时间 2 秒,避免频繁触发 if current_time_val - last_log_time > 2: #print(f"视频回放定位点: 请寻找画面 OSD 显示为 {readable_time} 附近的片段") # 如果你有关联的抓拍,文件名也用这个时间 # filename = f"{save_dir}/motion_{time.strftime('%H%M%S')}.jpg" # cv2.imwrite(filename, y_buf) last_log_time = current_time_val if current_time_val - last_notify_time > notify_interval: # 开启线程异步发送 t = threading.Thread(target=send_ding_msg, args=(readable_time,)) t.daemon = True t.start() last_notify_time = current_time_val # 将处理后的数据写入 FFmpeg out_proc.stdin.write(raw_data) if out_proc.poll() is not None: break except Exception as e: print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 运行异常: {e}") finally: in_proc.terminate() out_proc.terminate() print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FFmpeg 进程已结束,5秒后重启...") time.sleep(5) if __name__ == "__main__": run_smart_cam()