| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import os
- import re
- import json
- import subprocess
- import time
- import numpy as np
- import cv2
- import threading
- import requests
- from setting import WEBHOOK_URL
- from setting import RTMP_URL
- from setting import CAM_NAME
- # --- 配置参数 ---
- FONT_PATH = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc"
- cam_name = CAM_NAME
- push_url = RTMP_URL
- notify_interval = 30 # 告警间隔,设为60秒防止轰炸
- WIDTH, HEIGHT = 960, 540 # 沿用你跑通的分辨率
- FPS = 25
- y_size = WIDTH * HEIGHT
- frame_size = int(y_size * 1.5)
- notify = False
- def send_ding_msg(detect_time):
- if not notify:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] notify 参数为 False, 不发送通知")
- return
- current_hour = time.localtime().tm_hour
- if current_hour >= 20 or current_hour < 8:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 夜间免打扰模式, 不发送 webhook 通知")
- return
- """
- 异步发送钉钉通知
- """
- headers = {'Content-Type': 'application/json'}
-
- # 构造钉钉要求的 JSON 结构
- data = {
- "msgtype": "text",
- "text": {
- "content": (
- f"【cam-alert】\n"
- f"机位:{cam_name}\n"
- f"时间:{detect_time}\n"
- f"状态:检测到物体移动\n"
- f"请查看回放对齐画面 OSD 时间。"
- )
- },
- "at": {
- "isAtAll": False # 如果需要提醒所有人,设为 True
- }
- }
- try:
- response = requests.post(
- WEBHOOK_URL,
- data=json.dumps(data),
- headers=headers,
- timeout=5
- )
- res_json = response.json()
- if res_json.get("errcode") == 0:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉通知发送成功")
- else:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉发送失败: {res_json.get('errmsg')}")
- except Exception as e:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 网络异常(钉钉): {e}")
- def monitor_ffmpeg_log(stderr_pipe):
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志监控线程已启动...")
- """
- 专门解析 FFmpeg stderr 日志的线程函数
- """
- # 正则表达式匹配:帧数、FPS、码率、速度
- # 示例: frame= 150 fps= 25 q=28.0 size= 1024kB time=00:00:06.00 bitrate=1365.3kbits/s speed=1.01x
- regex = re.compile(r"frame=\s*(\d+)\s+fps=\s*([\d.]+)\s+.*?bitrate=\s*([\d.]+kbits/s)\s+speed=\s*([\d.]+x)")
- while True:
- # FFmpeg 的进度信息是用 \r 更新的
- # 我们一次读一部分字节,并按 \r 或 \n 切分
- try:
- # 这里的 read(128) 能保证及时拿到数据
- chunk = stderr_pipe.read(128).decode('utf-8', errors='ignore')
- if not chunk:
- break
-
- # 只要包含 frame=,就说明是进度行
- if "frame=" in chunk:
- # 使用正则或简单的字符串查找提取 fps 和 speed
- # 简单打印出来看看是否有输出
- line_str = chunk.strip().replace('\r', '\n').split('\n')[-1]
- #print(f">>> FFmpeg 状态: {line_str}")
- # 查找统计信息行
- match = regex.search(line_str)
- if match:
- frame, fps, bitrate, speed = match.groups()
- # 这里你可以根据数值做逻辑处理
- # 例如:如果 speed < 0.9x,说明 CPU 跑不动了或者网络堵塞
- #print(f"--- 推流状态: FPS={fps}, 码率={bitrate}, 速度={speed} ---")
-
- # 如果速度太慢,可以触发一个标志位让主程序知道
- # if float(speed[:-1]) < 0.8: print("警告:推流速度过慢!")
- # 也可以捕捉特定的错误信息
- if "Broken pipe" in line_str or "Connection refused" in line_str:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 🚨 关键错误:推流连接断开!")
- except Exception as e:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志读取异常: {e}")
- break
- # --- FFmpeg 命令:整合了你的稳定参数 + 我们的管道输入 ---
- FFMPEG_CMD = [
- 'ffmpeg', '-y',
- '-hide_banner',
- '-f', 'rawvideo', '-pixel_format', 'yuv420p',
- '-video_size', f'{WIDTH}x{HEIGHT}',
- '-framerate', str(FPS),
- '-i', '-', # 从 Python 管道接收数据
- '-thread_queue_size', '8192',
- '-f', 'alsa', '-channels', '1', '-i', 'hw:1,0', # 加上音频防止断流
- '-vf', (
- # 将 cam_name 和 localtime 拼在一起,中间用 | 分隔
- f"drawtext=fontfile={FONT_PATH}:"
- f"text='{cam_name} | %{{localtime\\:%Y-%m-%d %H\\\\\\:%M\\\\\\:%S}}':"
- "x=20:y=20:fontsize=24:fontcolor=yellow:shadowcolor=black:shadowx=2:shadowy=2"
- ),
- '-vcodec', 'libx264', # 使用你验证成功的软编
- '-acodec', 'aac',
- '-b:v', '1M', '-b:a', '64K',
- '-preset:v', 'ultrafast',
- '-tune:v', 'zerolatency',
- '-g', '25', # 将 GOP 缩短到 25(即每秒一个关键帧),方便服务器切片
- '-keyint_min', '25',
- '-sc_threshold', '0', # 禁用场景切换检测,保证 GOP 长度固定
- '-f', 'flv',
- push_url
- ]
- def run_smart_cam():
- while True:
- # 清理残余进程,防止端口占用
- os.system("pkill -9 ffmpeg > /dev/null 2>&1")
- os.system("pkill -9 libcamera-vid > /dev/null 2>&1")
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 休眠 5s 等待残余进程被彻底清理...")
- time.sleep(5)
-
- # 1. 启动 libcamera 采集原始数据(为了 OpenCV 必须拿原始流)
- # 如果你一定要用 v4l2,可以改用 ffmpeg 读取再转给 python
- input_cmd = [
- 'rpicam-vid', '-t', '0', '--width', str(WIDTH), '--height', str(HEIGHT),
- '--framerate', str(FPS), '--codec', 'yuv420', '--nopreview', '--flush', '-o', '-'
- ]
-
- in_proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=frame_size*3)
- #out_proc = subprocess.Popen(FFMPEG_CMD, stdin=subprocess.PIPE)
- out_proc = subprocess.Popen(
- FFMPEG_CMD,
- stdin=subprocess.PIPE,
- stderr=None, # 必须开启 stderr 捕获日志
- bufsize=0 # 禁用缓冲,确保实时获取日志
- )
- # 启动 ffmpeg 日志监控线程
- log_thread = threading.Thread(target=monitor_ffmpeg_log, args=(out_proc.stderr,))
- log_thread.daemon = True
- log_thread.start()
-
- fgbg = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=50)
-
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 智能推流启动...")
- try:
- last_log_time = 0
- # --- 循环外定义变量 ---
- last_notify_time = 0
- while True:
- raw_data = in_proc.stdout.read(frame_size)
- if len(raw_data) < frame_size: break
-
- # --- OpenCV 移动检测 (抽样检测节省 CPU) ---
- # 直接在 Y 通道操作,不影响最终推流
- y_buf = np.frombuffer(raw_data, dtype=np.uint8, count=y_size).reshape((HEIGHT, WIDTH))
-
- # 每 5 帧做一次检测
- small_y = cv2.resize(y_buf, (212, 120))
- fgmask = fgbg.apply(small_y)
- is_moving = np.count_nonzero(fgmask > 200) > 50
- if is_moving:
- current_time_val = time.time()
- # 获取精确到秒的时间戳
- readable_time = time.strftime('%Y-%m-%d %Y-%m-%d %H:%M:%S')
- # 冷却时间 2 秒,避免频繁触发
- if current_time_val - last_log_time > 2:
- #print(f"视频回放定位点: 请寻找画面 OSD 显示为 {readable_time} 附近的片段")
- # 如果你有关联的抓拍,文件名也用这个时间
- # filename = f"{save_dir}/motion_{time.strftime('%H%M%S')}.jpg"
- # cv2.imwrite(filename, y_buf)
- last_log_time = current_time_val
- if current_time_val - last_notify_time > notify_interval:
- # 开启线程异步发送
- t = threading.Thread(target=send_ding_msg, args=(readable_time,))
- t.daemon = True
- t.start()
- last_notify_time = current_time_val
- # 将处理后的数据写入 FFmpeg
- out_proc.stdin.write(raw_data)
- if out_proc.poll() is not None: break
- except Exception as e:
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 运行异常: {e}")
- finally:
- in_proc.terminate()
- out_proc.terminate()
- print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FFmpeg 进程已结束,5秒后重启...")
- time.sleep(5)
- if __name__ == "__main__":
- run_smart_cam()
|