Просмотр исходного кода

完成 MQTT 掉线重连,重新订阅 topic 功能

reghao 4 лет назад
Родитель
Сommit
e0625b4e17
27 измененных файлов с 201 добавлено и 381 удалено
  1. 4 8
      common/src/main/java/cn/reghao/autodop/common/log/Appenders.java
  2. 0 41
      common/src/main/java/cn/reghao/autodop/common/log/InternalMqttClient.java
  3. 8 31
      common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java
  4. 123 0
      common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java
  5. 2 2
      common/src/main/java/cn/reghao/autodop/common/mqtt/MosquittoProperties.java
  6. 0 71
      common/src/main/java/cn/reghao/autodop/common/mqtt/MqttPub.java
  7. 0 49
      common/src/main/java/cn/reghao/autodop/common/mqtt/MqttSub.java
  8. 0 24
      common/src/main/java/cn/reghao/autodop/common/mqtt/PubActionListener.java
  9. 0 28
      common/src/main/java/cn/reghao/autodop/common/mqtt/SubMsgCallback.java
  10. 3 3
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/HeartbeatJob.java
  11. 9 7
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/MachineScheduler.java
  12. 2 13
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java
  13. 5 7
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java
  14. 1 29
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java
  15. 5 7
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java
  16. 15 22
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java
  17. 1 1
      dagent/src/main/resources/application-dev.yml
  18. 1 1
      dagent/src/main/resources/application-prod.yml
  19. 1 1
      dagent/src/main/resources/application-test.yml
  20. 6 6
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java
  21. 5 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  22. 2 13
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java
  23. 5 6
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/lifecycle/DmasterLifecycle.java
  24. 1 1
      dmaster/src/main/resources/application-dev.yml
  25. 1 1
      dmaster/src/main/resources/application-prod.yml
  26. 1 1
      dmaster/src/main/resources/application-test.yml
  27. 0 3
      dmaster/src/test/java/cn/reghao/autodop/common/mqtt/MqttTest.java

+ 4 - 8
common/src/main/java/cn/reghao/autodop/common/log/Appenders.java

@@ -6,12 +6,9 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.ConsoleAppender;
 import ch.qos.logback.core.FileAppender;
-import cn.reghao.autodop.common.mqtt.MosquittoProperties;
-import cn.reghao.autodop.common.utils.MachineIdentity;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * @author reghao
  * @date 2021-06-11 13:31:20
@@ -19,15 +16,14 @@ import java.io.IOException;
 public class Appenders {
     private LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
 
-    public Appender<ILoggingEvent> mqttAppender(MachineIdentity machineIdentity,
-                                                MosquittoProperties properties,
-                                                String appId) throws IOException {
+    public Appender<ILoggingEvent> mqttAppender(String machineId, String machineIpv4, String appId,
+                                                DefaultMqttClient mqttClient) {
         PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
         layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
         layoutEncoder.setContext(loggerContext);
         layoutEncoder.start();
 
-        MqttAppender mqttAppender = new MqttAppender(machineIdentity, properties, appId);
+        MqttAppender mqttAppender = new MqttAppender(machineId, machineIpv4, appId, mqttClient);
         mqttAppender.setContext(loggerContext);
         mqttAppender.start();
         return mqttAppender;

+ 0 - 41
common/src/main/java/cn/reghao/autodop/common/log/InternalMqttClient.java

@@ -1,41 +0,0 @@
-package cn.reghao.autodop.common.log;
-
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
-
-/**
- * @author reghao
- * @date 2021-06-08 19:38:34
- */
-public class InternalMqttClient extends MqttClient implements MqttCallback {
-    private MqttClientPersistence clientPersistence;
-
-    public InternalMqttClient(String serverURI, String clientId) throws MqttException {
-        //this(serverURI, clientId, new MqttDefaultFilePersistence());
-        this(serverURI, clientId, new MemoryPersistence());
-    }
-
-    public InternalMqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
-        super(serverURI, clientId, persistence);
-        clientPersistence = persistence;
-    }
-
-    @Override
-    public void disconnect() throws MqttPersistenceException {
-        clientPersistence.close();
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        // TODO reconnect
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-    }
-}

+ 8 - 31
common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java

@@ -5,59 +5,36 @@ import ch.qos.logback.core.UnsynchronizedAppenderBase;
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.DagentOps;
-import cn.reghao.autodop.common.mqtt.MosquittoProperties;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.DateTimeConverter;
-import cn.reghao.autodop.common.utils.MachineIdentity;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
-import java.io.IOException;
-import java.util.UUID;
-
 /**
  * @author reghao
  * @date 2021-06-08 19:37:21
  */
 public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
-    private MosquittoProperties properties;
-    private String clientId = UUID.randomUUID().toString();
-    private InternalMqttClient client;
-    private String appId;
     private String machineId;
     private String machineIpv4;
+    private String appId;
+    private DefaultMqttClient mqttClient;
 
-    public MqttAppender(MachineIdentity machineIdentity, MosquittoProperties properties, String appId)
-            throws IOException {
-        this.properties = properties;
+    public MqttAppender(String machineId, String machineIpv4, String appId, DefaultMqttClient mqttClient) {
+        this.machineId = machineId;
+        this.machineIpv4 = machineIpv4;
         this.appId = appId;
-        this.machineId = machineIdentity.id();
-        this.machineIpv4 = machineIdentity.ipv4();
+        this.mqttClient = mqttClient;
     }
 
     @Override
     public void start() {
         super.start();
-        try {
-            client = new InternalMqttClient(properties.getBroker(), clientId);
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setUserName(properties.getUsername());
-            options.setPassword(properties.getPassword().toCharArray());
-            client.setCallback(client);
-            client.connect(options);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
     }
 
     @Override
     public void stop() {
-        try {
-            client.disconnect();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
         super.stop();
     }
 
@@ -71,7 +48,7 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
         message.setQos(0);
         message.setPayload(payload.getBytes());
         try {
-            client.publish(properties.getTopic(), message);
+            mqttClient.pub("dmaster", 1, payload);
         } catch (MqttException e) {
             e.printStackTrace();
         }

+ 123 - 0
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -0,0 +1,123 @@
+package cn.reghao.autodop.common.mqtt;
+
+import cn.reghao.autodop.common.utils.MachineIdentity;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2021-05-21 08:27:41
+ */
+@Slf4j
+@Component
+public class DefaultMqttClient implements AutoCloseable {
+    private MosquittoProperties properties;
+    private MqttClient client;
+    private Map<String, IMqttMessageListener> subMap = new HashMap<>();
+    private MqttCallback callback = new MqttClientCallback();
+
+    public DefaultMqttClient(MachineIdentity id, MosquittoProperties properties) throws MqttException, IOException {
+        this.properties = properties;
+        String clientId = properties.getClientId() + id.id();
+        this.client = new MqttClient(properties.getBroker(), clientId, new MemoryPersistence());
+    }
+
+    private MqttConnectOptions connectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setUserName(properties.getUsername());
+        options.setPassword(properties.getPassword().toCharArray());
+        options.setCleanSession(false);
+        // 自动重连
+        options.setAutomaticReconnect(true);
+        return options;
+    }
+
+    @Override
+    public void close() throws MqttException {
+        if (client.isConnected()) {
+            client.disconnect();
+        }
+        client.close();
+    }
+
+    public boolean isConnected() {
+        return client.isConnected();
+    }
+
+    private void connect() throws MqttException {
+        if (!client.isConnected()) {
+            client.setCallback(callback);
+            client.connect(connectOptions());
+        }
+    }
+
+    /**
+     * 确保能订阅到离线消息的设置:
+     * - qos=1
+     * - clientId 确定,不能使用随机的 UUID
+     * - CleanSession=false
+     *
+     * @param
+     * @return
+     * @date 2021-06-16 下午8:39
+     */
+    public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
+        connect();
+        client.subscribe(topic, 1, messageListener);
+        subMap.putIfAbsent(topic, messageListener);
+    }
+
+    private void resub(String topic, IMqttMessageListener messageListener) throws MqttException {
+        client.subscribe(topic, 1, messageListener);
+        log.info("重新订阅 {} 成功", topic);
+    }
+
+    /**
+     * qos=0 的消息不能离线
+     *
+     * @param
+     * @return
+     * @date 2021-06-16 下午8:40
+     */
+    public void pub(String topic, int qos, String payload) throws MqttException {
+        MqttMessage message = new MqttMessage();
+        message.setQos(qos);
+        message.setPayload(payload.getBytes());
+
+        connect();
+        client.publish(topic, message);
+    }
+
+    class MqttClientCallback implements MqttCallbackExtended {
+        @Override
+        public void connectComplete(boolean reconnect, String serverUri) {
+            // 重连完成后自动订阅 topic
+            subMap.forEach((topic, listener) -> {
+                try {
+                    resub(topic, listener);
+                } catch (MqttException e) {
+                    log.error("重新订阅 {} 失败,原因: {}", topic, e.getMessage());
+                }
+            });
+        }
+
+        @Override
+        public void connectionLost(Throwable cause) {
+            log.error("MQTT connection lost {}", cause.getMessage());
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws Exception {
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+        }
+    }
+}

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/mqtt/MosquittoProperties.java

@@ -19,6 +19,6 @@ public class MosquittoProperties {
     private String username;
     @Value("${mosquitto.password}")
     private String password;
-    @Value("${mosquitto.topic}")
-    private String topic;
+    @Value("${mosquitto.clientId}")
+    private String clientId;
 }

+ 0 - 71
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttPub.java

@@ -1,71 +0,0 @@
-package cn.reghao.autodop.common.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-/**
- * @author reghao
- * @date 2021-05-21 08:27:41
- */
-@Slf4j
-@Component
-public class MqttPub implements AutoCloseable {
-    private MosquittoProperties properties;
-    private MqttClient client;
-
-    public MqttPub(MosquittoProperties properties) throws MqttException {
-        this.properties = properties;
-        this.client = new MqttClient(properties.getBroker(), UUID.randomUUID().toString(), new MemoryPersistence());
-    }
-
-    private MqttConnectOptions connectOptions() {
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setConnectionTimeout(10);
-        options.setKeepAliveInterval(20);
-        options.setUserName(properties.getUsername());
-        options.setPassword(properties.getPassword().toCharArray());
-        return options;
-    }
-
-    @Override
-    public void close() throws MqttException {
-        client.close();
-    }
-
-    /**
-     * qos=0 的消息不能离线
-     *
-     * @param
-     * @return
-     * @date 2021-06-16 下午8:40
-     */
-    public void pub(String topic, int qos, String payload) throws MqttException {
-        client.connect(connectOptions());
-        MqttTopic mqttTopic = client.getTopic(topic);
-
-        MqttMessage message = new MqttMessage();
-        message.setQos(qos);
-        message.setPayload(payload.getBytes());
-        MqttDeliveryToken deliveryToken = mqttTopic.publish(message);
-        if (client.isConnected()) {
-            client.disconnect();
-        }
-    }
-
-    public void pub(String topic, String payload) throws MqttException {
-        client.connect(connectOptions());
-        MqttTopic mqttTopic = client.getTopic(topic);
-
-        MqttMessage message = new MqttMessage();
-        message.setQos(1);
-        message.setPayload(payload.getBytes());
-        MqttDeliveryToken deliveryToken = mqttTopic.publish(message);
-        if (client.isConnected()) {
-            client.disconnect();
-        }
-    }
-}

+ 0 - 49
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttSub.java

@@ -1,49 +0,0 @@
-package cn.reghao.autodop.common.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * @author reghao
- * @date 2021-05-21 08:27:41
- */
-@Slf4j
-public class MqttSub implements AutoCloseable {
-    private MosquittoProperties properties;
-    private MqttClient client;
-
-    public MqttSub(String clientId, MosquittoProperties properties) throws MqttException {
-        this.properties = properties;
-        this.client = new MqttClient(properties.getBroker(), clientId, new MemoryPersistence());
-    }
-
-    private MqttConnectOptions connectOptions() {
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(false);
-        options.setUserName(properties.getUsername());
-        options.setPassword(properties.getPassword().toCharArray());
-        return options;
-    }
-
-    @Override
-    public void close() throws MqttException {
-        client.close();
-    }
-
-    /**
-     * 确保能订阅到离线消息的设置:
-     * - qos=1
-     * - clientId 确定,不能使用随机的 UUID
-     * - CleanSession=false
-     *
-     * @param
-     * @return
-     * @date 2021-06-16 下午8:39
-     */
-    public void sub(String topic, MqttCallback callback) throws MqttException {
-        client.connect(connectOptions());
-        client.setCallback(callback);
-        client.subscribe(topic, 1);
-    }
-}

+ 0 - 24
common/src/main/java/cn/reghao/autodop/common/mqtt/PubActionListener.java

@@ -1,24 +0,0 @@
-package cn.reghao.autodop.common.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttActionListener;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-
-/**
- * 发布消息时的回调
- *
- * @author reghao
- * @date 2021-06-08 19:16:46
- */
-@Slf4j
-public class PubActionListener implements IMqttActionListener {
-    @Override
-    public void onSuccess(IMqttToken pubToken) {
-        int msgId = pubToken.getMessageId();
-    }
-
-    @Override
-    public void onFailure(IMqttToken pubToken, Throwable exception) {
-        log.info("MQTT 消息发布失败...");
-    }
-}

+ 0 - 28
common/src/main/java/cn/reghao/autodop/common/mqtt/SubMsgCallback.java

@@ -1,28 +0,0 @@
-package cn.reghao.autodop.common.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-/**
- * @author reghao
- * @date 2021-05-24 09:24:03
- */
-@Slf4j
-public class SubMsgCallback implements MqttCallback {
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("conn lost {}", cause.getMessage());
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) {
-        log.info("{}: {} -> {}", topic, message.getId(), message.toString());
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        log.info("token -> {}", token);
-    }
-}

+ 3 - 3
dagent/src/main/java/cn/reghao/autodop/dagent/machine/HeartbeatJob.java

@@ -4,7 +4,7 @@ import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.message.ops.DagentOps;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dagent.utils.DagentLifecycle;
 import lombok.extern.slf4j.Slf4j;
@@ -24,14 +24,14 @@ public class HeartbeatJob implements Job {
         JobKey jobKey = context.getJobDetail().getKey();
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
 
-        MqttPub mqttPub = (MqttPub) jobDataMap.get("mqttPub");
+        DefaultMqttClient mqttClient = (DefaultMqttClient) jobDataMap.get("mqttClient");
         Machine machine = (Machine) jobDataMap.get("machine");
 
         String payload = JsonConverter.objectToJson(machine.heartbeat());
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.dagentType.name(),
                 DagentOps.dagentHeartbeat.name(), payload);
         try {
-            mqttPub.pub("dmaster", 0, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.pub("dmaster", 0, JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {
             e.printStackTrace();
         }

+ 9 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/machine/MachineScheduler.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.machine;
 
 import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import lombok.extern.slf4j.Slf4j;
 import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
@@ -15,18 +15,18 @@ import org.springframework.stereotype.Component;
 @Component
 public class MachineScheduler {
     private Scheduler scheduler;
-    private MqttPub mqttPub;
+    private DefaultMqttClient mqttClient;
     private Machine machine;
 
-    public MachineScheduler(MqttPub mqttPub, Machine machine) throws SchedulerException {
+    public MachineScheduler(DefaultMqttClient mqttClient, Machine machine) throws SchedulerException {
         this.scheduler = StdSchedulerFactory.getDefaultScheduler();
-        this.mqttPub = mqttPub;
+        this.mqttClient = mqttClient;
         this.machine = machine;
     }
 
     public void add(Class<? extends Job> clazz, String jobId, String cronExp) throws SchedulerException {
         JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put("mqttPub", mqttPub);
+        jobDataMap.put("mqttClient", mqttClient);
         jobDataMap.put("machine", machine);
 
         JobDetail jobDetail = JobBuilder.newJob(clazz)
@@ -41,8 +41,10 @@ public class MachineScheduler {
                 .withSchedule(CronScheduleBuilder.cronSchedule(cronExp));
         CronTrigger cronTrigger = triggerBuilder.build();
 
-        scheduler.scheduleJob(jobDetail, cronTrigger);
-        log.info("添加新定时任务 {}...", jobId);
+        if (scheduler.getJobDetail(jobDetail.getKey()) == null) {
+            scheduler.scheduleJob(jobDetail, cronTrigger);
+            log.info("添加新定时任务 {}...", jobId);
+        }
     }
 
     public void remove() {

+ 2 - 13
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java

@@ -7,8 +7,7 @@ import cn.reghao.autodop.dagent.mqttsub.processor.app.AppOpsProcessor;
 import cn.reghao.autodop.dagent.mqttsub.processor.machine.MachineOpsProcessor;
 import cn.reghao.autodop.dagent.utils.DagentLifecycle;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
@@ -22,7 +21,7 @@ import java.lang.management.ManagementFactory;
  */
 @Slf4j
 @Component
-public class DmasterMsgDispatcher implements MqttCallback {
+public class DmasterMsgDispatcher implements IMqttMessageListener {
     private long startTime;
     private MachineOpsProcessor machineOpsProcessor;
     private AppOpsProcessor appOpsProcessor;
@@ -64,14 +63,4 @@ public class DmasterMsgDispatcher implements MqttCallback {
             log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        log.info("token -> {}", token);
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("conn lost {}", cause.getMessage());
-    }
 }

+ 5 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java

@@ -5,7 +5,7 @@ import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.AppOps;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dagent.app.App;
 import cn.reghao.autodop.dagent.utils.DagentLifecycle;
@@ -20,13 +20,11 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class AppOpsProcessor implements OpsProcessor {
-    private MqttPub mqttPub;
-    private String topic;
+    private DefaultMqttClient mqttClient;
     private App app;
     
-    public AppOpsProcessor(MqttPub mqttPub, App app) {
-        this.mqttPub = mqttPub;
-        this.topic = "dmaster";
+    public AppOpsProcessor(DefaultMqttClient mqttClient, App app) {
+        this.mqttClient = mqttClient;
         this.app = app;
     }
 
@@ -67,6 +65,6 @@ public class AppOpsProcessor implements OpsProcessor {
                 break;
         }
 
-        mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
+        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
     }
 }

+ 1 - 29
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java

@@ -1,16 +1,7 @@
 package cn.reghao.autodop.dagent.mqttsub.processor.docker;
 
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.CallResult;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.message.ops.DockerOps;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import cn.reghao.autodop.common.mqtt.MqttPub;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.app.App;
-import cn.reghao.autodop.dagent.utils.DagentLifecycle;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Component;
 
 /**
@@ -20,26 +11,7 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class DockerOpsProcessor implements OpsProcessor {
-    private MqttPub mqttPub;
-    private String topic;
-    private App app;
-
-    public DockerOpsProcessor(MqttPub mqttPub, App app) {
-        this.mqttPub = mqttPub;
-        this.topic = "dmaster";
-        this.app = app;
-    }
-
     @Override
-    public void process(String ops, String payload) throws MqttException {
-        AsyncMsg asyncMsg;
-        switch (DockerOps.valueOf(ops)) {
-            case dockerImageResult:
-                break;
-            default:
-                break;
-        }
-
-        mqttPub.pub(topic, JsonConverter.objectToJson("asyncMsg"));
+    public void process(String ops, String payload) {
     }
 }

+ 5 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java

@@ -4,7 +4,7 @@ import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.message.*;
 import cn.reghao.autodop.common.message.ops.MachineOps;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dagent.utils.DagentLifecycle;
 import lombok.extern.slf4j.Slf4j;
@@ -18,13 +18,11 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class MachineOpsProcessor implements OpsProcessor {
-    private MqttPub mqttPub;
-    private String topic;
+    private DefaultMqttClient mqttClient;
     private Machine machine;
 
-    public MachineOpsProcessor(MqttPub mqttPub, Machine machine) {
-        this.mqttPub = mqttPub;
-        this.topic = "dmaster";
+    public MachineOpsProcessor(DefaultMqttClient mqttClient, Machine machine) {
+        this.mqttClient = mqttClient;
         this.machine = machine;
     }
 
@@ -50,6 +48,6 @@ public class MachineOpsProcessor implements OpsProcessor {
                 break;
         }
 
-        mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
+        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
     }
 }

+ 15 - 22
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -5,9 +5,7 @@ import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.message.ops.DagentOps;
-import cn.reghao.autodop.common.mqtt.MosquittoProperties;
-import cn.reghao.autodop.common.mqtt.MqttPub;
-import cn.reghao.autodop.common.mqtt.MqttSub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.MachineIdentity;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dagent.mqttsub.DmasterMsgDispatcher;
@@ -20,7 +18,7 @@ import org.quartz.SchedulerException;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
 
 import java.io.IOException;
 
@@ -31,35 +29,30 @@ import java.io.IOException;
  * @date 2021-04-01 11:36:51
  */
 @Slf4j
-@Component
+@Configuration
 public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     public static String MACHINE_ID;
     public static String MACHINE_IPV4;
 
-    private MosquittoProperties properties;
     private DmasterMsgDispatcher dmasterMsgDispatcher;
-    private MqttPub mqttPub;
-    private MqttSub mqttSub;
+    private DefaultMqttClient mqttClient;
     private MachineScheduler machineScheduler;
     private Machine machine;
-    private String topic = "dagent/" + DagentLifecycle.MACHINE_ID;
+    private String topic;
 
     public DagentLifecycle(MachineIdentity machineIdentity,
-                           MosquittoProperties properties,
                            DmasterMsgDispatcher dmasterMsgDispatcher,
-                           MqttPub mqttPub,
+                           DefaultMqttClient mqttClient,
                            MachineScheduler machineScheduler,
-                           Machine machine,
-                           MosquittoProperties mosquittoProperties) throws IOException, MqttException {
+                           Machine machine) throws IOException {
         MACHINE_ID = machineIdentity.id();
         MACHINE_IPV4 = machineIdentity.ipv4();
-        this.properties = properties;
         this.dmasterMsgDispatcher = dmasterMsgDispatcher;
-        this.mqttPub = mqttPub;
-        this.mqttSub = new MqttSub("dagent" + MACHINE_ID, mosquittoProperties);
+        this.mqttClient = mqttClient;
         this.machineScheduler = machineScheduler;
         this.machine = machine;
-        initLogger(machineIdentity);
+        this.topic = "dagent/" + DagentLifecycle.MACHINE_ID;
+        initLogger();
     }
 
     @Override
@@ -72,14 +65,14 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
         String payload = MACHINE_ID;
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MACHINE_ID, MessageType.dagentType.name(),
                 DagentOps.dagnetShutdown.name(), payload);
-        mqttPub.pub("dmaster", JsonConverter.objectToJson(asyncMsg));
+        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
         log.info("Dagent 停止...");
     }
 
-    private void initLogger(MachineIdentity machineIdentity) throws IOException {
+    private void initLogger() {
         Appenders appenders = new Appenders();
         LoggerConfig loggerConfig = new LoggerConfig();
-        loggerConfig.addAppender(appenders.mqttAppender(machineIdentity, properties, "dagent"));
+        loggerConfig.addAppender(appenders.mqttAppender(MACHINE_ID, MACHINE_IPV4, "dagent", mqttClient));
     }
 
     /**
@@ -92,12 +85,12 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     private void subAndPub() throws MqttException {
-        mqttSub.sub(topic, dmasterMsgDispatcher);
+        mqttClient.sub(topic, dmasterMsgDispatcher);
 
         String payload = JsonConverter.objectToJson(machine.registry());
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MACHINE_ID, MessageType.dagentType.name(),
                 DagentOps.dagentStart.name(), payload);
-        mqttPub.pub("dmaster", JsonConverter.objectToJson(asyncMsg));
+        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
     }
 
     private void scheduledJobs() throws SchedulerException {

+ 1 - 1
dagent/src/main/resources/application-dev.yml

@@ -2,4 +2,4 @@ mosquitto:
   broker: tcp://localhost:1883
   username: dev
   password: Dev@123456
-  topic: dmaster
+  clientId: dagent

+ 1 - 1
dagent/src/main/resources/application-prod.yml

@@ -2,4 +2,4 @@ mosquitto:
   broker: tcp://localhost:1883
   username: dev
   password: Dev@123456
-  topic: dmaster
+  clientId: dagent

+ 1 - 1
dagent/src/main/resources/application-test.yml

@@ -2,4 +2,4 @@ mosquitto:
   broker: tcp://192.168.0.220:1883
   username: test
   password: Test@123456
-  topic: dmaster
+  clientId: dagent

+ 6 - 6
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -4,7 +4,7 @@ import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.AppOps;
 import cn.reghao.autodop.common.dagent.app.api.data.AppIdArgs;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.constant.StatusOps;
 import cn.reghao.autodop.dmaster.app.db.query.AppQuery;
@@ -26,12 +26,12 @@ import java.util.List;
  */
 @Service
 public class AppStatusService {
-    private MqttPub mqttPub;
+    private DefaultMqttClient mqttClient;
     private AppRunningRepository runningRepository;
     private AppQuery appQuery;
 
-    public AppStatusService(MqttPub mqttPub, AppRunningRepository runningRepository, AppQuery appQuery) {
-        this.mqttPub = mqttPub;
+    public AppStatusService(DefaultMqttClient mqttClient, AppRunningRepository runningRepository, AppQuery appQuery) {
+        this.mqttClient = mqttClient;
         this.runningRepository = runningRepository;
         this.appQuery = appQuery;
     }
@@ -64,7 +64,7 @@ public class AppStatusService {
 
         String topic = "dagent/" + machineId;
         try {
-            mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {
             e.printStackTrace();
         }
@@ -91,7 +91,7 @@ public class AppStatusService {
 
         String topic = "dagent/" + machineId;
         try {
-            mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {
             e.printStackTrace();
         }

+ 5 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -4,7 +4,7 @@ import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.AppOps;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
-import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.db.query.AppQuery;
 import cn.reghao.autodop.dmaster.app.entity.config.deploy.DeployConfig;
@@ -24,11 +24,11 @@ import java.util.*;
 @Slf4j
 @Service
 public class AppDeployer {
-    private MqttPub mqttPub;
+    private DefaultMqttClient mqttClient;
     private AppQuery appQuery;
 
-    public AppDeployer(MqttPub mqttPub, AppQuery appQuery) {
-        this.mqttPub = mqttPub;
+    public AppDeployer(DefaultMqttClient mqttClient, AppQuery appQuery) {
+        this.mqttClient = mqttClient;
         this.appQuery = appQuery;
     }
 
@@ -54,7 +54,7 @@ public class AppDeployer {
             asyncMsg.setPayload(JsonConverter.objectToJson(appDeployArgs));
 
             String topic = "dagent/" + machineId;
-            mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
         }
     }
 

+ 2 - 13
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java

@@ -7,8 +7,7 @@ import cn.reghao.autodop.dmaster.mqttsub.processor.AppOpsProcessor;
 import cn.reghao.autodop.dmaster.mqttsub.processor.DagentOpsProcessor;
 import cn.reghao.autodop.dmaster.mqttsub.processor.MachineOpsProcessor;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
@@ -20,7 +19,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class DagentMsgDispatcher implements MqttCallback {
+public class DagentMsgDispatcher implements IMqttMessageListener {
     private DagentOpsProcessor dagentOpsProcessor;
     private MachineOpsProcessor machineOpsProcessor;
     private AppOpsProcessor appOpsProcessor;
@@ -62,14 +61,4 @@ public class DagentMsgDispatcher implements MqttCallback {
             log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        log.info("token -> {}", token);
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("MQTT connection lost -> {}", cause.getMessage());
-    }
 }

+ 5 - 6
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/lifecycle/DmasterLifecycle.java

@@ -3,10 +3,9 @@ package cn.reghao.autodop.dmaster.utils.lifecycle;
 import cn.reghao.autodop.common.dagent.machine.disk.Disk;
 import cn.reghao.autodop.common.dagent.machine.disk.DiskInfo;
 import cn.reghao.autodop.common.log.LoggerConfig;
-import cn.reghao.autodop.common.mqtt.MosquittoProperties;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.ByteConverter;
 import cn.reghao.autodop.common.utils.ByteType;
-import cn.reghao.autodop.common.mqtt.MqttSub;
 import cn.reghao.autodop.common.utils.FileOps;
 import cn.reghao.autodop.common.utils.MachineIdentity;
 import cn.reghao.autodop.dmaster.app.db.config.build.BuildDirCrudService;
@@ -42,7 +41,7 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
 
     private AppRuntimeLogRepository logRepository;
     private DagentMsgDispatcher dagentMsgDispatcher;
-    private MqttSub mqttSub;
+    private DefaultMqttClient mqttClient;
     private BuildDirCrudService buildDirCrudService;
     private ByteConverter convert = new ByteConverter();
 
@@ -50,12 +49,12 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
                             AppRuntimeLogRepository logRepository,
                             DagentMsgDispatcher dagentMsgDispatcher,
                             BuildDirCrudService buildDirCrudService,
-                            MosquittoProperties mosquittoProperties) throws IOException, MqttException {
+                            DefaultMqttClient mqttClient) throws IOException {
         MACHINE_ID = machineIdentity.id();
         MACHINE_IPV4 = machineIdentity.ipv4();
         this.logRepository = logRepository;
         this.dagentMsgDispatcher = dagentMsgDispatcher;
-        this.mqttSub = new MqttSub("dmaster" + MACHINE_ID, mosquittoProperties);
+        this.mqttClient = mqttClient;
         this.buildDirCrudService = buildDirCrudService;
     }
 
@@ -161,6 +160,6 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
 
     private void sub() throws MqttException {
         String topic = "dmaster";
-        mqttSub.sub(topic, dagentMsgDispatcher);
+        mqttClient.sub(topic, dagentMsgDispatcher);
     }
 }

+ 1 - 1
dmaster/src/main/resources/application-dev.yml

@@ -10,7 +10,7 @@ mosquitto:
   broker: tcp://localhost:1883
   username: dev
   password: Dev@123456
-  topic: dagent@
+  clientId: dmaster
 #mosquitto:
 #  broker: tcp://192.168.0.220:1883
 #  username: test

+ 1 - 1
dmaster/src/main/resources/application-prod.yml

@@ -10,7 +10,7 @@ mosquitto:
   broker: tcp://192.168.0.211:1883
   username: test
   password: Test@123456
-  topic: dagent@
+  clientId: dmaster
 oss:
   host: http://static.reghao.icu
   key: minioadmin

+ 1 - 1
dmaster/src/main/resources/application-test.yml

@@ -10,7 +10,7 @@ mosquitto:
   broker: tcp://192.168.0.220:1883
   username: test
   password: Test@123456
-  topic: dagent@
+  clientId: dmaster
 oss:
   host: http://static.reghao.icu
   key: minioadmin

+ 0 - 3
dmaster/src/test/java/cn/reghao/autodop/common/mqtt/MqttTest.java

@@ -10,8 +10,6 @@ import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import static cn.reghao.autodop.dmaster.utils.lifecycle.DmasterLifecycle.MACHINE_ID;
-
 @Slf4j
 @ActiveProfiles("dev")
 @SpringBootTest(classes = DmasterApplication.class)
@@ -22,7 +20,6 @@ class MqttTest {
 
     @Test
     public void sub() throws MqttException {
-        MqttSub mqttSub = new MqttSub("dmaster" + MACHINE_ID, mosquittoProperties);
     }
 
     @Test