소스 검색

update LogHandler

reghao 2 달 전
부모
커밋
63fb646124
1개의 변경된 파일128개의 추가작업 그리고 96개의 파일을 삭제
  1. 128 96
      web/src/main/java/cn/reghao/bnt/web/ws/handler/LogHandler.java

+ 128 - 96
web/src/main/java/cn/reghao/bnt/web/ws/handler/LogHandler.java

@@ -7,13 +7,14 @@ import cn.reghao.jutil.jdk.web.log.AppLog;
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
 
 import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -30,10 +31,9 @@ public class LogHandler implements WebSocketHandler {
     private final Map<String, String> appMap = new HashMap<>();
     private final Map<String, WebSocketSession> pushSessionMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pullSessionMap = new ConcurrentHashMap<>();
-    private final List<AppLog> appLogs = new ArrayList<>();
 
     public LogHandler() {
-        //threadPool.submit(new PushTask());
+        threadPool.submit(new PushTask());
     }
 
     public List<String> getApps() {
@@ -46,31 +46,6 @@ public class LogHandler implements WebSocketHandler {
                 .collect(Collectors.toList());
     }
 
-    @Override
-    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        String sessionId = webSocketSession.getId();
-        String query = webSocketSession.getUri().getQuery();
-        Map<String, String> map = parseParams(query);
-        String app = map.get("app");
-        String host = map.get("host");
-        if (app == null || host == null) {
-            if (webSocketSession.isOpen()) {
-                webSocketSession.close();
-            }
-        }
-
-        String appKey = String.format("%s@%s", app, host);
-        String path = webSocketSession.getUri().getPath();
-        if (path.equals(WebSocketPath.wsPushLog)) {
-            appMap.put(sessionId, appKey);
-            pushSessionMap.put(appKey, webSocketSession);
-        } else if (path.equals(WebSocketPath.wsPullLog)) {
-            pullSessionMap.put(appKey, webSocketSession);
-        }
-
-        log.info("WebSocket 建立连接");
-    }
-
     private void removeSession(WebSocketSession webSocketSession) {
         String sessionId = webSocketSession.getId();
         String appKey = appMap.get(sessionId);
@@ -86,20 +61,16 @@ public class LogHandler implements WebSocketHandler {
         }
     }
 
-    public void sendAccessLog(AccessLogInfo accessLogInfo) {
-        String app = "tnb";
-        String host = "localhost";
-        WebSocketSession pullSession = getPullSession(app, host);
-        if (pullSession != null) {
-            String jsonData = JsonConverter.objectToJson(accessLogInfo);
-            WebSocketMessage<String> message1 = new TextMessage(jsonData);
-            try {
-                pullSession.sendMessage(message1);
-            } catch (Exception e) {
-                log.error("websocket error {}", e.getMessage());
-                removeSession(pullSession);
+    private WebSocketSession getPullSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullSessionMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullSessionMap.get(key);
             }
         }
+
+        return null;
     }
 
     private Map<String, String> parseParams(String query) {
@@ -109,11 +80,34 @@ public class LogHandler implements WebSocketHandler {
             String[] arr = param.split("=");
             map.put(arr[0], arr[1]);
         }
-
         return map;
     }
 
-    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        String sessionId = webSocketSession.getId();
+        String query = webSocketSession.getUri().getQuery();
+        Map<String, String> map = parseParams(query);
+        String app = map.get("app");
+        String host = map.get("host");
+        if (app == null || host == null) {
+            if (webSocketSession.isOpen()) {
+                webSocketSession.close();
+            }
+        }
+
+        String appKey = String.format("%s@%s", app, host);
+        String path = webSocketSession.getUri().getPath();
+        if (path.equals(WebSocketPath.wsPushLog)) {
+            appMap.put(sessionId, appKey);
+            pushSessionMap.put(appKey, webSocketSession);
+        } else if (path.equals(WebSocketPath.wsPullLog)) {
+            pullSessionMap.put(appKey, webSocketSession);
+        }
+
+        log.info("WebSocket 建立连接");
+    }
+
     @Override
     public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
             throws IOException {
@@ -122,7 +116,7 @@ public class LogHandler implements WebSocketHandler {
             if (webSocketMessage instanceof TextMessage) {
                 String payload = (String) webSocketMessage.getPayload();
             } else if (webSocketMessage instanceof BinaryMessage) {
-                log.info("接收到 WebSocket 二进制消息");
+                //log.info("接收到 WebSocket 二进制消息");
                 BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
                 Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
                 if (object instanceof String) {
@@ -130,23 +124,10 @@ public class LogHandler implements WebSocketHandler {
                 } else if (object instanceof AppLog) {
                     AppLog appLog = (AppLog) object;
                     String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
-                    appLogs.add(appLog);
-                } /*else if (object instanceof NginxLog) {
+                } else if (object instanceof NginxLog) {
                     NginxLog nginxLog = (NginxLog) object;
-                    String date = nginxLog.getTimeIso8601();
-                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
-                            .withZone(ZoneId.of("UTC"));
-                    LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
-                    Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-                    Long key = timestamp;
-                    Integer count = ngxLogMap.get(key);
-                    if (count == null) {
-                        ngxLogMap.put(key, 1);
-                    } else {
-                        int count1 = ngxLogMap.get(key) + 1;
-                        ngxLogMap.put(key, count1);
-                    }
-                }*/
+                    processNginxLog(nginxLog);
+                }
             } else if (webSocketMessage instanceof PingMessage) {
                 log.info("接收到 WebSocket PingMessage");
             } else if (webSocketMessage instanceof PongMessage) {
@@ -161,7 +142,7 @@ public class LogHandler implements WebSocketHandler {
 
     @Override
     public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
-        log.error("WebSocket 数据传输错误");
+        log.error("WebSocket 数据传输错误: {}", throwable.getMessage());
         removeSession(webSocketSession);
     }
 
@@ -171,23 +152,40 @@ public class LogHandler implements WebSocketHandler {
         removeSession(webSocketSession);
     }
 
-    private WebSocketSession getPullSession(String app, String host) {
-        String suffix = String.format("%s@%s", app, host);
-        Set<String> keys = pullSessionMap.keySet();
-        for (String key : keys) {
-            if (key.endsWith(suffix)) {
-                return pullSessionMap.get(key);
-            }
-        }
-
-        return null;
-    }
-
     @Override
     public boolean supportsPartialMessages() {
         return false;
     }
 
+    String matchedMethod = "GET";
+    String matchedUrl = "/api";
+    Map<String, Map<Long, Integer>> map = new HashMap<>();
+    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00").withZone(ZoneId.of("UTC"));
+    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+    private void processNginxLog(NginxLog nginxLog) {
+        String todayStr = LocalDateTime.now().toLocalDate().format(formatter1);
+        String method = nginxLog.getRequestMethod();
+        String url = nginxLog.getUrl();
+        if (method.equals(matchedMethod) && url.startsWith(matchedUrl)) {
+            String date = nginxLog.getTimeIso8601();
+            LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
+            LocalDate localDate = localDateTime.toLocalDate();
+            String localDateStr = localDate.format(formatter1);
+            Map<Long, Integer> ngxLogMap = map.computeIfAbsent(localDateStr, k -> new TreeMap<>());
+
+            LocalTime localTime = localDateTime.toLocalTime();
+            Long timestampSecond = LocalDateTime.parse(date, formatter).toEpochSecond(ZoneOffset.of("+8"));
+            Integer count = ngxLogMap.get(timestampSecond);
+            if (count == null) {
+                ngxLogMap.put(timestampSecond, 1);
+            } else {
+                int count1 = ngxLogMap.get(timestampSecond) + 1;
+                ngxLogMap.put(timestampSecond, count1);
+            }
+        }
+    }
+
     /**
      * NginxLog 在前端 echarts 中的可视化
      *
@@ -199,40 +197,74 @@ public class LogHandler implements WebSocketHandler {
         @Override
         public void run() {
             while (!Thread.interrupted()) {
+                Set<String> dateSet = map.keySet();
                 try {
                     if (ngxLogMap.size() < 3) {
                         Thread.sleep(10_000);
                         continue;
                     }
 
-                    LocalDateTime localDateTime = LocalDateTime.now();
-                    Long baseKey = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-
-                    List<String> xList = new ArrayList<>();
-                    List<Integer> yList = new ArrayList<>();
-                    Set<Long> keys = new HashSet<>();
-                    for (Long key : ngxLogMap.keySet()) {
-                        if (key < baseKey) {
-                            xList.add(DateTimeConverter.format(key*1000).split(" ")[1]);
-                            yList.add(ngxLogMap.get(key));
-                            keys.add(key);
+                    List<Object> chartData = getChartData();
+                    TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(chartData));
+                    pullSessionMap.values().forEach(webSocketSession -> {
+                        try {
+                            webSocketSession.sendMessage(textMessage);
+                        } catch (IOException e) {
+                            e.printStackTrace();
                         }
-                    }
-
-                    keys.forEach(ngxLogMap::remove);
-                    keys.clear();
-                    List results = new ArrayList();
-                    results.add(xList.toArray());
-                    results.add(yList.toArray());
-
-                    WebSocketSession webSocketSession = getPullSession("admin-service", "172.16.90.200");
-                    TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(results));
-                    webSocketSession.sendMessage(textMessage);
-                    Thread.sleep(10_000);
+                    });
+                    Thread.sleep(3_000);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
         }
+
+        /**
+         * x 轴显示时分秒
+         * y 轴显示访问次数
+         *
+         * @param
+         * @return
+         * @date 2025-12-24 17:48:10
+         */
+        private List<Object> getChartData() {
+            Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
+            List<String> xList = new ArrayList<>();
+            List<Integer> yList = new ArrayList<>();
+            Set<Long> keys = new HashSet<>();
+            for (Long timestampSecond : ngxLogMap.keySet()) {
+                if (timestampSecond < timestampSecondCurrent) {
+                    // x 轴显示时分秒
+                    xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
+                    // y 轴显示访问次数
+                    yList.add(ngxLogMap.get(timestampSecond));
+                    keys.add(timestampSecond);
+                }
+            }
+
+            keys.forEach(ngxLogMap::remove);
+            keys.clear();
+            List<Object> results = new ArrayList<>();
+            results.add(xList.toArray());
+            results.add(yList.toArray());
+            return results;
+        }
+    }
+
+    public void sendAccessLog(AccessLogInfo accessLogInfo) {
+        String app = "tnb";
+        String host = "localhost";
+        WebSocketSession pullSession = getPullSession(app, host);
+        if (pullSession != null) {
+            String jsonData = JsonConverter.objectToJson(accessLogInfo);
+            WebSocketMessage<String> message1 = new TextMessage(jsonData);
+            try {
+                pullSession.sendMessage(message1);
+            } catch (Exception e) {
+                log.error("websocket error {}", e.getMessage());
+                removeSession(pullSession);
+            }
+        }
     }
 }