Jelajahi Sumber

添加 graylog 的 ding webhook 通知接口

reghao 1 tahun lalu
induk
melakukan
c3415d4588

+ 33 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/DingNotifyWrapper.java

@@ -0,0 +1,33 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog;
+
+import cn.reghao.devops.mgr.admin.sys.db.repository.NotifyReceiverRepository;
+import cn.reghao.devops.mgr.admin.sys.model.po.NotifyReceiver;
+import cn.reghao.devops.mgr.admin.sys.service.notifier.ding.DingMsg;
+import cn.reghao.devops.mgr.admin.sys.service.notifier.ding.DingNotify;
+import cn.reghao.jutil.tool.http.DefaultWebRequest;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author reghao
+ * @date 2025-01-20 10:36:12
+ */
+@Service
+public class DingNotifyWrapper {
+    private final DingNotify dingNotify;
+    private final String webhook;
+
+    public DingNotifyWrapper(NotifyReceiverRepository notifyReceiverRepository) throws Exception {
+        this.dingNotify = new DingNotify(new DefaultWebRequest());
+        NotifyReceiver notifyReceiver = notifyReceiverRepository.findByName("ding-graylog");
+        String sign = notifyReceiver.getSign();
+        if (sign != null) {
+            this.webhook = dingNotify.getReceiver(notifyReceiver.getUrl(), notifyReceiver.getSign());
+        } else {
+            this.webhook = notifyReceiver.getUrl();
+        }
+    }
+
+    public void sendNotify(DingMsg dingMsg) throws Exception {
+        dingNotify.send(webhook, dingMsg);
+    }
+}

+ 21 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/ExceptionMessage.java

@@ -0,0 +1,21 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author reghao
+ * @date 2025-01-16 16:42:43
+ */
+@AllArgsConstructor
+@Getter
+public class ExceptionMessage {
+    private LocalDateTime date;
+    private String appName;
+    private String localIp;
+    private String url;
+    private String exceptionType;
+    private String exceptionMessage;
+}

+ 101 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/GraylogService.java

@@ -0,0 +1,101 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog;
+
+import cn.reghao.devops.mgr.admin.sys.service.notifier.ding.DingMsg;
+import cn.reghao.devops.mgr.mgr.thirdparty.graylog.pc.MessageProducer;
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.springframework.stereotype.Service;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2025-01-20 11:46:16
+ */
+@Service
+public class GraylogService {
+    private Map<String, ExceptionMessage> map = new HashMap<>();
+    private final MessageProducer messageProducer;
+
+    public GraylogService(MessageProducer messageProducer) {
+        this.messageProducer = messageProducer;
+    }
+
+    public void processLog(String json) {
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(json).getAsJsonObject();
+        JsonArray jsonArray = jsonObject.get("backlog").getAsJsonArray();
+        for (JsonElement jsonElement : jsonArray) {
+            JsonObject jsonObject1 = jsonElement.getAsJsonObject();
+            JsonObject fieldObject = jsonObject1.get("fields").getAsJsonObject();
+
+            try {
+                String title = "运行告警通知";
+                DingMsg dingMsg = processDingMsg(title, fieldObject);
+                if (dingMsg != null) {
+                    messageProducer.put(dingMsg);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private DingMsg processDingMsg(String title, JsonObject jsonObject) {
+        try {
+            String date = jsonObject.get("Date").getAsString();
+            LocalDateTime localDateTime = DateTimeConverter.localDateTime2(date);
+
+            String appName = jsonObject.get("ApplicationName").getAsString();
+            String localIp = jsonObject.get("LocalIp").getAsString();
+
+            String url = "";
+            JsonElement jsonElement = jsonObject.get("Url");
+            if (jsonElement != null) {
+                url = jsonElement.getAsString();
+            }
+
+            String exceptionType = jsonObject.get("ExceptionType").getAsString();
+            String exceptionMessage = jsonObject.get("ExceptionMessage").getAsString();
+            ExceptionMessage currentMessage = new ExceptionMessage(localDateTime, appName, localIp, url, exceptionType, exceptionMessage);
+
+            String mapKey = String.format("%s-%s-%s", appName, localIp, url);
+            ExceptionMessage lastMessage = map.get(mapKey);
+            if (lastMessage != null) {
+                LocalDateTime localDateTime1 = lastMessage.getDate();
+                long millis = Duration.between(localDateTime, localDateTime1).toMillis();
+                if (millis > 600_000) {
+                    map.put(mapKey, currentMessage);
+                    return getDingMsg(title, currentMessage);
+                }
+            } else {
+                map.put(mapKey, currentMessage);
+                return getDingMsg(title, currentMessage);
+            }
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+
+    private DingMsg getDingMsg(String title, ExceptionMessage message) {
+        String appName = message.getAppName();
+        String localIp = message.getLocalIp();
+        String head = String.format("# %s(%s) 运行时异常", appName, localIp);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(head).append(System.lineSeparator());
+        sb.append("### 时间: ").append(DateTimeConverter.format(message.getDate())).append(System.lineSeparator());
+        sb.append("### 请求 URL: ").append(message.getUrl()).append(System.lineSeparator());
+        sb.append("### 异常类型: ").append(message.getExceptionType()).append(System.lineSeparator());
+        sb.append("### 异常消息: ").append(message.getExceptionMessage()).append(System.lineSeparator());
+        return new DingMsg(title, sb.toString());
+    }
+}

+ 32 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/ThirdpartyController.java

@@ -0,0 +1,32 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog;
+
+import cn.reghao.jutil.jdk.result.WebResult;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author reghao
+ * @date 2025-01-16 17:53:56
+ */
+@Api(tags = "第三方服务接口")
+@RestController
+@RequestMapping("/api/thirdparty")
+public class ThirdpartyController {
+    private final GraylogService graylogService;
+
+    public ThirdpartyController(GraylogService graylogService) {
+        this.graylogService = graylogService;
+    }
+
+    @ApiOperation(value = "graylog webhook", notes = "N")
+    @PostMapping(value = "/graylog/notify", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String dingNotify(@RequestBody String json) {
+        graylogService.processLog(json);
+        return WebResult.success();
+    }
+}

+ 55 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/pc/MessageConsumer.java

@@ -0,0 +1,55 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog.pc;
+
+import cn.reghao.devops.mgr.admin.sys.service.notifier.ding.DingMsg;
+import cn.reghao.devops.mgr.mgr.thirdparty.graylog.DingNotifyWrapper;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.*;
+
+/**
+ * @author reghao
+ * @date 2024-12-22 21:19:57
+ */
+@Slf4j
+@Component
+public class MessageConsumer {
+    private final ScheduledExecutorService scheduler = ThreadPoolWrapper.scheduledThreadPool("graylog-notify", 3);
+    private final Object monitor;
+    private final MessageProducer messageProducer;
+    private final DingNotifyWrapper dingNotifyWrapper;
+
+    public MessageConsumer(MessageProducer messageProducer, DingNotifyWrapper dingNotifyWrapper) {
+        this.monitor = messageProducer.getMonitor();
+        this.messageProducer = messageProducer;
+        this.dingNotifyWrapper = dingNotifyWrapper;
+    }
+
+    @PostConstruct
+    public void run() {
+        // 每 3s 执行一次
+        scheduler.scheduleAtFixedRate(new ConsumeThread(), 0, 3, TimeUnit.SECONDS);
+        log.info("graylog-notify 模型启动...");
+    }
+
+    class ConsumeThread implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Object object = messageProducer.get();
+                if (object != null) {
+                    if (object instanceof DingMsg) {
+                        DingMsg dingMsg = (DingMsg) object;
+                        dingNotifyWrapper.sendNotify(dingMsg);
+                    } else {
+                        log.error("Object 类型未知");
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 40 - 0
mgr/src/main/java/cn/reghao/devops/mgr/mgr/thirdparty/graylog/pc/MessageProducer.java

@@ -0,0 +1,40 @@
+package cn.reghao.devops.mgr.mgr.thirdparty.graylog.pc;
+
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author reghao
+ * @date 2024-12-22 21:19:48
+ */
+@Component
+public class MessageProducer {
+    private final Object monitor;
+    private final LinkedBlockingQueue<Object> dataQueue;
+
+    public MessageProducer() {
+        this.monitor = new Object();
+        this.dataQueue = new LinkedBlockingQueue<>(10_000);
+    }
+
+    public Object getMonitor() {
+        return monitor;
+    }
+
+    public void put(Object object) throws InterruptedException {
+        dataQueue.put(object);
+        synchronized (monitor) {
+            // 通知 consumer 线程有数据可用
+            monitor.notify();
+        }
+    }
+
+    public Object get() throws InterruptedException {
+        return dataQueue.poll();
+    }
+
+    public int size() {
+        return dataQueue.size();
+    }
+}