Quellcode durchsuchen

添加 FrontendHandler, 通过 websocket 来管理 agent
节点上的 docker

reghao vor 3 Monaten
Ursprung
Commit
6db68ea243

+ 13 - 2
web/src/main/java/cn/reghao/bnt/web/devops/machine/controller/MachineController.java

@@ -9,6 +9,7 @@ import cn.reghao.bnt.web.devops.machine.model.vo.MachineDetail;
 import cn.reghao.bnt.web.devops.machine.service.MachineQuery;
 import cn.reghao.bnt.web.devops.machine.service.MachineService;
 import cn.reghao.bnt.web.util.SelectOption;
+import cn.reghao.bnt.web.ws.WsSender;
 import cn.reghao.jutil.jdk.web.db.PageList;
 import cn.reghao.jutil.jdk.web.result.Result;
 import cn.reghao.jutil.web.WebResult;
@@ -31,15 +32,18 @@ import java.util.List;
 @RestController
 @RequestMapping("/api/devops/machine/host")
 public class MachineController {
+    private int pageSize = 10;
     private final MachineQuery machineQuery;
     private final MachineService machineService;
     private final MachineSys machineSys;
-    private int pageSize = 10;
+    private final WsSender wsSender;
 
-    public MachineController(MachineQuery machineQuery, MachineService machineService, MachineSys machineSys) {
+    public MachineController(MachineQuery machineQuery, MachineService machineService, MachineSys machineSys,
+                             WsSender wsSender) {
         this.machineQuery = machineQuery;
         this.machineService = machineService;
         this.machineSys = machineSys;
+        this.wsSender = wsSender;
     }
 
     @Operation(summary = "机器节点页面", description = "N")
@@ -103,4 +107,11 @@ public class MachineController {
         List<NetworkCard> networkCardList = machineSys.getNetworkCardList();
         return WebResult.success();
     }
+
+    @Operation(summary = "获取在线的 agent websocket 会话", description = "N")
+    @GetMapping(value = "/ws", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getWebSocketSessions(@RequestParam("env") String env) {
+        List<String> machineList = wsSender.getOnlineMachines(env);
+        return WebResult.success(machineList);
+    }
 }

+ 25 - 0
web/src/main/java/cn/reghao/bnt/web/ws/WsSender.java

@@ -1,5 +1,7 @@
 package cn.reghao.bnt.web.ws;
 
+import cn.reghao.bnt.web.devops.machine.model.po.MachineHost;
+import cn.reghao.bnt.web.devops.machine.service.MachineQuery;
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.BinaryMessage;
@@ -7,7 +9,10 @@ import org.springframework.web.socket.WebSocketSession;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * @author reghao
@@ -16,6 +21,11 @@ import java.util.Map;
 @Component
 public class WsSender {
     private final static Map<String, WebSocketSession> sessionMap = new HashMap<>();
+    private final MachineQuery machineQuery;
+
+    public WsSender(MachineQuery machineQuery) {
+        this.machineQuery = machineQuery;
+    }
 
     public void addSession(String machineId, WebSocketSession session) {
         sessionMap.put(machineId, session);
@@ -31,4 +41,19 @@ public class WsSender {
         BinaryMessage binaryMessage = new BinaryMessage(bytes);
         session.sendMessage(binaryMessage);
     }
+
+    public List<String> getOnlineMachines(String env) {
+        return sessionMap.keySet().stream()
+                .map(machineId -> {
+                    MachineHost machineHost = machineQuery.getMachineHost(machineId);
+                    if (machineHost == null) {
+                        return null;
+                    }
+
+                    String env1 = machineHost.getEnv();
+                    return env1.equals(env) ? machineHost.getMachineIpv4() : null;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
 }

+ 9 - 1
web/src/main/java/cn/reghao/bnt/web/ws/config/WebSocketConfig.java

@@ -1,5 +1,6 @@
 package cn.reghao.bnt.web.ws.config;
 
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.web.socket.WebSocketHandler;
 import org.springframework.web.socket.config.annotation.EnableWebSocket;
@@ -17,13 +18,16 @@ public class WebSocketConfig implements WebSocketConfigurer {
     private final WebSocketHandler sshWebSocketHandler;
     private final WebSocketHandler agentWebSocketHandler;
     private final WebSocketHandler logHandler;
+    private final WebSocketHandler frontendHander;
 
     public WebSocketConfig(WebSocketInterceptor webSocketInterceptor, WebSocketHandler sshWebSocketHandler, 
-                           WebSocketHandler agentWebSocketHandler, WebSocketHandler logHandler) {
+                           WebSocketHandler agentWebSocketHandler, WebSocketHandler logHandler,
+                           @Qualifier("frontendHander") WebSocketHandler frontendHander) {
         this.webSocketInterceptor = webSocketInterceptor;
         this.sshWebSocketHandler = sshWebSocketHandler;
         this.agentWebSocketHandler = agentWebSocketHandler;
         this.logHandler = logHandler;
+        this.frontendHander = frontendHander;
     }
 
     @Override
@@ -43,5 +47,9 @@ public class WebSocketConfig implements WebSocketConfigurer {
         registry.addHandler(logHandler, WebSocketPath.wsPushLog)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
+
+        registry.addHandler(frontendHander, WebSocketPath.wsFrontend)
+                .addInterceptors(webSocketInterceptor)
+                .setAllowedOrigins("*");
     }
 }

+ 1 - 0
web/src/main/java/cn/reghao/bnt/web/ws/config/WebSocketPath.java

@@ -9,4 +9,5 @@ public class WebSocketPath {
     public static String wsSsh = "/bgws/ssh";
     public static String wsPullLog = "/bgws/log/pull";
     public static String wsPushLog = "/bgws/log/push";
+    public static String wsFrontend = "/bgws/frontend";
 }

+ 86 - 0
web/src/main/java/cn/reghao/bnt/web/ws/handler/FrontendHandler.java

@@ -0,0 +1,86 @@
+package cn.reghao.bnt.web.ws.handler;
+
+import cn.reghao.bnt.web.ws.WsSender;
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @author reghao
+ * @date 2025-12-16 17:08:19
+ */
+@Slf4j
+@Component("frontendHander")
+public class FrontendHandler implements WebSocketHandler {
+    private WsSender wsSender;
+
+    public FrontendHandler(WsSender wsSender) {
+        this.wsSender = wsSender;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        String sessionId = webSocketSession.getId();
+        String query = webSocketSession.getUri().getQuery();
+        log.info("WebSocket 建立连接");
+    }
+
+    private void removeSession(WebSocketSession webSocketSession) {
+    }
+
+    @Override
+    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
+            throws IOException {
+        try {
+            if (webSocketMessage instanceof TextMessage) {
+                log.info("接收到 WebSocket 文本消息");
+                String jsonPayload = (String) webSocketMessage.getPayload();
+                JsonObject jsonObject = JsonConverter.jsonToJsonElement(jsonPayload).getAsJsonObject();
+                String machineId = jsonObject.get("machineId").getAsString();
+
+                wsSender.send(machineId, new Object());
+                List<String> list = new ArrayList<>();
+                list.add("aaa");
+                list.add("bbb");
+                Map<String, Object> map = new HashMap<>();
+                map.put("type", "1");
+                map.put("result", list);
+                TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(map));
+                webSocketSession.sendMessage(textMessage);
+            } else if (webSocketMessage instanceof BinaryMessage) {
+                log.info("接收到 WebSocket 二进制消息");
+            } else if (webSocketMessage instanceof PingMessage) {
+                log.info("接收到 WebSocket PingMessage");
+            } else if (webSocketMessage instanceof PongMessage) {
+                log.info("接收到 WebSocket PongMessage");
+            } else {
+                log.error("接收到未知类型的 WebSocket 消息");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
+        log.error("WebSocket 数据传输错误");
+        removeSession(webSocketSession);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
+        log.info("WebSocket 断开连接");
+        removeSession(webSocketSession);
+    }
+
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+}