|
|
@@ -0,0 +1,205 @@
|
|
|
+package cn.reghao.devops.agent.ws;
|
|
|
+
|
|
|
+import cn.reghao.devops.agent.config.AgentConfig;
|
|
|
+import cn.reghao.devops.agent.service.AppJob;
|
|
|
+import cn.reghao.devops.agent.service.DeployJob;
|
|
|
+import cn.reghao.devops.common.msg.AgentCommand;
|
|
|
+import cn.reghao.devops.common.msg.AgentResponse;
|
|
|
+import cn.reghao.devops.common.msg.AgentResponseType;
|
|
|
+import cn.reghao.devops.common.msg.constant.WsClientType;
|
|
|
+import cn.reghao.devops.common.msg.event.EvtAppStat;
|
|
|
+import cn.reghao.devops.common.util.Machine;
|
|
|
+import cn.reghao.devops.common.msg.event.EvtAgentStart;
|
|
|
+import cn.reghao.devops.common.msg.event.EvtAppDeploy;
|
|
|
+import cn.reghao.devops.common.util.JsonUtils;
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+
|
|
|
+import java.net.URI;
|
|
|
+import java.net.http.HttpClient;
|
|
|
+import java.net.http.WebSocket;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author reghao
|
|
|
+ * @date 2026-03-10 09:11:05
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class WebSocketClient {
|
|
|
+ private final String serverUri;
|
|
|
+ private WebSocket webSocket;
|
|
|
+ private final HttpClient httpClient = HttpClient.newHttpClient();
|
|
|
+ private int reconnectAttempts = 0;
|
|
|
+ // 调度器:负责发心跳 + 负责检查超时
|
|
|
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
|
|
+ // 处理接收到的任务
|
|
|
+ private final ExecutorService taskExecutor = Executors.newFixedThreadPool(
|
|
|
+ Runtime.getRuntime().availableProcessors() * 2
|
|
|
+ );
|
|
|
+
|
|
|
+ private final DeployJob deployJob = new DeployJob();
|
|
|
+ private final AppJob appJob = new AppJob();
|
|
|
+
|
|
|
+ public WebSocketClient(AgentConfig agentConfig) {
|
|
|
+ this.serverUri = String.format("%s/bgws/agent?type=%s&machineId=%s", agentConfig.getServerUrl(), WsClientType.AGENT.name(), Machine.ID);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void start() {
|
|
|
+ connect();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void connect() {
|
|
|
+ httpClient.newWebSocketBuilder()
|
|
|
+ .buildAsync(URI.create(serverUri), new AgentWebSocketListener())
|
|
|
+ .whenComplete((ws, ex) -> {
|
|
|
+ if (ex != null) {
|
|
|
+ reconnectAttempts++;
|
|
|
+ long delay = calculateDelay(reconnectAttempts);
|
|
|
+ log.error("连接失败 (第 {} 次), {} 秒后重连...", reconnectAttempts, delay);
|
|
|
+
|
|
|
+ // 指数退避重连
|
|
|
+ scheduler.schedule(this::connect, delay, TimeUnit.SECONDS);
|
|
|
+ } else {
|
|
|
+ // 连接成功,重置计数器
|
|
|
+ this.webSocket = ws;
|
|
|
+ this.reconnectAttempts = 0;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private long calculateDelay(int attempts) {
|
|
|
+ // 最大等待 60 秒
|
|
|
+ return Math.min((long) Math.pow(2, attempts), 60);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ if (webSocket != null) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class AgentWebSocketListener implements WebSocket.Listener {
|
|
|
+ // 核心:记录最后一次收到 HEARTBEAT_ACK 的时间
|
|
|
+ private final AtomicLong lastAckTime = new AtomicLong(System.currentTimeMillis());
|
|
|
+ private final long HEARTBEAT_INTERVAL_MS = 10_000; // 10秒发一次心跳
|
|
|
+ private final long ACK_TIMEOUT_MS = 30_000; // 3 个周期没收到就判定为断连
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onOpen(WebSocket webSocket) {
|
|
|
+ // 关键操作:请求下一条消息,否则 Listener 不会收到后续数据
|
|
|
+ webSocket.request(1);
|
|
|
+ // 1. 初始化时间戳(防止刚连上就触发超时)
|
|
|
+ lastAckTime.set(System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 异步发送注册包,不阻塞连接线程
|
|
|
+ taskExecutor.execute(() -> {
|
|
|
+ EvtAgentStart evtAgentStart = Machine.getEvtAgentStart();
|
|
|
+ AgentResponse<EvtAgentStart> reg = new AgentResponse<>();
|
|
|
+ reg.setType(AgentResponseType.REGISTER);
|
|
|
+ reg.setAgentId(Machine.ID);
|
|
|
+ reg.setData(evtAgentStart);
|
|
|
+
|
|
|
+ webSocket.sendText(JsonUtils.toJson(reg), true);
|
|
|
+ log.info("Agent 已注册至 Manager");
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
|
|
|
+ log.info("连接已关闭,准备重连...");
|
|
|
+ scheduler.schedule(WebSocketClient.this::connect, 5, TimeUnit.SECONDS);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onError(WebSocket webSocket, Throwable error) {
|
|
|
+ log.error("WebSocket 异常: {}", error.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
|
|
|
+ String message = data.toString();
|
|
|
+ JsonNode root = JsonUtils.readTree(message);
|
|
|
+ // 1. 直接解析为枚举对象
|
|
|
+ AgentCommand cmd = AgentCommand.fromString(root.path("type").asText());
|
|
|
+ String taskId = root.path("taskId").asText();
|
|
|
+ if (cmd == AgentCommand.REGISTER_ACK) {
|
|
|
+ log.info("注册确认收到,开启定时心跳任务...");
|
|
|
+ // 只有收到 ACK,才启动心跳定时器
|
|
|
+ startHeartbeatScheduler();
|
|
|
+ } else if (cmd == AgentCommand.HEARTBEAT_ACK) {
|
|
|
+ // 更新最后成功交互时间
|
|
|
+ lastAckTime.set(System.currentTimeMillis());
|
|
|
+ } else {
|
|
|
+ // 使用虚拟线程或线程池处理任务,避免阻塞 WebSocket 读取线程
|
|
|
+ taskExecutor.execute(() -> {
|
|
|
+ try {
|
|
|
+ switch (cmd) {
|
|
|
+ case DEPLOY_TASK -> {
|
|
|
+ EvtAppDeploy evtAppDeploy = JsonUtils.convert(root.get("data"), EvtAppDeploy.class);
|
|
|
+ deployJob.handleDeploy(webSocket, evtAppDeploy);
|
|
|
+ }
|
|
|
+ case APP_TASK -> {
|
|
|
+ EvtAppStat evtAppStat = JsonUtils.convert(root.get("data"), EvtAppStat.class);
|
|
|
+ appJob.handleApp(webSocket, evtAppStat);
|
|
|
+ }
|
|
|
+ case UNKNOWN -> {
|
|
|
+ log.error("收到未知指令类型: {}", message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errorMsg = e.getMessage();
|
|
|
+ AgentResponse<Object> resp = new AgentResponse<>();
|
|
|
+ resp.setAgentId(Machine.ID);
|
|
|
+ resp.setTaskId(taskId);
|
|
|
+ resp.setType(AgentResponseType.TASK_ERROR);
|
|
|
+ resp.setData(errorMsg);
|
|
|
+ String json = JsonUtils.toJson(resp);
|
|
|
+ if (webSocket != null && !webSocket.isOutputClosed()) {
|
|
|
+ webSocket.sendText(json, true).whenComplete((ws, ex) -> {
|
|
|
+ if (ex != null) {
|
|
|
+ log.error("发送失败,这可能导致输出流被标记为关闭!", ex);
|
|
|
+ // 这里可以触发重连
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // 立即返回,继续监听下一条消息
|
|
|
+ return WebSocket.Listener.super.onText(webSocket, data, last);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startHeartbeatScheduler() {
|
|
|
+ // 启动定时心跳, 每 60 秒发送一次心跳
|
|
|
+ scheduler.scheduleAtFixedRate(this::sendHeartbeat, 10, 60, TimeUnit.SECONDS);
|
|
|
+ // 启动“看门狗”检测线程:每10秒检查一次是否超时
|
|
|
+ //scheduler.scheduleAtFixedRate(this::checkAckTimeout, 10, 10, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 看门狗检测逻辑
|
|
|
+ */
|
|
|
+ private void checkAckTimeout() {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ if (now - lastAckTime.get() > ACK_TIMEOUT_MS) {
|
|
|
+ log.error("🚨 检测到心跳 ACK 超时!Manager 可能已假死或链路中断。");
|
|
|
+
|
|
|
+ // 主动关闭当前连接,触发 Listener 的 onClose 逻辑
|
|
|
+ if (webSocket != null) {
|
|
|
+ // 发送关闭帧并断开,触发重连流程
|
|
|
+ webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "Heartbeat ACK timeout");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendHeartbeat() {
|
|
|
+ if (webSocket != null && !webSocket.isOutputClosed()) {
|
|
|
+ AgentResponse<String> agentResponse = AgentResponse.ok(AgentResponseType.HEARTBEAT, "", "PING");
|
|
|
+ String msg = JsonUtils.toJson(agentResponse);
|
|
|
+ webSocket.sendText(msg, true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|