Browse Source

添加 Webhook 通知, 并利用 webhook 发送 rtmp 事件通知

reghao 5 months ago
parent
commit
74e2c34366

+ 1 - 1
message/message-api/src/main/java/cn/reghao/tnb/message/api/constant/NotifyType.java

@@ -7,7 +7,7 @@ package cn.reghao.tnb.message.api.constant;
  * @date 2020-03-01 17:18:53
  */
 public enum NotifyType {
-    email(1), mobile(2);
+    email(1), mobile(2), webhook(3);
 
     private final int value;
     NotifyType(int value) {

+ 1 - 1
message/message-api/src/main/java/cn/reghao/tnb/message/api/dto/msg/BaseMessage.java

@@ -16,7 +16,7 @@ public class BaseMessage implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private int notifyType;
-    @NotBlank(message = "邮箱或手机号不能为空白字符串")
+    //@NotBlank(message = "邮箱或手机号不能为空白字符串")
     private String receiver;
     private long timestamp;
 

+ 24 - 0
message/message-api/src/main/java/cn/reghao/tnb/message/api/dto/msg/RtmpMessage.java

@@ -0,0 +1,24 @@
+package cn.reghao.tnb.message.api.dto.msg;
+
+import cn.reghao.tnb.message.api.constant.NotifyType;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2025-10-16 23:03:01
+ */
+@Getter
+public class RtmpMessage extends BaseMessage {
+    private String callType;
+    private String appName;
+    private String clientId;
+    private String clientAddress;
+
+    public RtmpMessage(String callType, String appName, String clientId, String clientAddress) {
+        super(NotifyType.webhook.getValue(), "");
+        this.callType = callType;
+        this.appName = appName;
+        this.clientId = clientId;
+        this.clientAddress = clientAddress;
+    }
+}

+ 25 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/config/BeanConfig.java

@@ -0,0 +1,25 @@
+package cn.reghao.tnb.message.app.config;
+
+import cn.reghao.jutil.jdk.http.WebClient;
+import cn.reghao.jutil.jdk.http.WebRequest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * @author reghao
+ * @date 2023-08-02 09:14:17
+ */
+@Configuration
+public class BeanConfig {
+    @Bean
+    public RestTemplate restTemplate(){
+        return new RestTemplate();
+    }
+
+    @Bean
+    public WebRequest webRequest() {
+        //return new DefaultWebRequest();
+        return new WebClient();
+    }
+}

+ 14 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/db/mapper/WebhookMapper.java

@@ -0,0 +1,14 @@
+package cn.reghao.tnb.message.app.db.mapper;
+
+import cn.reghao.jutil.jdk.db.BaseMapper;
+import cn.reghao.tnb.message.app.model.po.Webhook;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author reghao
+ * @date 2025-10-16 23:14:17
+ */
+@Mapper
+public interface WebhookMapper extends BaseMapper<Webhook> {
+    Webhook findByName(String name);
+}

+ 33 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/model/msg/DingRtmpMsg.java

@@ -0,0 +1,33 @@
+package cn.reghao.tnb.message.app.model.msg;
+
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.tnb.message.api.dto.msg.RtmpMessage;
+import cn.reghao.tnb.message.app.service.notifier.ding.DingMsg;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2025-10-13 15:31:48
+ */
+@Getter
+public class DingRtmpMsg {
+    private final RtmpMessage rtmpMessage;
+
+    public DingRtmpMsg(RtmpMessage rtmpMessage) {
+        this.rtmpMessage = rtmpMessage;
+    }
+
+    public DingMsg dingMsg() {
+        String title = "RTMP 通知";
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("# RTMP 通知").append(System.lineSeparator());
+        sb.append("### RTMP 事件: ").append(rtmpMessage.getCallType()).append(System.lineSeparator());
+        sb.append("### 摄像头: ").append(rtmpMessage.getAppName()).append(System.lineSeparator());
+        sb.append("### 客户 ID: ").append(rtmpMessage.getClientId()).append(System.lineSeparator());
+        sb.append("### 客户 IP: ").append(rtmpMessage.getClientAddress()).append(System.lineSeparator());
+        sb.append("### 时间: ").append(DateTimeConverter.format(rtmpMessage.getTimestamp()));
+        String text = sb.toString();
+        return new DingMsg(title, text);
+    }
+}

+ 20 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/model/po/Webhook.java

@@ -0,0 +1,20 @@
+package cn.reghao.tnb.message.app.model.po;
+
+import cn.reghao.jutil.jdk.db.BaseObject;
+import lombok.Getter;
+import lombok.Setter;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * @author reghao
+ * @date 2021-03-18 21:01:46
+ */
+@Setter
+@Getter
+public class Webhook extends BaseObject<Integer> {
+    private String name;
+    @NotBlank
+    private String url;
+    private String sign;
+}

+ 2 - 3
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java

@@ -1,7 +1,6 @@
 package cn.reghao.tnb.message.app.rabbit;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.message.api.dto.msg.BaseMessage;
 import cn.reghao.tnb.message.app.config.SpringLifecycle;
 import cn.reghao.tnb.message.app.service.MessageConsumer;
 import cn.reghao.tnb.message.app.ws.media.MediaService;
@@ -76,8 +75,8 @@ public class RabbitListeners {
             exchange = @Exchange(value = "amq.direct")),
             ackMode = "MANUAL"
     )
-    public void consumeNotifyMessage(@Payload BaseMessage baseMessage, Channel channel, Message message) {
-        messageConsumer.sendMessage(baseMessage);
+    public void consumeNotifyMessage(@Payload String jsonPayload, Channel channel, Message message) {
+        messageConsumer.sendMessage(jsonPayload);
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         try {
             log.info("手动确认消息 {}", deliveryTag);

+ 23 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/service/MessageConsumer.java

@@ -1,5 +1,9 @@
 package cn.reghao.tnb.message.app.service;
 
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.message.api.dto.msg.RtmpMessage;
+import cn.reghao.tnb.message.app.model.msg.DingRtmpMsg;
+import cn.reghao.tnb.message.app.service.notifier.ding.DingMsg;
 import cn.reghao.tnb.user.api.dto.AdminMessage;
 import cn.reghao.tnb.message.api.constant.NotifyType;
 import cn.reghao.tnb.message.api.dto.msg.BaseMessage;
@@ -7,6 +11,7 @@ import cn.reghao.tnb.message.api.dto.msg.LoginMessage;
 import cn.reghao.tnb.message.api.dto.msg.VerifyMessage;
 import cn.reghao.tnb.message.app.service.notifier.email.EmailMsg;
 import cn.reghao.tnb.user.api.iface.UserService;
+import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Service;
@@ -26,6 +31,17 @@ public class MessageConsumer {
         this.notifyService = notifyService;
     }
 
+    public void sendMessage(String jsonPayload) {
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(jsonPayload).getAsJsonObject();
+        int notifyType = jsonObject.get("notifyType").getAsInt();
+        if (notifyType == NotifyType.webhook.getValue()) {
+            RtmpMessage rtmpMessage = JsonConverter.jsonToObject(jsonPayload, RtmpMessage.class);
+            DingMsg dingMsg = new DingRtmpMsg(rtmpMessage).dingMsg();
+            String webhookName = "ding_bdbot";
+            notifyService.notify(webhookName, dingMsg);
+        }
+    }
+
     public void sendMessage(BaseMessage baseMessage) {
         if (baseMessage instanceof LoginMessage) {
             LoginMessage loginMessage = (LoginMessage) baseMessage;
@@ -53,6 +69,13 @@ public class MessageConsumer {
                 AdminMessage adminMessage = new AdminMessage(2, errMsg);
                 userService.sendAdminMessage(adminMessage);
             }
+        } else if (baseMessage instanceof RtmpMessage) {
+            RtmpMessage rtmpMessage = (RtmpMessage) baseMessage;
+            if (rtmpMessage.getNotifyType() == NotifyType.webhook.getValue()) {
+                DingMsg dingMsg = new DingRtmpMsg(rtmpMessage).dingMsg();
+                String webhookName = "ding_bdbot";
+                notifyService.notify(webhookName, dingMsg);
+            }
         }
     }
 }

+ 2 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/service/NotifyService.java

@@ -1,6 +1,7 @@
 package cn.reghao.tnb.message.app.service;
 
 import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import cn.reghao.tnb.message.app.model.po.Webhook;
 import cn.reghao.tnb.message.app.service.notifier.Notify;
 import cn.reghao.tnb.message.app.service.notifier.ding.DingMsg;
 import cn.reghao.tnb.message.app.service.notifier.ding.DingNotify;
@@ -44,6 +45,7 @@ public class NotifyService {
             threadPool.execute(new NotifyTask<>(emailNotify, receiver, emailMsg, userService));
         }
     }
+
     /**
      * @param
      * @return

+ 21 - 5
message/message-service/src/main/java/cn/reghao/tnb/message/app/service/notifier/ding/DingNotify.java

@@ -3,9 +3,11 @@ package cn.reghao.tnb.message.app.service.notifier.ding;
 import cn.reghao.jutil.jdk.http.WebRequest;
 import cn.reghao.jutil.jdk.http.WebResponse;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.tool.http.DefaultWebRequest;
+import cn.reghao.tnb.message.app.db.mapper.WebhookMapper;
+import cn.reghao.tnb.message.app.model.po.Webhook;
 import cn.reghao.tnb.message.app.service.notifier.Notify;
 import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 import javax.crypto.Mac;
@@ -22,17 +24,32 @@ import java.util.Base64;
  * @author reghao
  * @date 2020-05-07 01:31:03
  */
+@Slf4j
 @Component
 public class DingNotify implements Notify<DingMsg> {
     private final WebRequest webRequest;
+    private final WebhookMapper webhookMapper;
 
-    public DingNotify() {
-        this.webRequest = new DefaultWebRequest();
+    public DingNotify(WebRequest webRequest, WebhookMapper webhookMapper) {
+        this.webRequest = webRequest;
+        this.webhookMapper = webhookMapper;
     }
 
     @Override
     public void send(String receiver, DingMsg msg) throws Exception {
-        WebResponse webResponse = webRequest.postJson(receiver, JsonConverter.objectToJson(msg));
+        Webhook webhook = webhookMapper.findByName(receiver);
+        if (webhook == null) {
+            log.info("没有可用于发送通知的 webhook 配置");
+            return;
+        }
+
+        String url = webhook.getUrl();
+        String sign = webhook.getSign();
+        if (sign != null) {
+            url = getReceiver(url, sign);
+        }
+
+        WebResponse webResponse = webRequest.postJson(url, JsonConverter.objectToJson(msg));
         if (webResponse.getStatusCode() != 200) {
             throw new Exception(webResponse.getBody());
         }
@@ -47,7 +64,6 @@ public class DingNotify implements Notify<DingMsg> {
 
     private String getReceiver(String url, String secret) throws InvalidKeyException, NoSuchAlgorithmException {
         long timestamp = System.currentTimeMillis();
-        //String secret = "SEC4d7e0c126147b3679c4d15e47dc26311ca053bd47b21f4025832a6e246a93ec4";
         String sign = calcSign(timestamp, secret);
         return url + String.format("&timestamp=%s&sign=%s", timestamp, sign);
     }

+ 17 - 0
message/message-service/src/main/resources/mapper/WebhookMapper.xml

@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="cn.reghao.tnb.message.app.db.mapper.WebhookMapper">
+    <insert id="save">
+        insert into msg_webhook
+        (`name`,`url`,`sign`)
+        values
+        (#{name},#{url},#{sign})
+    </insert>
+
+    <select id="findByName" resultType="cn.reghao.tnb.message.app.model.po.Webhook">
+        select *
+        from msg_webhook
+        where name=#{name}
+    </select>
+</mapper>