|
|
@@ -0,0 +1,148 @@
|
|
|
+package cn.reghao.devops.manager.ws.handler;
|
|
|
+
|
|
|
+import cn.reghao.devops.manager.log.service.LogService;
|
|
|
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
|
|
|
+import cn.reghao.jutil.jdk.result.AppLog;
|
|
|
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
|
|
|
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.web.socket.*;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author reghao
|
|
|
+ * @date 2022-03-11 16:45:52
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class LogHandler implements WebSocketHandler {
|
|
|
+ private LogService logService;
|
|
|
+ private final Map<String, WebSocketSession> pullSessions = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ public LogHandler(LogService logService) {
|
|
|
+ this.logService = logService;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
|
|
|
+ String path = webSocketSession.getUri().getPath();
|
|
|
+ if (path.equals("/ws/log/push")) {
|
|
|
+ String query = webSocketSession.getUri().getQuery();
|
|
|
+ Map<String, String> map = parseParams(query);
|
|
|
+ String app = map.get("app");
|
|
|
+ String host = map.get("host");
|
|
|
+ String key = String.format("%s-%s", app, host);
|
|
|
+ //pushSessions.put(key, webSocketSession);
|
|
|
+ } else {
|
|
|
+ String query = webSocketSession.getUri().getQuery();
|
|
|
+ Map<String, String> map = parseParams(query);
|
|
|
+ String app = map.get("app");
|
|
|
+ String host = map.get("host");
|
|
|
+ String key = String.format("%s-%s", app, host);
|
|
|
+ putPullSession(app, host, webSocketSession);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("WebSocket 建立连接");
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, String> parseParams(String query) {
|
|
|
+ String[] params = query.split("&");
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ for (String param : params) {
|
|
|
+ String[] arr = param.split("=");
|
|
|
+ map.put(arr[0], arr[1]);
|
|
|
+ }
|
|
|
+
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
|
|
|
+ throws IOException {
|
|
|
+ try {
|
|
|
+ if (webSocketMessage instanceof TextMessage) {
|
|
|
+ String payload = (String) webSocketMessage.getPayload();
|
|
|
+ } else if (webSocketMessage instanceof BinaryMessage) {
|
|
|
+ log.info("接收到 WebSocket 二进制消息");
|
|
|
+ BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
|
|
|
+ Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
|
|
|
+ if (object instanceof AppLog) {
|
|
|
+ AppLog appLog = (AppLog) object;
|
|
|
+ logService.saveAppLog(appLog);
|
|
|
+ String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
|
|
|
+
|
|
|
+ String app = appLog.getApp();
|
|
|
+ String host = appLog.getHost();
|
|
|
+ WebSocketSession pullSession = getPullSession(app, host);
|
|
|
+ if (pullSession != null) {
|
|
|
+ String jsonData = JsonConverter.objectToJson(appLog);
|
|
|
+ WebSocketMessage<String> message1 = new TextMessage(jsonData);
|
|
|
+ try {
|
|
|
+ pullSession.sendMessage(message1);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (webSocketMessage instanceof PingMessage) {
|
|
|
+ log.info("接收到 WebSocket PingMessage");
|
|
|
+ } else if (webSocketMessage instanceof PongMessage) {
|
|
|
+ log.info("接收到 WebSocket PongMessage");
|
|
|
+ } else {
|
|
|
+ log.error("接收到未知类型的 WebSocket 消息");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
|
|
|
+ log.error("WebSocket 数据传输错误");
|
|
|
+ String sessionId = webSocketSession.getId();
|
|
|
+ removePullSession(sessionId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
|
|
|
+ log.info("WebSocket 断开连接");
|
|
|
+ String sessionId = webSocketSession.getId();
|
|
|
+ removePullSession(sessionId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void putPullSession(String app, String host, WebSocketSession session) {
|
|
|
+ String sessionId = session.getId();
|
|
|
+ String key = String.format("%s-%s-%s", sessionId, app, host);
|
|
|
+ pullSessions.put(key, session);
|
|
|
+ }
|
|
|
+
|
|
|
+ private WebSocketSession getPullSession(String app, String host) {
|
|
|
+ String suffix = String.format("%s-%s", app, host);
|
|
|
+ Set<String> keys = pullSessions.keySet();
|
|
|
+ for (String key : keys) {
|
|
|
+ if (key.endsWith(suffix)) {
|
|
|
+ return pullSessions.get(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void removePullSession(String sessionId) {
|
|
|
+ Set<String> keys = pullSessions.keySet();
|
|
|
+ keys.forEach(key -> {
|
|
|
+ if (key.startsWith(sessionId)) {
|
|
|
+ pullSessions.remove(key);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean supportsPartialMessages() {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+}
|