reghao 2 дней назад
Родитель
Сommit
a9b2e5a91e
5 измененных файлов с 331 добавлено и 0 удалено
  1. 2 0
      README.md
  2. 95 0
      picam.py
  3. 229 0
      picam_cv.py
  4. 2 0
      requirements.txt
  5. 3 0
      setting.py

+ 2 - 0
README.md

@@ -0,0 +1,2 @@
+# 树莓派摄像头
+作为监控摄像头, 可推流到 [ngxflv](https://git.reghao.cn/reghao/ngxflv) 项目

+ 95 - 0
picam.py

@@ -0,0 +1,95 @@
+import os
+import subprocess
+import time
+import re
+from setting import RTMP_URL
+from setting import CAM_NAME
+
+FONT_PATH = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc"
+cam_name=CAM_NAME
+push_url=RTMP_URL
+
+# --- 配置参数 ---
+# 替换为你的 ffmpeg 命令(建议先在终端测试成功)
+FFMPEG_CMD = [
+        'ffmpeg',
+        '-hide_banner',
+        '-thread_queue_size', '2048',
+        '-f', 'video4linux2',
+        '-video_size', '1024x768',
+        '-framerate', '25',
+        '-i', '/dev/video0',
+        '-thread_queue_size', '8192',
+        '-f', 'alsa',
+        '-ac', '1',
+        '-i', 'hw:1,0',
+        '-vcodec', 'libx264',
+        '-acodec', 'aac',
+        '-b:v', '4M',
+        '-b:a', '128K',
+        '-max_delay', '1000000',
+        '-g', '50',
+        '-preset:v', 'ultrafast',
+        '-tune:v', 'zerolatency',
+        '-vf', (
+            f"drawtext=fontfile={FONT_PATH}:"
+            f"text='{cam_name} | %{{localtime\\:%Y-%m-%d %H\\\\\\:%M\\\\\\:%S}}':"
+            "x=20:y=20:fontsize=28:fontcolor=yellow:shadowcolor=black:shadowx=2:shadowy=2"
+        ),
+        '-f', 'flv',
+        push_url
+    ]
+
+RESTART_DELAY = 5  # 重启间隔(秒)
+
+
+def monitor_stream():
+    while True:
+        # 杀死所有属于当前用户的 ffmpeg 进程,防止冲突
+        os.system("pkill -9 ffmpeg > /dev/null 2>&1")
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 正在启动 FFmpeg 推流...")
+
+        # 启动进程,将 stderr 重定向以便监控状态
+        process = subprocess.Popen(
+            FFMPEG_CMD,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True,
+            bufsize=1
+        )
+
+        try:
+            # 实时读取 FFmpeg 输出的状态行
+            while True:
+                line = process.stdout.readline()
+                if not line:
+                    break
+                # 监控推流状态(例如抓取 fps 和 bitrate)
+                if "frame=" in line:
+                    # 使用正则或字符串切片提取关键信息
+                    stats = re.findall(r'(frame=.*?bitrate=.*?speed=.*?x)', line)
+                    if stats:
+                        print(f"\r推流中: {stats[0].strip()}", end="")
+                # 检查进程是否意外退出
+                if process.poll() is not None:
+                    break
+        except Exception as e:
+            print(f"\n监控异常: {e}")
+        finally:
+            # 确保彻底杀死进程
+            if process.poll() is None:
+                process.terminate()
+                try:
+                    process.wait(timeout=5)
+                except subprocess.TimeoutExpired:
+                    process.kill()
+
+            print(f"[{time.strftime('%H:%M:%S')}] FFmpeg 进程已结束,{RESTART_DELAY}秒后重启...")
+            time.sleep(RESTART_DELAY)
+
+
+if __name__ == "__main__":
+    try:
+        monitor_stream()
+    except KeyboardInterrupt:
+        print("用户手动停止,程序退出。")

+ 229 - 0
picam_cv.py

@@ -0,0 +1,229 @@
+import os
+import re
+import json
+import subprocess
+import time
+import numpy as np
+import cv2
+import threading
+import requests
+from setting import WEBHOOK_URL
+from setting import RTMP_URL
+from setting import CAM_NAME
+
+# --- 配置参数 ---
+FONT_PATH = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc"
+cam_name = CAM_NAME
+push_url = RTMP_URL
+notify_interval = 30  # 告警间隔,设为60秒防止轰炸
+WIDTH, HEIGHT = 960, 540 # 沿用你跑通的分辨率
+FPS = 25
+y_size = WIDTH * HEIGHT
+frame_size = int(y_size * 1.5)
+notify = False
+
+
+def send_ding_msg(detect_time):
+    if not notify:
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] notify 参数为 False, 不发送通知")
+        return
+
+    current_hour = time.localtime().tm_hour
+    if current_hour >= 20 or current_hour < 8:
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 夜间免打扰模式, 不发送 webhook 通知")
+        return
+
+    """
+    异步发送钉钉通知
+    """
+    headers = {'Content-Type': 'application/json'}
+    
+    # 构造钉钉要求的 JSON 结构
+    data = {
+        "msgtype": "text",
+        "text": {
+            "content": (
+                f"【cam-alert】\n"
+                f"机位:{cam_name}\n"
+                f"时间:{detect_time}\n"
+                f"状态:检测到物体移动\n"
+                f"请查看回放对齐画面 OSD 时间。"
+            )
+        },
+        "at": {
+            "isAtAll": False  # 如果需要提醒所有人,设为 True
+        }
+    }
+
+    try:
+        response = requests.post(
+            WEBHOOK_URL, 
+            data=json.dumps(data), 
+            headers=headers, 
+            timeout=5
+        )
+        res_json = response.json()
+        if res_json.get("errcode") == 0:
+            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉通知发送成功")
+        else:
+            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 钉钉发送失败: {res_json.get('errmsg')}")
+    except Exception as e:
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 网络异常(钉钉): {e}")
+
+
+def monitor_ffmpeg_log(stderr_pipe):
+    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志监控线程已启动...")
+    """
+    专门解析 FFmpeg stderr 日志的线程函数
+    """
+    # 正则表达式匹配:帧数、FPS、码率、速度
+    # 示例: frame=  150 fps= 25 q=28.0 size=    1024kB time=00:00:06.00 bitrate=1365.3kbits/s speed=1.01x
+    regex = re.compile(r"frame=\s*(\d+)\s+fps=\s*([\d.]+)\s+.*?bitrate=\s*([\d.]+kbits/s)\s+speed=\s*([\d.]+x)")
+
+    while True:
+        # FFmpeg 的进度信息是用 \r 更新的
+        # 我们一次读一部分字节,并按 \r 或 \n 切分
+        try:
+            # 这里的 read(128) 能保证及时拿到数据
+            chunk = stderr_pipe.read(128).decode('utf-8', errors='ignore')
+            if not chunk:
+                break
+            
+            # 只要包含 frame=,就说明是进度行
+            if "frame=" in chunk:
+                # 使用正则或简单的字符串查找提取 fps 和 speed
+                # 简单打印出来看看是否有输出
+                line_str = chunk.strip().replace('\r', '\n').split('\n')[-1]
+                #print(f">>> FFmpeg 状态: {line_str}")
+                # 查找统计信息行
+                match = regex.search(line_str)
+                if match:
+                    frame, fps, bitrate, speed = match.groups()
+                    # 这里你可以根据数值做逻辑处理
+                    # 例如:如果 speed < 0.9x,说明 CPU 跑不动了或者网络堵塞
+                    #print(f"--- 推流状态: FPS={fps}, 码率={bitrate}, 速度={speed} ---")
+            
+                    # 如果速度太慢,可以触发一个标志位让主程序知道
+                    # if float(speed[:-1]) < 0.8: print("警告:推流速度过慢!")
+
+                # 也可以捕捉特定的错误信息
+                if "Broken pipe" in line_str or "Connection refused" in line_str:
+                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 🚨 关键错误:推流连接断开!")
+        except Exception as e:
+            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 日志读取异常: {e}")
+            break
+
+
+# --- FFmpeg 命令:整合了你的稳定参数 + 我们的管道输入 ---
+FFMPEG_CMD = [
+    'ffmpeg', '-y',
+    '-hide_banner',
+    '-f', 'rawvideo', '-pixel_format', 'yuv420p',
+    '-video_size', f'{WIDTH}x{HEIGHT}',
+    '-framerate', str(FPS),
+    '-i', '-', # 从 Python 管道接收数据
+    '-thread_queue_size', '8192',
+    '-f', 'alsa', '-channels', '1', '-i', 'hw:1,0', # 加上音频防止断流
+    '-vf', (
+        # 将 cam_name 和 localtime 拼在一起,中间用 | 分隔
+        f"drawtext=fontfile={FONT_PATH}:"
+        f"text='{cam_name} | %{{localtime\\:%Y-%m-%d %H\\\\\\:%M\\\\\\:%S}}':"
+        "x=20:y=20:fontsize=24:fontcolor=yellow:shadowcolor=black:shadowx=2:shadowy=2"
+    ),
+    '-vcodec', 'libx264', # 使用你验证成功的软编
+    '-acodec', 'aac',
+    '-b:v', '1M', '-b:a', '64K',
+    '-preset:v', 'ultrafast',
+    '-tune:v', 'zerolatency',
+    '-g', '25',                # 将 GOP 缩短到 25(即每秒一个关键帧),方便服务器切片
+    '-keyint_min', '25',
+    '-sc_threshold', '0',      # 禁用场景切换检测,保证 GOP 长度固定
+    '-f', 'flv',
+    push_url
+]
+
+
+def run_smart_cam():
+    while True:
+        # 清理残余进程,防止端口占用
+        os.system("pkill -9 ffmpeg > /dev/null 2>&1")
+        os.system("pkill -9 libcamera-vid > /dev/null 2>&1")
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 休眠 5s 等待残余进程被彻底清理...")
+        time.sleep(5)
+        
+        # 1. 启动 libcamera 采集原始数据(为了 OpenCV 必须拿原始流)
+        # 如果你一定要用 v4l2,可以改用 ffmpeg 读取再转给 python
+        input_cmd = [
+            'rpicam-vid', '-t', '0', '--width', str(WIDTH), '--height', str(HEIGHT),
+            '--framerate', str(FPS), '--codec', 'yuv420', '--nopreview', '--flush', '-o', '-'
+        ]
+        
+        in_proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=frame_size*3)
+        #out_proc = subprocess.Popen(FFMPEG_CMD, stdin=subprocess.PIPE)
+        out_proc = subprocess.Popen(
+                FFMPEG_CMD,
+                stdin=subprocess.PIPE,
+                stderr=None,  # 必须开启 stderr 捕获日志
+                bufsize=0                # 禁用缓冲,确保实时获取日志
+        )
+
+        # 启动 ffmpeg 日志监控线程
+        log_thread = threading.Thread(target=monitor_ffmpeg_log, args=(out_proc.stderr,))
+        log_thread.daemon = True
+        log_thread.start()
+        
+        fgbg = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=50)
+        
+        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 智能推流启动...")
+        try:
+            last_log_time = 0
+            # --- 循环外定义变量 ---
+            last_notify_time = 0
+            while True:
+                raw_data = in_proc.stdout.read(frame_size)
+                if len(raw_data) < frame_size: break
+                
+                # --- OpenCV 移动检测 (抽样检测节省 CPU) ---
+                # 直接在 Y 通道操作,不影响最终推流
+                y_buf = np.frombuffer(raw_data, dtype=np.uint8, count=y_size).reshape((HEIGHT, WIDTH))
+                
+                # 每 5 帧做一次检测
+                small_y = cv2.resize(y_buf, (212, 120)) 
+                fgmask = fgbg.apply(small_y)
+                is_moving = np.count_nonzero(fgmask > 200) > 50
+                if is_moving:
+                    current_time_val = time.time()
+                    # 获取精确到秒的时间戳
+                    readable_time = time.strftime('%Y-%m-%d %Y-%m-%d %H:%M:%S')
+                    # 冷却时间 2 秒,避免频繁触发
+                    if current_time_val - last_log_time > 2:
+                        #print(f"视频回放定位点: 请寻找画面 OSD 显示为 {readable_time} 附近的片段")
+                        # 如果你有关联的抓拍,文件名也用这个时间
+                        # filename = f"{save_dir}/motion_{time.strftime('%H%M%S')}.jpg"
+                        # cv2.imwrite(filename, y_buf)
+                        last_log_time = current_time_val
+
+                    if current_time_val - last_notify_time > notify_interval:
+                        # 开启线程异步发送
+                        t = threading.Thread(target=send_ding_msg, args=(readable_time,))
+
+                        t.daemon = True
+                        t.start()
+                        last_notify_time = current_time_val
+
+                # 将处理后的数据写入 FFmpeg
+                out_proc.stdin.write(raw_data)
+
+                if out_proc.poll() is not None: break
+
+        except Exception as e:
+            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 运行异常: {e}")
+        finally:
+            in_proc.terminate()
+            out_proc.terminate()
+            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FFmpeg 进程已结束,5秒后重启...")
+            time.sleep(5)
+
+
+if __name__ == "__main__":
+    run_smart_cam()

+ 2 - 0
requirements.txt

@@ -0,0 +1,2 @@
+opencv-python==4.13.0.92
+requests==2.33.1

+ 3 - 0
setting.py

@@ -0,0 +1,3 @@
+CAM_NAME = "picam"
+RTMP_URL = "rtmp://abc.rtmp.com/abc/picam"
+WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=1234567890"