Просмотр исходного кода

修改 cn.reghao.autodop.common.msg 包,将消息分为 pub, rpcParam, rpcResult 三种类型和 PubMsg, RpcMsg 两个对象

reghao 4 лет назад
Родитель
Сommit
8ed5ccc04b

+ 6 - 6
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -2,7 +2,7 @@ package cn.reghao.autodop.common.mqtt;
 
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.msg.Message;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
@@ -23,7 +23,7 @@ public class DefaultMqttClient implements AutoCloseable {
     private final MosquittoProperties properties;
     private final MqttClient client;
     // 记录 RPC 调用, size 为 0 时表示所有 pub 消息的结果都已 sub
-    private final Map<String, RpcParamMsg> rpcRecorder = new HashMap<>();
+    private final Map<String, RpcMsg> rpcRecorder = new HashMap<>();
     private final Map<String, IMqttMessageListener> subMap = new HashMap<>();
     private final MqttCallback callback = new MqttClientCallback();
 
@@ -52,11 +52,11 @@ public class DefaultMqttClient implements AutoCloseable {
         client.close();
     }
 
-    public void putRecord(String msgId, RpcParamMsg paramMsg) {
-        rpcRecorder.put(msgId, paramMsg);
+    public void putRecord(String msgId, RpcMsg rpcMsg) {
+        rpcRecorder.put(msgId, rpcMsg);
     }
 
-    public RpcParamMsg getRecord(String msgId) {
+    public RpcMsg getRecord(String msgId) {
         return rpcRecorder.get(msgId);
     }
 
@@ -127,7 +127,7 @@ public class DefaultMqttClient implements AutoCloseable {
 
         connect();
         client.publish(topic, mqttMessage);
-        rpcRecorder.put(message.getMsgId(), message.getParamMsg());
+        rpcRecorder.put(message.getMsgId(), message.getRpcMsg());
     }
 
     class MqttClientCallback implements MqttCallbackExtended {

+ 5 - 14
common/src/main/java/cn/reghao/autodop/common/msg/Message.java

@@ -1,8 +1,7 @@
 package cn.reghao.autodop.common.msg;
 
 import cn.reghao.autodop.common.msg.pub.PubMsg;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
-import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -20,8 +19,7 @@ public class Message implements Serializable {
     private long sendTime;
     private String msgType;
     private PubMsg pubMsg;
-    private RpcParamMsg paramMsg;
-    private RpcResultMsg resultMsg;
+    private RpcMsg rpcMsg;
 
     private Message() {
         this.msgId = UUID.randomUUID().toString();
@@ -35,17 +33,10 @@ public class Message implements Serializable {
         return message;
     }
 
-    public static Message paramMsg(RpcParamMsg paramMsg) {
+    public static Message rpcMsg(RpcMsg rpcMsg, MsgType msgType) {
         Message message = new Message();
-        message.setMsgType(MsgType.rpcParam.name());
-        message.setParamMsg(paramMsg);
-        return message;
-    }
-
-    public static Message resultMsg(RpcResultMsg resultMsg) {
-        Message message = new Message();
-        message.setMsgType(MsgType.rpcResult.name());
-        message.setResultMsg(resultMsg);
+        message.setMsgType(msgType.name());
+        message.setRpcMsg(rpcMsg);
         return message;
     }
 }

+ 62 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java

@@ -0,0 +1,62 @@
+package cn.reghao.autodop.common.msg.rpc;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+import static cn.reghao.jdkutil.result.ResultStatus.*;
+
+/**
+ * @author reghao
+ * @date 2021-08-25 17:19:41
+ */
+@Data
+public class RpcMsg implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String clazz;
+    private String method;
+    private String jsonParam;
+
+    // 0 - 成功 1 - 失败
+    private int code;
+    private String msg;
+    private String jsonData;
+
+    private RpcMsg() {
+    }
+
+    public static RpcMsg paramMsg(String clazz, String method) {
+        RpcMsg paramMsg = new RpcMsg();
+        paramMsg.setClazz(clazz);
+        paramMsg.setMethod(method);
+        return paramMsg;
+    }
+
+    public static RpcMsg paramMsg(String clazz, String method, String jsonParam) {
+        RpcMsg paramMsg = new RpcMsg();
+        paramMsg.setClazz(clazz);
+        paramMsg.setMethod(method);
+        paramMsg.setJsonParam(jsonParam);
+        return paramMsg;
+    }
+
+    public RpcMsg success(String jsonData) {
+        this.setCode(SUCCESS.getCode());
+        this.setMsg(SUCCESS.getMsg());
+        this.setJsonData(jsonData);
+        return this;
+    }
+
+    public RpcMsg fail(String msg) {
+        this.setCode(FAIL.getCode());
+        this.setMsg(msg);
+        return this;
+    }
+
+    public RpcMsg error(String msg) {
+        this.setCode(ERROR.getCode());
+        this.setMsg(msg);
+        return this;
+    }
+}

+ 0 - 36
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcParamMsg.java

@@ -1,36 +0,0 @@
-package cn.reghao.autodop.common.msg.rpc;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2021-08-25 17:19:41
- */
-@Data
-public class RpcParamMsg implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String clazz;
-    private String method;
-    private String jsonParam;
-
-    private RpcParamMsg() {
-    }
-
-    public static RpcParamMsg paramMsg(String clazz, String method) {
-        RpcParamMsg paramMsg = new RpcParamMsg();
-        paramMsg.setClazz(clazz);
-        paramMsg.setMethod(method);
-        return paramMsg;
-    }
-
-    public static RpcParamMsg paramMsg(String clazz, String method, String jsonParam) {
-        RpcParamMsg paramMsg = new RpcParamMsg();
-        paramMsg.setClazz(clazz);
-        paramMsg.setMethod(method);
-        paramMsg.setJsonParam(jsonParam);
-        return paramMsg;
-    }
-}

+ 0 - 41
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResultMsg.java

@@ -1,41 +0,0 @@
-package cn.reghao.autodop.common.msg.rpc;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static cn.reghao.jdkutil.result.ResultStatus.*;
-
-/**
- * RPC 调用结果
- *
- * @author reghao
- * @date 2020-09-07 23:04:16
- */
-@Data
-public class RpcResultMsg implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    // 0 - 成功 1 - 失败
-    private int code;
-    private String msg;
-    private String jsonData;
-
-    private RpcResultMsg(int code, String msg, String jsonData) {
-        this.code = code;
-        this.msg = msg;
-        this.jsonData = jsonData;
-    }
-
-    public static RpcResultMsg success(String jsonData) {
-        return new RpcResultMsg(SUCCESS.getCode(), SUCCESS.getMsg(), jsonData);
-    }
-
-    public static RpcResultMsg fail(String msg) {
-        return new RpcResultMsg(FAIL.getCode(), msg, null);
-    }
-
-    public static RpcResultMsg error(String msg) {
-        return new RpcResultMsg(ERROR.getCode(), msg, null);
-    }
-}

+ 7 - 9
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/RpcListener.java

@@ -3,9 +3,8 @@ package cn.reghao.autodop.dagent.mqttsub;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
-import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
-import cn.reghao.autodop.common.msg.rpc.clazz.IAppRpcClazz;
+import cn.reghao.autodop.common.msg.MsgType;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.RpcClazz;
 import cn.reghao.autodop.dagent.mqttsub.impl.AppRpcClazzDispatcher;
 import cn.reghao.jdkutil.serializer.JsonConverter;
@@ -44,19 +43,18 @@ public class RpcListener implements MqttCallback {
             String msgId = msg.getMsgId();
             long sendTime = msg.getSendTime();
 
-            RpcParamMsg paramMsg = msg.getParamMsg();
-            String clazz = paramMsg.getClazz();
-            RpcResultMsg resultMsg;
+            RpcMsg rpcMsg = msg.getRpcMsg();
+            String clazz = rpcMsg.getClazz();
             switch (RpcClazz.valueOf(clazz)) {
                 case AppRpcClazz:
-                    resultMsg = appRpcClazzDispatcher.dispatch(paramMsg);
+                    rpcMsg = appRpcClazzDispatcher.dispatch(rpcMsg);
                     break;
                 default:
                     String err = String.format("找不到 %s 类型", clazz);
-                    resultMsg = RpcResultMsg.error(err);
+                    rpcMsg.error(err);
             }
 
-            Message message1 = Message.resultMsg(resultMsg);
+            Message message1 = Message.rpcMsg(rpcMsg, MsgType.rpcResult);
             rpcReply(message1);
         } catch (Exception e) {
             log.error("MQTT message exception -> {}", e.getMessage());

+ 11 - 12
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppRpcClazzDispatcher.java

@@ -1,11 +1,10 @@
 package cn.reghao.autodop.dagent.mqttsub.impl;
 
 import cn.reghao.autodop.common.msg.ClazzDispatcher;
-import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
 import cn.reghao.autodop.common.msg.rpc.clazz.IAppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.*;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
@@ -16,7 +15,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class AppRpcClazzDispatcher implements ClazzDispatcher<RpcParamMsg, RpcResultMsg> {
+public class AppRpcClazzDispatcher implements ClazzDispatcher<RpcMsg, RpcMsg> {
     private final IAppRpcClazz appRpcClazz;
 
     public AppRpcClazzDispatcher(IAppRpcClazz appRpcClazz) {
@@ -24,36 +23,36 @@ public class AppRpcClazzDispatcher implements ClazzDispatcher<RpcParamMsg, RpcRe
     }
 
     @Override
-    public RpcResultMsg dispatch(RpcParamMsg paramMsg) {
-        String method = paramMsg.getMethod();
-        String jsonParam = paramMsg.getJsonParam();
+    public RpcMsg dispatch(RpcMsg rpcMsg) {
+        String method = rpcMsg.getMethod();
+        String jsonParam = rpcMsg.getJsonParam();
         StatusParam statusParam;
         StatusResult statusResult;
         switch (AppRpcClazz.valueOf(method)) {
             case deploy:
                 DeployParam deployParam = JsonConverter.jsonToObject(jsonParam, DeployParam.class);
                 DeployResult deployResult = appRpcClazz.deploy(deployParam);
-                return RpcResultMsg.success(JsonConverter.objectToJson(deployResult));
+                return rpcMsg.success(JsonConverter.objectToJson(deployResult));
             case status:
                 statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
                 statusResult = appRpcClazz.status(statusParam);
-                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+                return rpcMsg.success(JsonConverter.objectToJson(statusResult));
             case restart:
                 statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
                 statusResult = appRpcClazz.restart(statusParam);
-                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+                return rpcMsg.success(JsonConverter.objectToJson(statusResult));
             case stop:
                 statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
                 statusResult = appRpcClazz.stop(statusParam);
-                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+                return rpcMsg.success(JsonConverter.objectToJson(statusResult));
             case start:
                 statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
                 statusResult = appRpcClazz.start(statusParam);
-                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+                return rpcMsg.success(JsonConverter.objectToJson(statusResult));
             case log:
             default:
                 String err = String.format("找不到 %s 方法", method);
-                return RpcResultMsg.error(err);
+                return rpcMsg.error(err);
         }
     }
 }

+ 4 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -3,7 +3,8 @@ package cn.reghao.autodop.dmaster.app.service.bd;
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.MsgType;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
@@ -47,9 +48,9 @@ public class AppDeployer {
             String startScript = deployConfig.getStartScript();
             deployParam.setStartScript(startScript);
 
-            RpcParamMsg paramMsg = RpcParamMsg.paramMsg(AppRpcClazz.class.getSimpleName(), AppRpcClazz.deploy.name(),
+            RpcMsg rpcMsg = RpcMsg.paramMsg(AppRpcClazz.class.getSimpleName(), AppRpcClazz.deploy.name(),
                     JsonConverter.objectToJson(deployParam));
-            Message message = Message.paramMsg(paramMsg);
+            Message message = Message.rpcMsg(rpcMsg, MsgType.rpcParam);
 
             // TODO 对于需要返回值的 pub,需要做一个记录,pub 和 sub 一一对应
             String topic = MsgQueue.dagentTopic(Machine.ID);

+ 11 - 12
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java

@@ -5,8 +5,7 @@ import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgType;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
 import cn.reghao.autodop.common.msg.pub.constant.PubClazz;
-import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
-import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.RpcClazz;
 import cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult.AppRpcClazzResultDispatcher;
 import cn.reghao.autodop.dmaster.mqttsub.impl.pub.NodePubClazzDispatcher;
@@ -27,13 +26,14 @@ import org.springframework.stereotype.Component;
 @Component
 public class MessageListener implements MqttCallback {
     private final NodePubClazzDispatcher nodePubClazzDispatcher;
-    private final AppRpcClazzResultDispatcher appRpcClazzResult;
+    private final AppRpcClazzResultDispatcher appRpcClazzResultDispatcher;
     private final DefaultMqttClient defaultMqttClient;
 
-    public MessageListener(NodePubClazzDispatcher nodePubClazzDispatcher, AppRpcClazzResultDispatcher appRpcClazzResult,
+    public MessageListener(NodePubClazzDispatcher nodePubClazzDispatcher,
+                           AppRpcClazzResultDispatcher appRpcClazzResultDispatcher,
                            DefaultMqttClient defaultMqttClient) {
         this.nodePubClazzDispatcher = nodePubClazzDispatcher;
-        this.appRpcClazzResult = appRpcClazzResult;
+        this.appRpcClazzResultDispatcher = appRpcClazzResultDispatcher;
         this.defaultMqttClient = defaultMqttClient;
     }
 
@@ -53,7 +53,7 @@ public class MessageListener implements MqttCallback {
                     dispatchPubMsg(msg.getPubMsg());
                     break;
                 case rpcResult:
-                    dispatchRpcResult(msgId, msg.getResultMsg());
+                    dispatchRpcResult(msgId, msg.getRpcMsg());
                     break;
                 default:
             }
@@ -75,17 +75,16 @@ public class MessageListener implements MqttCallback {
         }
     }
 
-    private void dispatchRpcResult(String msgId, RpcResultMsg resultMsg) {
-        RpcParamMsg paramMsg = defaultMqttClient.getRecord(msgId);
-        if (paramMsg == null) {
+    private void dispatchRpcResult(String msgId, RpcMsg rpcMsg) {
+        RpcMsg rpcMsg1 = defaultMqttClient.getRecord(msgId);
+        if (rpcMsg1 == null) {
             return;
         }
 
-        String clazz = paramMsg.getClazz();
-        String method = paramMsg.getMethod();
+        String clazz = rpcMsg.getClazz();
         switch (RpcClazz.valueOf(clazz)) {
             case AppRpcClazz:
-                appRpcClazzResult.process(msgId, method, resultMsg);
+                appRpcClazzResultDispatcher.dispatch(rpcMsg);
                 break;
             default:
                 ;

+ 14 - 18
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultDispatcher.java

@@ -1,8 +1,7 @@
 package cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult;
 
 import cn.reghao.autodop.common.msg.ClazzDispatcher;
-import cn.reghao.autodop.common.msg.Message;
-import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
 import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
@@ -20,44 +19,41 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class AppRpcClazzResultDispatcher implements ClazzDispatcher<Message, Void> {
+public class AppRpcClazzResultDispatcher implements ClazzDispatcher<RpcMsg, Void> {
     private final AppRpcClazzResultImpl appRpcClazzResult;
-    private final DefaultMqttClient mqttClient;
 
-    public AppRpcClazzResultDispatcher(AppRpcClazzResultImpl appRpcClazzResult, DefaultMqttClient mqttClient) {
+    public AppRpcClazzResultDispatcher(AppRpcClazzResultImpl appRpcClazzResult) {
         this.appRpcClazzResult = appRpcClazzResult;
-        this.mqttClient = mqttClient;
     }
 
     @Override
-    public Void dispatch(Message message) {
-        return null;
-    }
-
-    public void process(String msgId, String method, RpcResultMsg rpcResult) {
-        mqttClient.removeRecord(msgId);
-        int code = rpcResult.getCode();
+    public Void dispatch(RpcMsg rpcMsg) {
+        String method = rpcMsg.getMethod();
+        int code = rpcMsg.getCode();
         if (code != ResultStatus.SUCCESS.getCode()) {
-            String msg = rpcResult.getMsg();
-            log.error("error msg -> {}", msg);
-            return;
+            String msg1 = rpcMsg.getMsg();
+            log.error("error msg -> {}", msg1);
+            return null;
         }
 
+        String jsonData = rpcMsg.getJsonData();
         switch (AppRpcClazz.valueOf(method)) {
             case deploy:
-                processDeployResult(rpcResult.getJsonData());
+                processDeployResult(jsonData);
                 break;
             case start:
             case stop:
             case restart:
             case status:
-                processStatusResult(rpcResult.getJsonData());
+                processStatusResult(jsonData);
                 break;
             case log:
                 break;
             default:
                 ;
         }
+
+        return null;
     }
 
     private void processDeployResult(String payload) {