|
|
@@ -1,189 +0,0 @@
|
|
|
-package cn.reghao.devops.common.mqtt;
|
|
|
-
|
|
|
-import cn.reghao.devops.common.log.LogCache;
|
|
|
-import cn.reghao.devops.common.machine.Machine;
|
|
|
-import cn.reghao.devops.common.msg.Message;
|
|
|
-import cn.reghao.devops.common.msg.MsgQueue;
|
|
|
-import cn.reghao.devops.common.msg.pub.PubMsg;
|
|
|
-import cn.reghao.devops.common.msg.pub.constant.NodeEventPubClazz;
|
|
|
-import cn.reghao.devops.common.msg.rpc.RpcMsg;
|
|
|
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.*;
|
|
|
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
-
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-
|
|
|
-/**
|
|
|
- * TODO 采用 Builder 模式初始化参数
|
|
|
- *
|
|
|
- * @author reghao
|
|
|
- * @date 2021-10-28 20:52:17
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-public class AsyncMqttClient {
|
|
|
- private final MqttProperties properties;
|
|
|
- private final MqttAsyncClient mqttClient;
|
|
|
- private final Map<String, IMqttMessageListener> msgListeners = new HashMap<>();
|
|
|
- private final Map<String, RpcMsg> rpcRecorder = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- public AsyncMqttClient(MqttProperties properties) throws MqttException {
|
|
|
- this.properties = properties;
|
|
|
- String clientId = properties.getClientId() + Machine.ID;
|
|
|
- mqttClient = new MqttAsyncClient(properties.getBroker(), clientId, new MemoryPersistence());
|
|
|
- }
|
|
|
-
|
|
|
- private MqttConnectOptions connectOptions() {
|
|
|
- MqttConnectOptions options = new MqttConnectOptions();
|
|
|
- options.setUserName(properties.getUsername());
|
|
|
- options.setPassword(properties.getPassword().toCharArray());
|
|
|
- // 在服务端保存状态
|
|
|
- options.setCleanSession(false);
|
|
|
- // 自动重连
|
|
|
- options.setAutomaticReconnect(true);
|
|
|
- // 避免 Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,635,408,462,211 错误
|
|
|
- options.setConnectionTimeout(0);
|
|
|
- // 设置 last will message
|
|
|
- options.setWill(MsgQueue.managerTopic(), lastWillMessage(), 1, false);
|
|
|
- return options;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * LWT 消息
|
|
|
- *
|
|
|
- * @param
|
|
|
- * @return
|
|
|
- * @date 2021-11-04 下午3:57
|
|
|
- */
|
|
|
- private byte[] lastWillMessage() {
|
|
|
- String jsonPayload = String.format("{\"machineId\":\"%s\"}", Machine.ID);
|
|
|
- PubMsg pubMsg = PubMsg.pubMsg(NodeEventPubClazz.class.getSimpleName(),
|
|
|
- NodeEventPubClazz.unavail.name(), jsonPayload);
|
|
|
- Message message = Message.pubMsg(pubMsg);
|
|
|
- return JsonConverter.objectToJson(message).getBytes(StandardCharsets.UTF_8);
|
|
|
- }
|
|
|
-
|
|
|
- public void add(String topic, IMqttMessageListener messageListener) {
|
|
|
- msgListeners.putIfAbsent(topic, messageListener);
|
|
|
- }
|
|
|
-
|
|
|
- public Map<String, IMqttMessageListener> getMsgListeners() {
|
|
|
- return msgListeners;
|
|
|
- }
|
|
|
-
|
|
|
- public RpcMsg getAndRemoveRecord(String msgId) {
|
|
|
- return rpcRecorder.remove(msgId);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 没有返回结果的 RPC 调用
|
|
|
- *
|
|
|
- * @param
|
|
|
- * @return
|
|
|
- * @date 2021-11-04 上午11:33
|
|
|
- */
|
|
|
- public Map<String, RpcMsg> unreturnedRpc() {
|
|
|
- return rpcRecorder;
|
|
|
- }
|
|
|
-
|
|
|
- public void connect(IMqttActionListener actionListener) throws MqttException {
|
|
|
- mqttClient.setCallback(new ConnectionCallback());
|
|
|
- mqttClient.connect(connectOptions(), null, actionListener);
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isConnected() {
|
|
|
- return mqttClient.isConnected();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * qos=0 的消息不能离线
|
|
|
- *
|
|
|
- * @param
|
|
|
- * @return
|
|
|
- * @date 2021-06-16 下午8:40
|
|
|
- */
|
|
|
- public void pub(String topic, int qos, String payload) throws MqttException {
|
|
|
- MqttMessage message = new MqttMessage();
|
|
|
- message.setQos(qos);
|
|
|
- //message.setRetained(true);
|
|
|
- message.setPayload(payload.getBytes());
|
|
|
- mqttClient.publish(topic, message);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * pub 消息后,等 sub 端处理后在 pub 结果
|
|
|
- *
|
|
|
- * @param
|
|
|
- * @return
|
|
|
- * @date 2021-06-30 上午11:22
|
|
|
- */
|
|
|
- public void pubWithResult(String topic, int qos, Message message) throws MqttException {
|
|
|
- MqttMessage mqttMessage = new MqttMessage();
|
|
|
- mqttMessage.setQos(qos);
|
|
|
- mqttMessage.setPayload(JsonConverter.objectToJson(message).getBytes());
|
|
|
-
|
|
|
- mqttClient.publish(topic, mqttMessage);
|
|
|
- rpcRecorder.put(message.getMsgId(), message.getRpcMsg());
|
|
|
- }
|
|
|
-
|
|
|
- public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
|
|
|
- mqttClient.subscribe(topic, 2, messageListener);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * MQTT 连接回调
|
|
|
- *
|
|
|
- * @param
|
|
|
- * @return
|
|
|
- * @date 2021-10-28 下午8:31
|
|
|
- */
|
|
|
- class ConnectionCallback implements MqttCallbackExtended {
|
|
|
- @Override
|
|
|
- public void connectComplete(boolean reconnect, String serverUri) {
|
|
|
- if (reconnect) {
|
|
|
- log.info("MQTT 重新连接成功");
|
|
|
- log.info("发布之前未发送成功的日志");
|
|
|
- LogCache.getAll().forEach((timestamp, message) -> {
|
|
|
- try {
|
|
|
- pub(MsgQueue.managerTopic(), 1, JsonConverter.objectToJson(message));
|
|
|
- } catch (MqttException e) {
|
|
|
- e.printStackTrace();
|
|
|
- LogCache.put(message);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- log.info("重新订阅 topic");
|
|
|
- // 重连完成后自动订阅 topic
|
|
|
- msgListeners.forEach((topic, listener) -> {
|
|
|
- try {
|
|
|
- sub(topic, listener);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("重新订阅 {} 失败,原因: {}", topic, e.getMessage());
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable cause) {
|
|
|
- log.error("MQTT 连接断开, 准备重新连接 {}", cause.getMessage());
|
|
|
- try {
|
|
|
- mqttClient.reconnect();
|
|
|
- } catch (MqttException e) {
|
|
|
- e.printStackTrace();
|
|
|
- log.error("MQTT 重连失败 {}", cause.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageArrived(String topic, MqttMessage message) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
- }
|
|
|
- }
|
|
|
-}
|