Pārlūkot izejas kodu

search-service 添加 websocket, 将日志实时提供给监控端

reghao 7 mēneši atpakaļ
vecāks
revīzija
3ec955d8da

+ 4 - 0
search/search-service/pom.xml

@@ -44,6 +44,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>mysql</groupId>

+ 9 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/consumer/RabbitListeners.java

@@ -3,6 +3,7 @@ package cn.reghao.tnb.search.app.log.consumer;
 import cn.reghao.bnt.log.AppLog;
 import cn.reghao.bnt.log.GatewayLog;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.search.app.ws.handler.LogHandler;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.rabbit.annotation.Exchange;
 import org.springframework.amqp.rabbit.annotation.Queue;
@@ -18,6 +19,12 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class RabbitListeners {
+    private final LogHandler logHandler;
+
+    public RabbitListeners(LogHandler logHandler) {
+        this.logHandler = logHandler;
+    }
+
     @RabbitListener(bindings =@QueueBinding(
             value = @Queue(value = "tnb.log.gateway", durable = "true"),
             key = "tnb.log.gateway",
@@ -25,6 +32,7 @@ public class RabbitListeners {
     )
     public void accessLogConsumer(@Payload String msg) {
         GatewayLog gatewayLog = JsonConverter.jsonToObject(msg, GatewayLog.class);
+        logHandler.pushGatewayLog(gatewayLog);
         log.info("{} -> {}", gatewayLog.getRequestId(), gatewayLog.getRequestUrl());
     }
 
@@ -35,6 +43,7 @@ public class RabbitListeners {
     )
     public void runtimeLogConsumer(@Payload String msg) {
         AppLog appLog = JsonConverter.jsonToObject(msg, AppLog.class);
+        logHandler.pushAppLog(appLog);
         log.info("{}:{} -> {}", appLog.getApp(), appLog.getHost(), appLog.getMessage());
     }
 }

+ 13 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/Keys.java

@@ -0,0 +1,13 @@
+package cn.reghao.tnb.search.app.ws;
+
+/**
+ * @author reghao
+ * @date 2021-07-07 13:45:22
+ */
+public class Keys {
+    public static final String USER_UUID = "user-uuid";
+    // SSH 连接操作
+    public static final String OPS_CONNECT = "connect";
+    // SSH 命令操作
+    public static final String OPS_COMMAND = "command";
+}

+ 35 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/WebSocketConfig.java

@@ -0,0 +1,35 @@
+package cn.reghao.tnb.search.app.ws;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+/**
+ * @author reghao
+ * @date 2022-03-11 16:43:25
+ */
+@EnableWebSocket
+@Configuration
+public class WebSocketConfig implements WebSocketConfigurer {
+    private final WebSocketInterceptor webSocketInterceptor;
+    private final WebSocketHandler logHandler;
+
+    public WebSocketConfig(WebSocketInterceptor webSocketInterceptor,
+                           @Qualifier("logHandler") WebSocketHandler logHandler) {
+        this.webSocketInterceptor = webSocketInterceptor;
+        this.logHandler = logHandler;
+    }
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        registry.addHandler(logHandler, "/logws/pull/access")
+                .addInterceptors(webSocketInterceptor)
+                .setAllowedOrigins("*");
+        registry.addHandler(logHandler, "/logws/pull/runtime")
+                .addInterceptors(webSocketInterceptor)
+                .setAllowedOrigins("*");
+    }
+}

+ 44 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/WebSocketInterceptor.java

@@ -0,0 +1,44 @@
+package cn.reghao.tnb.search.app.ws;
+
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.http.server.ServletServerHttpRequest;
+import org.springframework.lang.Nullable;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2022-03-11 16:42:44
+ */
+@Component
+public class WebSocketInterceptor implements HandshakeInterceptor {
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+        if (request instanceof ServletServerHttpRequest) {
+            ServletServerHttpRequest httpRequest = (ServletServerHttpRequest) request;
+            String path = httpRequest.getURI().getPath();
+            if (path.startsWith("/ws")) {
+                String query = httpRequest.getURI().getQuery();
+                String token = query.replace("token=", "");
+                if (!token.isBlank()) {
+                    attributes.put(Keys.USER_UUID, token);
+                    return true;
+                }
+            } else {
+                return true;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                               WebSocketHandler wsHandler, @Nullable Exception exception) {
+    }
+}

+ 47 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/WsSender.java

@@ -0,0 +1,47 @@
+package cn.reghao.tnb.search.app.ws;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.BinaryMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2023-03-01 11:03:31
+ */
+@Component
+public class WsSender {
+    private final static Map<String, WebSocketSession> sessionMap = new HashMap<>();
+
+    public void addSession(String machineId, WebSocketSession session) {
+        sessionMap.put(machineId, session);
+    }
+
+    public void removeSession(String machineId) {
+        sessionMap.remove(machineId);
+    }
+
+    public void send(String dest, Object message) throws IOException {
+        WebSocketSession session = sessionMap.get(dest);
+        if (session != null) {
+            byte[] bytes = JdkSerializer.serialize(message);
+            BinaryMessage binaryMessage = new BinaryMessage(bytes);
+            session.sendMessage(binaryMessage);
+        }
+    }
+
+    public void sendMessage(String dest, Object message) throws IOException {
+        WebSocketSession session = sessionMap.get(dest);
+        if (session != null) {
+            String json = JsonConverter.objectToJson(message);
+            TextMessage textMessage = new TextMessage(json);
+            session.sendMessage(textMessage);
+        }
+    }
+}

+ 204 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/handler/LogHandler.java

@@ -0,0 +1,204 @@
+package cn.reghao.tnb.search.app.ws.handler;
+
+import cn.reghao.bnt.log.AppLog;
+import cn.reghao.bnt.log.GatewayLog;
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.*;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * @author reghao
+ * @date 2022-03-11 16:45:52
+ */
+@Slf4j
+@Component
+public class LogHandler implements WebSocketHandler {
+    private final Map<String, String> appMap = new HashMap<>();
+    private final Map<String, WebSocketSession> pullRuntimeMap = new ConcurrentHashMap<>();
+    private final Map<String, WebSocketSession> pullAccessMap = new ConcurrentHashMap<>();
+
+    public List<String> getApps() {
+        return new ArrayList<>(appMap.values());
+    }
+
+    public List<String> getApp(String app) {
+        return appMap.values().stream()
+                .filter(appKey -> appKey.startsWith(app))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        String sessionId = webSocketSession.getId();
+        String query = webSocketSession.getUri().getQuery();
+        Map<String, String> map = parseParams(query);
+        String app = map.get("app");
+        String host = map.get("host");
+        if (app == null || host == null) {
+            if (webSocketSession.isOpen()) {
+                webSocketSession.close();
+            }
+        }
+
+        String appKey = String.format("%s@%s", app, host);
+        String path = webSocketSession.getUri().getPath();
+        if (path.equals("/logws/pull/runtime")) {
+            appMap.put(sessionId, appKey);
+            pullRuntimeMap.put(appKey, webSocketSession);
+        } else if (path.equals("/logws/pull/access")) {
+            pullAccessMap.put(appKey, webSocketSession);
+        }
+
+        log.info("WebSocket 建立连接");
+    }
+
+    private void removeSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        String appKey = appMap.get(sessionId);
+        if (appKey == null) {
+            return;
+        }
+
+        String path = webSocketSession.getUri().getPath();
+        if (path.equals("/logws/pull/runtime")) {
+            pullRuntimeMap.remove(appKey);
+        } else if (path.equals("/logws/pull/access")) {
+            pullAccessMap.remove(appKey);
+        }
+    }
+
+    public void pushAppLog(AppLog appLog) {
+        String app = appLog.getApp();
+        String host = appLog.getHost();
+        WebSocketSession pullSession = getPullRuntimeSession(app, host);
+        if (pullSession != null) {
+            String jsonData = JsonConverter.objectToJson(appLog);
+            WebSocketMessage<String> message1 = new TextMessage(jsonData);
+            try {
+                pullSession.sendMessage(message1);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void pushGatewayLog(GatewayLog gatewayLog) {
+        WebSocketSession pullSession = getPullAccessSession("tnb", "localhost");
+        if (pullSession != null) {
+            String jsonData = JsonConverter.objectToJson(gatewayLog);
+            WebSocketMessage<String> message1 = new TextMessage(jsonData);
+            try {
+                pullSession.sendMessage(message1);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private Map<String, String> parseParams(String query) {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<>();
+        for (String param : params) {
+            String[] arr = param.split("=");
+            map.put(arr[0], arr[1]);
+        }
+
+        return map;
+    }
+
+    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    @Override
+    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
+            throws IOException {
+        try {
+            String appKey = appMap.get(webSocketSession.getId());
+            if (webSocketMessage instanceof TextMessage) {
+                String payload = (String) webSocketMessage.getPayload();
+            } else if (webSocketMessage instanceof BinaryMessage) {
+                BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
+                Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
+                if (object instanceof String) {
+                    //log.info("{} -> {}", appKey, object);
+                } else if (object instanceof AppLog) {
+                    AppLog appLog = (AppLog) object;
+                    String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
+                    //appLog.setDateTimeStr(dateTimeStr);
+                    //pushAppLog(appLog);
+                    //runtimeLogService.addAppLog(appLog);
+                } /*else if (object instanceof NginxLog) {
+                    NginxLog nginxLog = (NginxLog) object;
+                    String date = nginxLog.getTimeIso8601();
+                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
+                            .withZone(ZoneId.of("UTC"));
+                    LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
+                    Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
+                    Long key = timestamp;
+                    Integer count = ngxLogMap.get(key);
+                    if (count == null) {
+                        ngxLogMap.put(key, 1);
+                    } else {
+                        int count1 = ngxLogMap.get(key) + 1;
+                        ngxLogMap.put(key, count1);
+                    }
+                }*/
+            } else if (webSocketMessage instanceof PingMessage) {
+                log.info("接收到 WebSocket PingMessage");
+            } else if (webSocketMessage instanceof PongMessage) {
+                log.info("接收到 WebSocket PongMessage");
+            } else {
+                log.error("接收到未知类型的 WebSocket 消息");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
+        log.error("WebSocket 数据传输错误");
+        removeSession(webSocketSession);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
+        log.info("WebSocket 断开连接");
+        removeSession(webSocketSession);
+    }
+
+    private WebSocketSession getPullRuntimeSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullRuntimeMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullRuntimeMap.get(key);
+            }
+        }
+
+        return null;
+    }
+
+    private WebSocketSession getPullAccessSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullAccessMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullAccessMap.get(key);
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+}