Prechádzať zdrojové kódy

修复不能接收 MQTT 离线消息的 bug

MQTT 订阅 topic 时应使用 MqttCallback
reghao 4 rokov pred
rodič
commit
ebaa784ac2

+ 3 - 3
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -80,10 +80,10 @@ public class DefaultMqttClient implements AutoCloseable {
      * @return
      * @date 2021-06-16 下午8:39
      */
-    public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
+    public void sub(String topic, MqttCallback callback) throws MqttException {
         connect();
-        client.subscribe(topic, 1, messageListener);
-        subMap.putIfAbsent(topic, messageListener);
+        client.setCallback(callback);
+        client.subscribe(topic, 1);
     }
 
     private void resub(String topic, IMqttMessageListener messageListener) throws MqttException {

+ 3 - 4
dagent/src/main/java/cn/reghao/autodop/dagent/machine/HeartbeatJob.java → dagent/src/main/java/cn/reghao/autodop/dagent/machine/DagentHeartbeatJob.java

@@ -12,16 +12,15 @@ import org.eclipse.paho.client.mqttv3.MqttException;
 import org.quartz.*;
 
 /**
- * 机器心跳任务
+ * dagent 心跳任务
  *
  * @author reghao
  * @date 2021-02-22 21:30:17
  */
 @Slf4j
-public class HeartbeatJob implements Job {
+public class DagentHeartbeatJob implements Job {
     @Override
     public void execute(JobExecutionContext context) {
-        JobKey jobKey = context.getJobDetail().getKey();
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
 
         DefaultMqttClient mqttClient = (DefaultMqttClient) jobDataMap.get("mqttClient");
@@ -33,7 +32,7 @@ public class HeartbeatJob implements Job {
         try {
             mqttClient.pub("dmaster", 0, JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {
-            e.printStackTrace();
+            log.error("{}", e.getMessage());
         }
     }
 }

+ 18 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java

@@ -7,7 +7,9 @@ import cn.reghao.autodop.dagent.mqttsub.processor.app.AppOpsProcessor;
 import cn.reghao.autodop.dagent.mqttsub.processor.machine.MachineOpsProcessor;
 import cn.reghao.autodop.dagent.utils.DagentLifecycle;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
@@ -21,7 +23,7 @@ import java.lang.management.ManagementFactory;
  */
 @Slf4j
 @Component
-public class DmasterMsgDispatcher implements IMqttMessageListener {
+public class DmasterMsgDispatcher implements MqttCallback {
     private long startTime;
     private MachineOpsProcessor machineOpsProcessor;
     private AppOpsProcessor appOpsProcessor;
@@ -64,4 +66,19 @@ public class DmasterMsgDispatcher implements IMqttMessageListener {
             log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        if (!token.isComplete()) {
+            log.error("token -> {}", token);
+        } else {
+            // 由于使用 MQTT 记录日志,而每条消息发送成功后都会调用此方法,因此会形成循环引用,导致超高 CPU 使用率
+            // log.info("token -> {}", token);
+        }
+    }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("MQTT connection lost -> {}", cause.getMessage());
+    }
 }

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -11,7 +11,7 @@ import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.MachineIdentity;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dagent.mqttsub.DmasterMsgDispatcher;
-import cn.reghao.autodop.dagent.machine.HeartbeatJob;
+import cn.reghao.autodop.dagent.machine.DagentHeartbeatJob;
 import cn.reghao.autodop.dagent.machine.MachineScheduler;
 import cn.reghao.autodop.common.log.LoggerConfig;
 import lombok.extern.slf4j.Slf4j;
@@ -88,7 +88,7 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     private void pubDagentHeartbeat() throws SchedulerException {
-        machineScheduler.add(HeartbeatJob.class, "machine-heartbeat", "0/10 * * * * ?");
+        machineScheduler.add(DagentHeartbeatJob.class, "machine-heartbeat", "0/10 * * * * ?");
         machineScheduler.start();
     }
 

+ 13 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java

@@ -7,7 +7,9 @@ import cn.reghao.autodop.dmaster.mqttsub.processor.AppOpsProcessor;
 import cn.reghao.autodop.dmaster.mqttsub.processor.DagentOpsProcessor;
 import cn.reghao.autodop.dmaster.mqttsub.processor.MachineOpsProcessor;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
@@ -19,7 +21,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class DagentMsgDispatcher implements IMqttMessageListener {
+public class DagentMsgDispatcher implements MqttCallback {
     private DagentOpsProcessor dagentOpsProcessor;
     private MachineOpsProcessor machineOpsProcessor;
     private AppOpsProcessor appOpsProcessor;
@@ -62,4 +64,14 @@ public class DagentMsgDispatcher implements IMqttMessageListener {
             log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        log.info("token -> {}", token);
+    }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("MQTT connection lost -> {}", cause.getMessage());
+    }
 }