Bläddra i källkod

实现 MQTT 消息的离线接收

reghao 4 år sedan
förälder
incheckning
32730892c9

+ 24 - 4
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttPub.java

@@ -24,7 +24,6 @@ public class MqttPub implements AutoCloseable {
 
     private MqttConnectOptions connectOptions() {
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(true);
         options.setConnectionTimeout(10);
         options.setKeepAliveInterval(20);
         options.setUserName(properties.getUsername());
@@ -37,15 +36,36 @@ public class MqttPub implements AutoCloseable {
         client.close();
     }
 
+    /**
+     * qos=0 的消息不能离线
+     *
+     * @param
+     * @return
+     * @date 2021-06-16 下午8:40
+     */
+    public void pub(String topic, int qos, String payload) throws MqttException {
+        client.connect(connectOptions());
+        MqttTopic mqttTopic = client.getTopic(topic);
+
+        MqttMessage message = new MqttMessage();
+        message.setQos(qos);
+        message.setPayload(payload.getBytes());
+        MqttDeliveryToken deliveryToken = mqttTopic.publish(message);
+        if (client.isConnected()) {
+            client.disconnect();
+        }
+    }
+
     public void pub(String topic, String payload) throws MqttException {
         client.connect(connectOptions());
         MqttTopic mqttTopic = client.getTopic(topic);
+
         MqttMessage message = new MqttMessage();
         message.setQos(1);
-        message.setRetained(false);
         message.setPayload(payload.getBytes());
         MqttDeliveryToken deliveryToken = mqttTopic.publish(message);
-        deliveryToken.setActionCallback(new PubActionListener());
-        client.disconnect();
+        if (client.isConnected()) {
+            client.disconnect();
+        }
     }
 }

+ 14 - 11
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttSub.java

@@ -3,31 +3,24 @@ package cn.reghao.autodop.common.mqtt;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
 
 /**
  * @author reghao
  * @date 2021-05-21 08:27:41
  */
 @Slf4j
-@Component
 public class MqttSub implements AutoCloseable {
     private MosquittoProperties properties;
     private MqttClient client;
 
-    public MqttSub(MosquittoProperties properties) throws MqttException {
+    public MqttSub(String clientId, MosquittoProperties properties) throws MqttException {
         this.properties = properties;
-        this.client = new MqttClient(properties.getBroker(), UUID.randomUUID().toString(), new MemoryPersistence());
+        this.client = new MqttClient(properties.getBroker(), clientId, new MemoryPersistence());
     }
 
     private MqttConnectOptions connectOptions() {
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(true);
-        options.setConnectionTimeout(10);
-        options.setKeepAliveInterval(30);
-        options.setAutomaticReconnect(true);
+        options.setCleanSession(false);
         options.setUserName(properties.getUsername());
         options.setPassword(properties.getPassword().toCharArray());
         return options;
@@ -38,9 +31,19 @@ public class MqttSub implements AutoCloseable {
         client.close();
     }
 
+    /**
+     * 确保能订阅到离线消息的设置:
+     * - qos=1
+     * - clientId 确定,不能使用随机的 UUID
+     * - CleanSession=false
+     *
+     * @param
+     * @return
+     * @date 2021-06-16 下午8:39
+     */
     public void sub(String topic, MqttCallback callback) throws MqttException {
         client.connect(connectOptions());
-        client.subscribe(topic);
         client.setCallback(callback);
+        client.subscribe(topic, 1);
     }
 }

+ 1 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/machine/HeartbeatJob.java

@@ -31,7 +31,7 @@ public class HeartbeatJob implements Job {
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.dagentType.name(),
                 DagentOps.dagentHeartbeat.name(), payload);
         try {
-            mqttPub.pub("dmaster", JsonConverter.objectToJson(asyncMsg));
+            mqttPub.pub("dmaster", 0, JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {
             e.printStackTrace();
         }

+ 7 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java

@@ -33,15 +33,10 @@ public class DmasterMsgDispatcher implements MqttCallback {
         this.appOpsProcessor = appOpsProcessor;
     }
 
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("conn lost {}", cause.getMessage());
-    }
-
     @Override
     public void messageArrived(String topic, MqttMessage message) {
-        String msg = message.toString();
         try {
+            String msg = message.toString();
             AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
             String type = mqAsyncMsg.getType();
             String ops = mqAsyncMsg.getOps();
@@ -66,7 +61,7 @@ public class DmasterMsgDispatcher implements MqttCallback {
                 default:
             }
         } catch (Exception e) {
-            log.error("MQTT 消息异常: {}", e.getMessage());
+            log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
 
@@ -74,4 +69,9 @@ public class DmasterMsgDispatcher implements MqttCallback {
     public void deliveryComplete(IMqttDeliveryToken token) {
         log.info("token -> {}", token);
     }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("conn lost {}", cause.getMessage());
+    }
 }

+ 3 - 3
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -48,15 +48,15 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
                            MosquittoProperties properties,
                            DmasterMsgDispatcher dmasterMsgDispatcher,
                            MqttPub mqttPub,
-                           MqttSub mqttSub,
                            MachineScheduler machineScheduler,
-                           Machine machine) throws IOException {
+                           Machine machine,
+                           MosquittoProperties mosquittoProperties) throws IOException, MqttException {
         MACHINE_ID = machineIdentity.id();
         MACHINE_IPV4 = machineIdentity.ipv4();
         this.properties = properties;
         this.dmasterMsgDispatcher = dmasterMsgDispatcher;
         this.mqttPub = mqttPub;
-        this.mqttSub = mqttSub;
+        this.mqttSub = new MqttSub("dagent" + MACHINE_ID, mosquittoProperties);
         this.machineScheduler = machineScheduler;
         this.machine = machine;
         initLogger(machineIdentity);

+ 11 - 8
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java

@@ -33,16 +33,14 @@ public class DagentMsgDispatcher implements MqttCallback {
         this.appOpsProcessor = appOpsProcessor;
     }
 
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("mqtt conn lost");
-        cause.printStackTrace();
-    }
-
     @Override
     public void messageArrived(String topic, MqttMessage message) {
-        String msg = message.toString();
         try {
+            if (message.isDuplicate()) {
+                log.info("重新发送的消息...");
+            }
+
+            String msg = message.toString();
             AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
             String type = mqAsyncMsg.getType();
             String ops = mqAsyncMsg.getOps();
@@ -61,7 +59,7 @@ public class DagentMsgDispatcher implements MqttCallback {
                 default:
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
 
@@ -69,4 +67,9 @@ public class DagentMsgDispatcher implements MqttCallback {
     public void deliveryComplete(IMqttDeliveryToken token) {
         log.info("token -> {}", token);
     }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("MQTT connection lost -> {}", cause.getMessage());
+    }
 }

+ 4 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/lifecycle/DmasterLifecycle.java

@@ -3,6 +3,7 @@ package cn.reghao.autodop.dmaster.utils.lifecycle;
 import cn.reghao.autodop.common.dagent.machine.disk.Disk;
 import cn.reghao.autodop.common.dagent.machine.disk.DiskInfo;
 import cn.reghao.autodop.common.log.LoggerConfig;
+import cn.reghao.autodop.common.mqtt.MosquittoProperties;
 import cn.reghao.autodop.common.utils.ByteConverter;
 import cn.reghao.autodop.common.utils.ByteType;
 import cn.reghao.autodop.common.mqtt.MqttSub;
@@ -48,13 +49,13 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     public DmasterLifecycle(MachineIdentity machineIdentity,
                             AppRuntimeLogRepository logRepository,
                             DagentMsgDispatcher dagentMsgDispatcher,
-                            MqttSub mqttSub,
-                            BuildDirCrudService buildDirCrudService) throws IOException {
+                            BuildDirCrudService buildDirCrudService,
+                            MosquittoProperties mosquittoProperties) throws IOException, MqttException {
         MACHINE_ID = machineIdentity.id();
         MACHINE_IPV4 = machineIdentity.ipv4();
         this.logRepository = logRepository;
         this.dagentMsgDispatcher = dagentMsgDispatcher;
-        this.mqttSub = mqttSub;
+        this.mqttSub = new MqttSub("dmaster" + MACHINE_ID, mosquittoProperties);
         this.buildDirCrudService = buildDirCrudService;
     }
 

+ 31 - 0
dmaster/src/test/java/cn/reghao/autodop/common/mqtt/MqttTest.java

@@ -0,0 +1,31 @@
+package cn.reghao.autodop.common.mqtt;
+
+import cn.reghao.autodop.dmaster.DmasterApplication;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static cn.reghao.autodop.dmaster.utils.lifecycle.DmasterLifecycle.MACHINE_ID;
+
+@Slf4j
+@ActiveProfiles("dev")
+@SpringBootTest(classes = DmasterApplication.class)
+@RunWith(SpringRunner.class)
+class MqttTest {
+    @Autowired
+    private MosquittoProperties mosquittoProperties;
+
+    @Test
+    public void sub() throws MqttException {
+        MqttSub mqttSub = new MqttSub("dmaster" + MACHINE_ID, mosquittoProperties);
+    }
+
+    @Test
+    public void pub() {
+    }
+}