Kaynağa Gözat

WebSocketClient 中的 onText 方法中 使用 try-catch 处理解析消息时可能产生的异常

reghao 2 gün önce
ebeveyn
işleme
a8d53e0d60

+ 48 - 44
agent/src/main/java/cn/reghao/devops/agent/ws/WebSocketClient.java

@@ -118,53 +118,57 @@ public class WebSocketClient {
 
         @Override
         public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
-            String message = data.toString();
-            JsonNode root = JsonUtils.readTree(message);
-            // 1. 直接解析为枚举对象
-            AgentCommand cmd = AgentCommand.fromString(root.path("type").asText());
-            String taskId = root.path("taskId").asText();
-            if (cmd == AgentCommand.REGISTER_ACK) {
-                log.info("注册确认收到,开启定时心跳任务...");
-                // 只有收到 ACK,才启动心跳定时器
-                startHeartbeatScheduler();
-            } else if (cmd == AgentCommand.HEARTBEAT_ACK) {
-                // 更新最后成功交互时间
-                lastAckTime.set(System.currentTimeMillis());
-            } else {
-                // 使用虚拟线程或线程池处理任务,避免阻塞 WebSocket 读取线程
-                taskExecutor.execute(() -> {
-                    try {
-                        switch (cmd) {
-                            case DEPLOY_TASK -> {
-                                EvtAppDeploy evtAppDeploy = JsonUtils.convert(root.get("data"), EvtAppDeploy.class);
-                                deployJob.handleDeploy(webSocket, evtAppDeploy);
-                            }
-                            case APP_TASK -> {
-                                EvtAppStat evtAppStat = JsonUtils.convert(root.get("data"), EvtAppStat.class);
-                                appJob.handleApp(webSocket, evtAppStat);
+            try {
+                String message = data.toString();
+                JsonNode root = JsonUtils.readTree(message);
+                // 1. 直接解析为枚举对象
+                AgentCommand cmd = AgentCommand.fromString(root.path("type").asText());
+                String taskId = root.path("taskId").asText();
+                if (cmd == AgentCommand.REGISTER_ACK) {
+                    log.info("注册确认收到,开启定时心跳任务...");
+                    // 只有收到 ACK,才启动心跳定时器
+                    startHeartbeatScheduler();
+                } else if (cmd == AgentCommand.HEARTBEAT_ACK) {
+                    // 更新最后成功交互时间
+                    lastAckTime.set(System.currentTimeMillis());
+                } else {
+                    // 使用虚拟线程或线程池处理任务,避免阻塞 WebSocket 读取线程
+                    taskExecutor.execute(() -> {
+                        try {
+                            switch (cmd) {
+                                case DEPLOY_TASK -> {
+                                    EvtAppDeploy evtAppDeploy = JsonUtils.convert(root.get("data"), EvtAppDeploy.class);
+                                    deployJob.handleDeploy(webSocket, evtAppDeploy);
+                                }
+                                case APP_TASK -> {
+                                    EvtAppStat evtAppStat = JsonUtils.convert(root.get("data"), EvtAppStat.class);
+                                    appJob.handleApp(webSocket, evtAppStat);
+                                }
+                                case UNKNOWN -> {
+                                    log.error("收到未知指令类型: {}", message);
+                                }
                             }
-                            case UNKNOWN -> {
-                                log.error("收到未知指令类型: {}", message);
+                        } catch (Exception e) {
+                            String errorMsg = e.getMessage();
+                            AgentResponse<Object> resp = new AgentResponse<>();
+                            resp.setAgentId(Machine.ID);
+                            resp.setTaskId(taskId);
+                            resp.setType(AgentResponseType.TASK_ERROR);
+                            resp.setData(errorMsg);
+                            String json = JsonUtils.toJson(resp);
+                            if (webSocket != null && !webSocket.isOutputClosed()) {
+                                webSocket.sendText(json, true).whenComplete((ws, ex) -> {
+                                    if (ex != null) {
+                                        log.error("发送失败,这可能导致输出流被标记为关闭!", ex);
+                                        // 这里可以触发重连
+                                    }
+                                });
                             }
                         }
-                    } catch (Exception e) {
-                        String errorMsg = e.getMessage();
-                        AgentResponse<Object> resp = new AgentResponse<>();
-                        resp.setAgentId(Machine.ID);
-                        resp.setTaskId(taskId);
-                        resp.setType(AgentResponseType.TASK_ERROR);
-                        resp.setData(errorMsg);
-                        String json = JsonUtils.toJson(resp);
-                        if (webSocket != null && !webSocket.isOutputClosed()) {
-                            webSocket.sendText(json, true).whenComplete((ws, ex) -> {
-                                if (ex != null) {
-                                    log.error("发送失败,这可能导致输出流被标记为关闭!", ex);
-                                    // 这里可以触发重连
-                                }
-                            });
-                        }
-                    }
-                });
+                    });
+                }
+            } catch (Exception e) {
+                log.error("处理消息时发生异常: ", e);
             }
 
             // 立即返回,继续监听下一条消息