Bläddra i källkod

dagent 中使用 mqtt 异步客户端处理连接和断开后重连的流程
使用一个 LogCache 来保存 mqtt 连接断开后发布失败的日志, 连接重连后第一时间发布积压的日志

reghao 4 år sedan
förälder
incheckning
ee2c6c75f5
17 ändrade filer med 367 tillägg och 105 borttagningar
  1. 16 1
      common/src/main/java/cn/reghao/autodop/common/log/Appenders.java
  2. 38 0
      common/src/main/java/cn/reghao/autodop/common/log/LogCache.java
  3. 14 1
      common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java
  4. 149 0
      common/src/main/java/cn/reghao/autodop/common/mqtt/AsyncMqttClient.java
  5. 5 6
      common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java
  6. 6 2
      common/src/main/java/cn/reghao/autodop/common/msg/MsgQueue.java
  7. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/NodeEventPubClazz.java
  8. 3 3
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeEventClazzPubImpl.java
  9. 59 0
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DagentConnActionListener.java
  10. 5 24
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DagentTopicListener.java
  11. 17 40
      dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java
  12. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppDeployer.java
  13. 9 19
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterTopicListener.java
  14. 30 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/SysTopicListener.java
  15. 3 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodeEventPubClazzDispatcher.java
  16. 0 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultDispatcher.java
  17. 11 6
      dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java

+ 16 - 1
common/src/main/java/cn/reghao/autodop/common/log/Appenders.java

@@ -6,6 +6,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.ConsoleAppender;
 import ch.qos.logback.core.FileAppender;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import org.slf4j.LoggerFactory;
 
@@ -14,7 +15,21 @@ import org.slf4j.LoggerFactory;
  * @date 2021-06-11 13:31:20
  */
 public class Appenders {
-    private static LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+    private static final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+
+    public static Appender<ILoggingEvent> mqttAppender1(String appId, AsyncMqttClient asyncMqttClient) {
+        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
+        layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
+        layoutEncoder.setContext(loggerContext);
+        layoutEncoder.start();
+
+        MqttAppender mqttAppender = new MqttAppender(appId, null);
+        mqttAppender.setAsyncMqttClient(asyncMqttClient);
+
+        mqttAppender.setContext(loggerContext);
+        mqttAppender.start();
+        return mqttAppender;
+    }
 
     public static Appender<ILoggingEvent> mqttAppender(String appId, DefaultMqttClient mqttClient) {
         PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();

+ 38 - 0
common/src/main/java/cn/reghao/autodop/common/log/LogCache.java

@@ -0,0 +1,38 @@
+package cn.reghao.autodop.common.log;
+
+import cn.reghao.autodop.common.msg.Message;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MQTT 连接断开时, MqttAppender 不可用, 此时将日志消息缓存起来, 等连接可用时全部发送出去
+ *
+ * logs 作为共享变量, 会出现多个线程同时访问, 应避免出现数据冲突
+ *
+ * @author reghao
+ * @date 2021-10-29 14:23:59
+ */
+public class LogCache {
+    private static final Object lock = new Object();
+    private static Map<Long, Message> logs = new HashMap<>();
+
+    public static void put(Message message) {
+        synchronized (lock) {
+            logs.put(System.currentTimeMillis(), message);
+        }
+    }
+
+    public static Map<Long, Message> getAll() {
+        synchronized (lock) {
+            // TODO 需要按时间对日志进行排序
+            return logs;
+        }
+    }
+
+    public static void clear() {
+        synchronized (lock) {
+            logs.clear();
+        }
+    }
+}

+ 14 - 1
common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java

@@ -3,6 +3,7 @@ package cn.reghao.autodop.common.log;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.UnsynchronizedAppenderBase;
 import cn.reghao.autodop.common.machine.Machine;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
@@ -13,6 +14,8 @@ import cn.reghao.jdkutil.serializer.JsonConverter;
 import org.eclipse.paho.client.mqttv3.MqttException;
 
 /**
+ * 日志写到 MQTT broker
+ *
  * @author reghao
  * @date 2021-06-08 19:37:21
  */
@@ -20,6 +23,7 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
     private final String appId;
     private final String topic;
     private final DefaultMqttClient mqttClient;
+    private AsyncMqttClient asyncMqttClient;
 
     public MqttAppender(String appId, DefaultMqttClient mqttClient) {
         this.appId = appId;
@@ -27,6 +31,10 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
         this.mqttClient = mqttClient;
     }
 
+    public void setAsyncMqttClient(AsyncMqttClient asyncMqttClient) {
+        this.asyncMqttClient = asyncMqttClient;
+    }
+
     @Override
     public void start() {
         super.start();
@@ -44,9 +52,14 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
         PubMsg pubMsg = PubMsg.pubMsg(NodeEventPubClazz.class.getSimpleName(), NodeEventPubClazz.log.name(), jsonPayload);
         Message message = Message.pubMsg(pubMsg);
         try {
-            mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+            if (asyncMqttClient != null) {
+                asyncMqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+            } else {
+                mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+            }
         } catch (MqttException e) {
             e.printStackTrace();
+            LogCache.put(message);
         }
     }
 

+ 149 - 0
common/src/main/java/cn/reghao/autodop/common/mqtt/AsyncMqttClient.java

@@ -0,0 +1,149 @@
+package cn.reghao.autodop.common.mqtt;
+
+import cn.reghao.autodop.common.log.LogCache;
+import cn.reghao.autodop.common.machine.Machine;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.MsgQueue;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.constant.NodeEventPubClazz;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2021-10-28 20:52:17
+ */
+@Slf4j
+@Component
+public class AsyncMqttClient {
+    private final MosquittoProperties properties;
+    private final MqttAsyncClient mqttClient;
+    private final Map<String, IMqttMessageListener> msgListeners = new HashMap<>();
+
+    public AsyncMqttClient(MosquittoProperties properties) throws MqttException {
+        this.properties = properties;
+        String clientId = properties.getClientId() + Machine.ID;
+        mqttClient = new MqttAsyncClient(properties.getBroker(), clientId, new MemoryPersistence());
+    }
+
+    private MqttConnectOptions connectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setUserName(properties.getUsername());
+        options.setPassword(properties.getPassword().toCharArray());
+        // 在服务端保存状态
+        options.setCleanSession(false);
+        // 自动重连
+        options.setAutomaticReconnect(true);
+        // 避免 Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,635,408,462,211 错误
+        options.setConnectionTimeout(0);
+        // 设置 last will message
+        options.setWill(MsgQueue.dmasterTopic(), lastWillMessage(), 1, false);
+        return options;
+    }
+
+    private byte[] lastWillMessage() {
+        String jsonPayload = String.format("{\"machineId\":\"%s\"}", Machine.ID);
+        PubMsg pubMsg = PubMsg.pubMsg(NodeEventPubClazz.class.getSimpleName(),
+                NodeEventPubClazz.unavail.name(), jsonPayload);
+        Message message = Message.pubMsg(pubMsg);
+        return JsonConverter.objectToJson(message).getBytes(StandardCharsets.UTF_8);
+    }
+
+    public void add(String topic, IMqttMessageListener messageListener) {
+        msgListeners.putIfAbsent(topic, messageListener);
+    }
+
+    public Map<String, IMqttMessageListener> getMsgListeners() {
+        return msgListeners;
+    }
+
+    public void connect(IMqttActionListener mqttActionListener) throws MqttException {
+        mqttClient.setCallback(new ConnectionCallback());
+        mqttClient.connect(connectOptions(), null, mqttActionListener);
+    }
+
+    public boolean isConnected() {
+        return mqttClient.isConnected();
+    }
+
+    /**
+     * qos=0 的消息不能离线
+     *
+     * @param
+     * @return
+     * @date 2021-06-16 下午8:40
+     */
+    public void pub(String topic, int qos, String payload) throws MqttException {
+        MqttMessage message = new MqttMessage();
+        message.setQos(qos);
+        //message.setRetained(true);
+        message.setPayload(payload.getBytes());
+        mqttClient.publish(topic, message);
+    }
+
+    public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
+        mqttClient.subscribe(topic, 2, messageListener);
+    }
+
+    /**
+     * MQTT 连接回调
+     *
+     * @param
+     * @return
+     * @date 2021-10-28 下午8:31
+     */
+    class ConnectionCallback implements MqttCallbackExtended {
+        @Override
+        public void connectComplete(boolean reconnect, String serverUri) {
+            if (reconnect) {
+                log.info("MQTT 重新连接成功");
+                log.info("发布之前未发送成功的日志");
+                LogCache.getAll().forEach((timestamp, message) -> {
+                    try {
+                        pub(MsgQueue.dmasterTopic(), 1, JsonConverter.objectToJson(message));
+                    } catch (MqttException e) {
+                        e.printStackTrace();
+                        LogCache.put(message);
+                    }
+                });
+
+                log.info("重新订阅 topic");
+                // 重连完成后自动订阅 topic
+                msgListeners.forEach((topic, listener) -> {
+                    try {
+                        sub(topic, listener);
+                    } catch (MqttException e) {
+                        log.error("重新订阅 {} 失败,原因: {}", topic, e.getMessage());
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void connectionLost(Throwable cause) {
+            log.error("MQTT 连接断开, 准备重新连接 {}", cause.getMessage());
+            try {
+                mqttClient.reconnect();
+            } catch (MqttException e) {
+                e.printStackTrace();
+                log.error("MQTT 重连失败 {}", cause.getMessage());
+            }
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws Exception {
+            System.out.println();
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+        }
+    }
+}

+ 5 - 6
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -25,7 +25,7 @@ public class DefaultMqttClient implements AutoCloseable {
     // 记录 RPC 调用, size 为 0 时表示所有 pub 消息的结果都已 sub
     private final Map<String, RpcMsg> rpcRecorder = new HashMap<>();
     private final Map<String, IMqttMessageListener> subMap = new HashMap<>();
-    private final MqttCallback callback = new MqttClientCallback();
+    private final MqttCallback connCallback = new ConnectionCallback();
 
     public DefaultMqttClient(MosquittoProperties properties) throws MqttException, IOException {
         this.properties = properties;
@@ -71,7 +71,7 @@ public class DefaultMqttClient implements AutoCloseable {
     // TODO 处理系统启动时没有成功连接的情况
     private void connect() throws MqttException {
         if (!client.isConnected()) {
-            client.setCallback(callback);
+            client.setCallback(connCallback);
             client.connect(connectOptions());
         }
     }
@@ -86,10 +86,9 @@ public class DefaultMqttClient implements AutoCloseable {
      * @return
      * @date 2021-06-16 下午8:39
      */
-    public void sub(String topic, MqttCallback callback) throws MqttException {
+    public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
         connect();
-        client.setCallback(callback);
-        client.subscribe(topic, 1);
+        client.subscribe(topic, 1, messageListener);
     }
 
     private void resub(String topic, IMqttMessageListener messageListener) throws MqttException {
@@ -130,7 +129,7 @@ public class DefaultMqttClient implements AutoCloseable {
         rpcRecorder.put(message.getMsgId(), message.getRpcMsg());
     }
 
-    class MqttClientCallback implements MqttCallbackExtended {
+    class ConnectionCallback implements MqttCallbackExtended {
         @Override
         public void connectComplete(boolean reconnect, String serverUri) {
             // 重连完成后自动订阅 topic

+ 6 - 2
common/src/main/java/cn/reghao/autodop/common/msg/MsgQueue.java

@@ -8,10 +8,14 @@ import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
  */
 public class MsgQueue {
     public static String dmasterTopic() {
-        return String.format("sub.%s", AppId.dmaster.name());
+        return AppId.dmaster.name();
     }
 
     public static String dagentTopic(String nodeId) {
-        return String.format("rpc.%s.%s", nodeId, AppId.dagent.name());
+        return String.format("%s/%s", AppId.dagent.name(), nodeId);
+    }
+
+    public static String sysTopic() {
+        return "SYS/broker/clients/#";
     }
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/NodeEventPubClazz.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.msg.pub.constant;
  * @date 2021-09-15 09:19:14
  */
 public enum NodeEventPubClazz {
-    start, heartbeat, shutdown, log
+    start, heartbeat, shutdown, unavail, log
 }

+ 3 - 3
dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeEventClazzPubImpl.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.machine;
 
 import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public class NodeEventClazzPubImpl {
     private final ScheduledExecutorService scheduler;
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
     private final Machine machine;
     private final String pubClazz;
     
-    public NodeEventClazzPubImpl(DefaultMqttClient mqttClient, Machine machine) {
+    public NodeEventClazzPubImpl(AsyncMqttClient mqttClient, Machine machine) {
         this.scheduler = ThreadPoolWrapper.scheduledThreadPool("heartbeat", 1);
         this.mqttClient = mqttClient;
         this.machine = machine;

+ 59 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DagentConnActionListener.java

@@ -0,0 +1,59 @@
+package cn.reghao.autodop.dagent.mqttsub;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import cn.reghao.autodop.common.log.Appenders;
+import cn.reghao.autodop.common.log.LoggerConfig;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
+import cn.reghao.autodop.common.util.ExceptionUtil;
+import cn.reghao.autodop.dagent.machine.NodeEventClazzPubImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttActionListener;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import java.util.List;
+
+/**
+ * MQTT 建立连接时的状态监听器
+ *
+ * @author reghao
+ * @date 2021-10-29 16:36:49
+ */
+@Slf4j
+public class DagentConnActionListener implements IMqttActionListener {
+    private final AsyncMqttClient mqttClient;
+    private final NodeEventClazzPubImpl nodeEventClazzPub;
+
+    public DagentConnActionListener(AsyncMqttClient mqttClient, NodeEventClazzPubImpl nodeEventClazzPub) {
+        this.mqttClient = mqttClient;
+        this.nodeEventClazzPub = nodeEventClazzPub;
+    }
+
+    @Override
+    public void onSuccess(IMqttToken asyncActionToken) {
+        Appender<ILoggingEvent> appender = Appenders.mqttAppender1(AppId.dagent.name(), mqttClient);
+        LoggerConfig.initLogger(List.of(appender));
+        nodeEventClazzPub.nodeStart();
+        nodeEventClazzPub.nodeHeartbeat();
+
+        log.info("MQTT 连接建立成功,开始订阅 topic");
+        mqttClient.getMsgListeners().forEach((topic, msgListener) -> {
+            try {
+                // 确保能订阅到离线消息的设置:
+                // - qos=1
+                // - clientId 确定且唯一,不能使用随机的 UUID
+                // - CleanSession=false
+                mqttClient.sub(topic, msgListener);
+            } catch (MqttException e) {
+                log.error("topic {} 订阅失败, 原因 {}", topic, ExceptionUtil.errorMsg(e));
+            }
+        });
+    }
+
+    @Override
+    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+        log.error("MQTT 连接建立失败,原因 {}", exception.getMessage());
+    }
+}

+ 5 - 24
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/MqttRpcListener.java → dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DagentTopicListener.java

@@ -1,18 +1,14 @@
 package cn.reghao.autodop.dagent.mqttsub;
 
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.MsgType;
 import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.RpcClazz;
 import cn.reghao.autodop.dagent.mqttsub.impl.AppRpcClazzDispatcher;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.*;
 import org.springframework.stereotype.Component;
 
 import java.lang.management.ManagementFactory;
@@ -25,12 +21,12 @@ import java.lang.management.ManagementFactory;
  */
 @Slf4j
 @Component
-public class MqttRpcListener implements MqttCallback {
+public class DagentTopicListener implements IMqttMessageListener {
     private final long startTime;
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
     private final AppRpcClazzDispatcher appRpcClazzDispatcher;
 
-    public MqttRpcListener(DefaultMqttClient mqttClient, AppRpcClazzDispatcher appRpcClazzDispatcher) {
+    public DagentTopicListener(AsyncMqttClient mqttClient, AppRpcClazzDispatcher appRpcClazzDispatcher) {
         this.startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
         this.mqttClient = mqttClient;
         this.appRpcClazzDispatcher = appRpcClazzDispatcher;
@@ -65,19 +61,4 @@ public class MqttRpcListener implements MqttCallback {
         String topic = MsgQueue.dmasterTopic();
         mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
     }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        if (!token.isComplete()) {
-            log.error("token -> {}", token);
-        } else {
-            // 由于使用 MQTT 记录日志,而每条消息发送成功后都会调用此方法,因此会形成循环引用,导致超高 CPU 使用率
-            // log.info("token -> {}", token);
-        }
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("MQTT connection lost -> {}", cause.getMessage());
-    }
 }

+ 17 - 40
dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java

@@ -1,15 +1,11 @@
 package cn.reghao.autodop.dagent.spring;
 
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.Appender;
-import cn.reghao.autodop.common.log.Appenders;
 import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
 import cn.reghao.autodop.dagent.machine.NodeEventClazzPubImpl;
-import cn.reghao.autodop.dagent.mqttsub.MqttRpcListener;
-import cn.reghao.autodop.common.log.LoggerConfig;
+import cn.reghao.autodop.dagent.mqttsub.DagentConnActionListener;
+import cn.reghao.autodop.dagent.mqttsub.DagentTopicListener;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.DisposableBean;
@@ -17,9 +13,6 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.context.annotation.Configuration;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * TODO 无法处理 kill -9 和机器断电的情况
  *
@@ -29,28 +22,25 @@ import java.util.List;
 @Slf4j
 @Configuration
 public class DagentLifecycle implements ApplicationRunner, DisposableBean {
-    private final DefaultMqttClient mqttClient;
-    private final MqttRpcListener rpcListener;
-    private final NodeEventClazzPubImpl nodeClazzPub;
+    private final AsyncMqttClient mqttClient;
+    private final DagentTopicListener dagentTopicListener;
+    private final NodeEventClazzPubImpl nodeEventClazzPub;
 
-    public DagentLifecycle(DefaultMqttClient mqttClient, MqttRpcListener rpcListener, Machine machine) {
+    public DagentLifecycle(AsyncMqttClient mqttClient, DagentTopicListener dagentTopicListener, Machine machine) {
         this.mqttClient = mqttClient;
-        this.rpcListener = rpcListener;
-        this.nodeClazzPub = new NodeEventClazzPubImpl(mqttClient, machine);
-        initLogger();
-    }
-
-    private void initLogger() {
-        List<Appender<ILoggingEvent>> appenders = new ArrayList<>();
-        appenders.add(Appenders.mqttAppender(AppId.dagent.name(), mqttClient));
-        LoggerConfig.initLogger(appenders);
+        this.dagentTopicListener = dagentTopicListener;
+        this.nodeEventClazzPub = new NodeEventClazzPubImpl(mqttClient, machine);
     }
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
-        subTopic();
-        pubDagentStart();
-        pubDagentHeartbeat();
+        startMqtt();
+    }
+
+    private void startMqtt() throws MqttException {
+        mqttClient.add(MsgQueue.dagentTopic(Machine.ID), dagentTopicListener);
+        DagentConnActionListener connActionListener = new DagentConnActionListener(mqttClient, nodeEventClazzPub);
+        mqttClient.connect(connActionListener);
     }
 
     @Override
@@ -58,21 +48,8 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
         pubDagentShutdown();
     }
 
-    private void subTopic() throws MqttException {
-        String topic = MsgQueue.dagentTopic(Machine.ID);
-        mqttClient.sub(topic, rpcListener);
-    }
-
-    private void pubDagentStart() {
-        nodeClazzPub.nodeStart();
-    }
-
-    private void pubDagentHeartbeat() {
-        nodeClazzPub.nodeHeartbeat();
-    }
-
     private void pubDagentShutdown() {
-        nodeClazzPub.nodeShutdown();
+        nodeEventClazzPub.nodeShutdown();
         log.info("Dagent 停止");
     }
 }

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppDeployer.java

@@ -1,11 +1,11 @@
 package cn.reghao.autodop.dmaster.app.service;
 
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.dmaster.app.db.crud.AppDeployingCrud;
 import cn.reghao.autodop.dmaster.app.db.crud.log.DeployLogCrud;
 import cn.reghao.autodop.dmaster.app.db.query.AppDeployingQuery;

+ 9 - 19
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MqttSubListener.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterTopicListener.java

@@ -11,8 +11,7 @@ import cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult.AppRpcClazzResultDispatc
 import cn.reghao.autodop.dmaster.mqttsub.impl.pub.NodeEventPubClazzDispatcher;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
@@ -24,17 +23,17 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class MqttSubListener implements MqttCallback {
+public class DmasterTopicListener implements IMqttMessageListener {
     private final NodeEventPubClazzDispatcher nodeEventPubClazzDispatcher;
     private final AppRpcClazzResultDispatcher appRpcClazzResultDispatcher;
-    private final DefaultMqttClient defaultMqttClient;
+    private final DefaultMqttClient mqttClient;
 
-    public MqttSubListener(NodeEventPubClazzDispatcher nodeEventPubClazzDispatcher,
-                           AppRpcClazzResultDispatcher appRpcClazzResultDispatcher,
-                           DefaultMqttClient defaultMqttClient) {
+    public DmasterTopicListener(NodeEventPubClazzDispatcher nodeEventPubClazzDispatcher,
+                                AppRpcClazzResultDispatcher appRpcClazzResultDispatcher,
+                                DefaultMqttClient mqttClient) {
         this.nodeEventPubClazzDispatcher = nodeEventPubClazzDispatcher;
         this.appRpcClazzResultDispatcher = appRpcClazzResultDispatcher;
-        this.defaultMqttClient = defaultMqttClient;
+        this.mqttClient = mqttClient;
     }
 
     @Override
@@ -76,7 +75,7 @@ public class MqttSubListener implements MqttCallback {
     }
 
     private void dispatchRpcResult(String msgId, RpcMsg rpcMsg) {
-        RpcMsg rpcMsg1 = defaultMqttClient.getRecord(msgId);
+        RpcMsg rpcMsg1 = mqttClient.getRecord(msgId);
         if (rpcMsg1 == null) {
             log.error("RPC 调用结果 {} 找不到 RPC 调用参数", rpcMsg);
             return;
@@ -90,15 +89,6 @@ public class MqttSubListener implements MqttCallback {
             default:
                 ;
         }
-        defaultMqttClient.removeRecord(msgId);
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("MQTT connection lost -> {}", cause.getMessage());
+        mqttClient.removeRecord(msgId);
     }
 }

+ 30 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/SysTopicListener.java

@@ -0,0 +1,30 @@
+package cn.reghao.autodop.dmaster.mqttsub;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+/**
+ * 处理 sub.dmaster topic 中的消息
+ *
+ * @author reghao
+ * @date 2021-05-24 09:24:03
+ */
+@Slf4j
+@Component
+public class SysTopicListener implements IMqttMessageListener {
+    @Override
+    public void messageArrived(String topic, MqttMessage message) {
+        try {
+            if (message.isDuplicate()) {
+                log.info("重新发送的消息");
+            }
+
+            String msg = message.toString();
+            log.info("mosquitto 系统消息 {}", msg);
+        } catch (Exception e) {
+            log.error("MQTT message exception -> {}", e.getMessage());
+        }
+    }
+}

+ 3 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodeEventPubClazzDispatcher.java

@@ -40,6 +40,9 @@ public class NodeEventPubClazzDispatcher implements ClazzDispatcher<PubMsg, Void
                 MachineStat machineStat1 = JsonConverter.jsonToObject(jsonPayload, MachineStat.class);
                 nodePubClazz.shutdown(machineStat1);
                 break;
+            case unavail:
+                log.error("节点无故断开 {}", jsonPayload);
+                break;
             case log:
                 NodeLogDTO nodeLogDTO = JsonConverter.jsonToObject(jsonPayload, NodeLogDTO.class);
                 nodePubClazz.log(nodeLogDTO);

+ 0 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultDispatcher.java

@@ -5,7 +5,6 @@ import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
 import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.jdkutil.result.ResultStatus;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;

+ 11 - 6
dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java

@@ -5,12 +5,14 @@ import ch.qos.logback.core.Appender;
 import cn.reghao.autodop.common.log.LoggerConfig;
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.dmaster.app.model.constant.EnvList;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.BuildDir;
 import cn.reghao.autodop.dmaster.app.service.config.BuildDirService;
-import cn.reghao.autodop.dmaster.mqttsub.MqttSubListener;
+import cn.reghao.autodop.dmaster.mqttsub.DmasterTopicListener;
 import cn.reghao.autodop.dmaster.machine.db.crud.NodeLogCrud;
+import cn.reghao.autodop.dmaster.mqttsub.SysTopicListener;
 import cn.reghao.autodop.dmaster.sys.db.crud.SysEnvCrud;
 import cn.reghao.autodop.dmaster.sys.db.query.SysEnvQuery;
 import cn.reghao.autodop.dmaster.sys.model.SysEnv;
@@ -32,19 +34,22 @@ import java.util.List;
 @Slf4j
 @Component
 public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
-    private final MqttSubListener messageListener;
+    private final DmasterTopicListener messageListener;
     private final DefaultMqttClient mqttClient;
     private final BuildDirService buildDirService;
     private SysEnvQuery sysEnvQuery;
     private final SysEnvCrud sysEnvCrud;
+    private SysTopicListener sysTopicSubListener;
 
-    public DmasterLifecycle(NodeLogCrud nodeLogCrud, MqttSubListener messageListener, BuildDirService buildDirService,
-                            DefaultMqttClient mqttClient, SysEnvQuery sysEnvQuery, SysEnvCrud sysEnvCrud) {
+    public DmasterLifecycle(NodeLogCrud nodeLogCrud, DmasterTopicListener messageListener, BuildDirService buildDirService,
+                            DefaultMqttClient mqttClient, SysEnvQuery sysEnvQuery, SysEnvCrud sysEnvCrud,
+                            SysTopicListener sysTopicSubListener) {
         this.messageListener = messageListener;
         this.buildDirService = buildDirService;
         this.mqttClient = mqttClient;
         this.sysEnvQuery = sysEnvQuery;
         this.sysEnvCrud = sysEnvCrud;
+        this.sysTopicSubListener = sysTopicSubListener;
         initLogger(nodeLogCrud);
         initSysEnv();
     }
@@ -68,8 +73,8 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     private void subTopic() throws MqttException {
-        String topic = MsgQueue.dmasterTopic();
-        mqttClient.sub(topic, messageListener);
+        mqttClient.sub(MsgQueue.dmasterTopic(), messageListener);
+        mqttClient.sub(MsgQueue.sysTopic(), sysTopicSubListener);
     }
 
     private void initBuildDir() {