Forráskód Böngészése

修改 cn.reghao.autodop.common.msg 包,将消息分为 PubMsg, RpcParamMsg 和 RpcResultMsg. 为 PubClazz 和 RpcClazz 中的值给出相应的接口, 订阅并处理相应接口的消息时再实现接口.

reghao 4 éve
szülő
commit
ac9156524a
45 módosított fájl, 586 hozzáadás és 470 törlés
  1. 2 2
      common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java
  2. 9 5
      common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java
  3. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/ClazzDispatcher.java
  4. 23 9
      common/src/main/java/cn/reghao/autodop/common/msg/Message.java
  5. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/MsgType.java
  6. 16 0
      common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/INodePubClazz.java
  7. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/NodePubClazz.java
  8. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/PubClazz.java
  9. 36 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcParamMsg.java
  10. 41 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResultMsg.java
  11. 19 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/IAppRpcClazz.java
  12. 2 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/constant/AppRpcClazz.java
  13. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/constant/RpcClazz.java
  14. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployParam.java
  15. 2 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployResult.java
  16. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/PackType.java
  17. 2 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/StatusParam.java
  18. 2 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/StatusResult.java
  19. 0 33
      common/src/main/java/cn/reghao/autodop/common/msg/sub/RpcPayload.java
  20. 0 33
      common/src/main/java/cn/reghao/autodop/common/msg/sub/RpcResult.java
  21. 0 35
      common/src/main/java/cn/reghao/autodop/common/msg/sub/SubMsg.java
  22. 0 9
      common/src/main/java/cn/reghao/autodop/common/msg/sub/clazz/SubClazz.java
  23. 7 7
      dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java
  24. 9 9
      dagent/src/main/java/cn/reghao/autodop/dagent/app/DockerAppServiceImpl.java
  25. 7 7
      dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java
  26. 2 2
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeClazzPubImpl.java
  27. 19 20
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/RpcListener.java
  28. 0 219
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppClazzRpcImpl.java
  29. 59 0
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppRpcClazzDispatcher.java
  30. 193 0
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppRpcClazzImpl.java
  31. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/AppConfigPageController.java
  32. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/PackerPageController.java
  33. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppDeploying.java
  34. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppRunning.java
  35. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/config/build/PackerConfig.java
  36. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/log/DeployLog.java
  37. 5 7
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  38. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppIntegrate.java
  39. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/DeployNotifyMsg.java
  40. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/tools/packer/ZipPack.java
  41. 34 28
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java
  42. 33 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodePubClazzDispatcher.java
  43. 6 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodePubClazzImpl.java
  44. 17 10
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultDispatcher.java
  45. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultImpl.java

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java

@@ -7,7 +7,7 @@ import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
-import cn.reghao.autodop.common.msg.pub.clazz.NodePubClazz;
+import cn.reghao.autodop.common.msg.pub.constant.NodePubClazz;
 import cn.reghao.autodop.common.msg.pub.dto.node.NodeLogDTO;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -42,7 +42,7 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
         NodeLogDTO nodeLogDTO = nodeLogDTO(event);
         String jsonPayload = JsonConverter.objectToJson(nodeLogDTO);
         PubMsg pubMsg = PubMsg.pubMsg(NodePubClazz.class.getSimpleName(), NodePubClazz.log.name(), jsonPayload);
-        Message message = Message.pubMessage(pubMsg);
+        Message message = Message.pubMsg(pubMsg);
         try {
             mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
         } catch (MqttException e) {

+ 9 - 5
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -2,7 +2,7 @@ package cn.reghao.autodop.common.mqtt;
 
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.msg.Message;
-import cn.reghao.autodop.common.msg.sub.SubMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
@@ -23,7 +23,7 @@ public class DefaultMqttClient implements AutoCloseable {
     private final MosquittoProperties properties;
     private final MqttClient client;
     // 记录 RPC 调用, size 为 0 时表示所有 pub 消息的结果都已 sub
-    private final Map<String, SubMsg> rpcRecorder = new HashMap<>();
+    private final Map<String, RpcParamMsg> rpcRecorder = new HashMap<>();
     private final Map<String, IMqttMessageListener> subMap = new HashMap<>();
     private final MqttCallback callback = new MqttClientCallback();
 
@@ -52,8 +52,12 @@ public class DefaultMqttClient implements AutoCloseable {
         client.close();
     }
 
-    public void putRecord(String msgId, SubMsg rpcMsg) {
-        rpcRecorder.put(msgId, rpcMsg);
+    public void putRecord(String msgId, RpcParamMsg paramMsg) {
+        rpcRecorder.put(msgId, paramMsg);
+    }
+
+    public RpcParamMsg getRecord(String msgId) {
+        return rpcRecorder.get(msgId);
     }
 
     public void removeRecord(String msgId) {
@@ -123,7 +127,7 @@ public class DefaultMqttClient implements AutoCloseable {
 
         connect();
         client.publish(topic, mqttMessage);
-        rpcRecorder.put(message.getRpcMsg().getMsgId(), message.getRpcMsg());
+        rpcRecorder.put(message.getMsgId(), message.getParamMsg());
     }
 
     class MqttClientCallback implements MqttCallbackExtended {

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/ClazzDispatcher.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg;
+
+/**
+ * @author reghao
+ * @date 2021-10-19 16:00:12
+ */
+public interface ClazzDispatcher<K, V> {
+    V dispatch(K k);
+}

+ 23 - 9
common/src/main/java/cn/reghao/autodop/common/msg/Message.java

@@ -1,10 +1,12 @@
 package cn.reghao.autodop.common.msg;
 
 import cn.reghao.autodop.common.msg.pub.PubMsg;
-import cn.reghao.autodop.common.msg.sub.SubMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.UUID;
 
 /**
  * @author reghao
@@ -14,24 +16,36 @@ import java.io.Serializable;
 public class Message implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private String msgId;
+    private long sendTime;
     private String msgType;
     private PubMsg pubMsg;
-    private SubMsg rpcMsg;
-    private long sendTime;
+    private RpcParamMsg paramMsg;
+    private RpcResultMsg resultMsg;
+
+    private Message() {
+        this.msgId = UUID.randomUUID().toString();
+        this.sendTime = System.currentTimeMillis();
+    }
 
-    public static Message pubMessage(PubMsg pubMsg) {
+    public static Message pubMsg(PubMsg pubMsg) {
         Message message = new Message();
         message.setMsgType(MsgType.pub.name());
         message.setPubMsg(pubMsg);
-        message.setSendTime(System.currentTimeMillis());
         return message;
     }
 
-    public static Message rpcMessage(SubMsg rpcMsg) {
+    public static Message paramMsg(RpcParamMsg paramMsg) {
+        Message message = new Message();
+        message.setMsgType(MsgType.rpcParam.name());
+        message.setParamMsg(paramMsg);
+        return message;
+    }
+
+    public static Message resultMsg(RpcResultMsg resultMsg) {
         Message message = new Message();
-        message.setMsgType(MsgType.sub.name());
-        message.setRpcMsg(rpcMsg);
-        message.setSendTime(System.currentTimeMillis());
+        message.setMsgType(MsgType.rpcResult.name());
+        message.setResultMsg(resultMsg);
         return message;
     }
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/MsgType.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.msg;
  * @date 2021-09-15 09:01:17
  */
 public enum MsgType {
-    pub, sub
+    pub, rpcParam, rpcResult
 }

+ 16 - 0
common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/INodePubClazz.java

@@ -0,0 +1,16 @@
+package cn.reghao.autodop.common.msg.pub.clazz;
+
+import cn.reghao.autodop.common.msg.pub.dto.node.NodeLogDTO;
+import cn.reghao.jdkutil.machine.data.detail.MachineDetail;
+import cn.reghao.jdkutil.machine.data.stat.MachineStat;
+
+/**
+ * @author reghao
+ * @date 2021-09-15 09:19:14
+ */
+public interface INodePubClazz {
+    void start(MachineDetail machineDetail);
+    void heartbeat(MachineStat machineStat);
+    void shutdown(MachineStat machineStat);
+    void log(NodeLogDTO nodeLogDTO);
+}

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/NodePubClazz.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/NodePubClazz.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.pub.clazz;
+package cn.reghao.autodop.common.msg.pub.constant;
 
 /**
  * @author reghao

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/PubClazz.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/constant/PubClazz.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.pub.clazz;
+package cn.reghao.autodop.common.msg.pub.constant;
 
 /**
  * @author reghao

+ 36 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcParamMsg.java

@@ -0,0 +1,36 @@
+package cn.reghao.autodop.common.msg.rpc;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author reghao
+ * @date 2021-08-25 17:19:41
+ */
+@Data
+public class RpcParamMsg implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String clazz;
+    private String method;
+    private String jsonParam;
+
+    private RpcParamMsg() {
+    }
+
+    public static RpcParamMsg paramMsg(String clazz, String method) {
+        RpcParamMsg paramMsg = new RpcParamMsg();
+        paramMsg.setClazz(clazz);
+        paramMsg.setMethod(method);
+        return paramMsg;
+    }
+
+    public static RpcParamMsg paramMsg(String clazz, String method, String jsonParam) {
+        RpcParamMsg paramMsg = new RpcParamMsg();
+        paramMsg.setClazz(clazz);
+        paramMsg.setMethod(method);
+        paramMsg.setJsonParam(jsonParam);
+        return paramMsg;
+    }
+}

+ 41 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResultMsg.java

@@ -0,0 +1,41 @@
+package cn.reghao.autodop.common.msg.rpc;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+import static cn.reghao.jdkutil.result.ResultStatus.*;
+
+/**
+ * RPC 调用结果
+ *
+ * @author reghao
+ * @date 2020-09-07 23:04:16
+ */
+@Data
+public class RpcResultMsg implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // 0 - 成功 1 - 失败
+    private int code;
+    private String msg;
+    private String jsonData;
+
+    private RpcResultMsg(int code, String msg, String jsonData) {
+        this.code = code;
+        this.msg = msg;
+        this.jsonData = jsonData;
+    }
+
+    public static RpcResultMsg success(String jsonData) {
+        return new RpcResultMsg(SUCCESS.getCode(), SUCCESS.getMsg(), jsonData);
+    }
+
+    public static RpcResultMsg fail(String msg) {
+        return new RpcResultMsg(FAIL.getCode(), msg, null);
+    }
+
+    public static RpcResultMsg error(String msg) {
+        return new RpcResultMsg(ERROR.getCode(), msg, null);
+    }
+}

+ 19 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/IAppRpcClazz.java

@@ -0,0 +1,19 @@
+package cn.reghao.autodop.common.msg.rpc.clazz;
+
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+
+/**
+ * @author reghao
+ * @date 2020-12-25 19:15:00
+ */
+public interface IAppRpcClazz {
+    DeployResult deploy(DeployParam deployParam);
+    StatusResult status(StatusParam statusParam);
+    StatusResult restart(StatusParam statusParam);
+    StatusResult stop(StatusParam statusParam);
+    StatusResult start(StatusParam statusParam);
+    void log();
+}

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/msg/sub/clazz/AppSubClazz.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/constant/AppRpcClazz.java

@@ -1,9 +1,9 @@
-package cn.reghao.autodop.common.msg.sub.clazz;
+package cn.reghao.autodop.common.msg.rpc.constant;
 
 /**
  * @author reghao
  * @date 2020-12-25 19:15:00
  */
-public enum AppSubClazz {
+public enum AppRpcClazz {
     deploy, status, restart, stop, start, log,
 }

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/constant/RpcClazz.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg.rpc.constant;
+
+/**
+ * @author reghao
+ * @date 2021-09-14 18:15:05
+ */
+public enum RpcClazz {
+    AppRpcClazz,
+}

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/sub/dto/app/DeployParam.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployParam.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.sub.dto.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.Data;
 

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/msg/sub/dto/app/DeployResult.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployResult.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.sub.dto.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import cn.reghao.jdkutil.result.Result;
 import lombok.Data;
@@ -16,7 +16,7 @@ public class DeployResult {
     private String machineIpv4;
     private LocalDateTime deployTime;
     private Result result;
-    private AppStatus appStatus;
+    private StatusResult appStatus;
 
     public DeployResult(String buildLogId, String machineId, String machineIpv4) {
         this.buildLogId = buildLogId;

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/msg/sub/dto/app/PackType.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/PackType.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.sub.dto.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 /**
  * 打包方式类型

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/msg/sub/dto/app/AppIdParam.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/StatusParam.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.sub.dto.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -13,7 +13,7 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 @Data
-public class AppIdParam {
+public class StatusParam {
     private String packerType;
     private String appId;
 }

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/msg/sub/dto/app/AppStatus.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/StatusResult.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.msg.sub.dto.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.Data;
 
@@ -11,7 +11,7 @@ import java.time.LocalDateTime;
  * @date 2021-02-22 16:24:08
  */
 @Data
-public class AppStatus {
+public class StatusResult {
     private String machineId;
     private String machineIpv4;
     private String appId;

+ 0 - 33
common/src/main/java/cn/reghao/autodop/common/msg/sub/RpcPayload.java

@@ -1,33 +0,0 @@
-package cn.reghao.autodop.common.msg.sub;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2021-08-25 17:19:41
- */
-@Data
-public class RpcPayload implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String clazz;
-    private String method;
-    private String jsonParam;
-
-    public static RpcPayload rpcPayload(String clazz, String method) {
-        RpcPayload nodeMsg = new RpcPayload();
-        nodeMsg.setClazz(clazz);
-        nodeMsg.setMethod(method);
-        return nodeMsg;
-    }
-
-    public static RpcPayload rpcPayload(String clazz, String method, String jsonParam) {
-        RpcPayload nodeMsg = new RpcPayload();
-        nodeMsg.setClazz(clazz);
-        nodeMsg.setMethod(method);
-        nodeMsg.setJsonParam(jsonParam);
-        return nodeMsg;
-    }
-}

+ 0 - 33
common/src/main/java/cn/reghao/autodop/common/msg/sub/RpcResult.java

@@ -1,33 +0,0 @@
-package cn.reghao.autodop.common.msg.sub;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import static cn.reghao.jdkutil.result.ResultStatus.*;
-
-/**
- * RPC 调用结果
- *
- * @author reghao
- * @date 2020-09-07 23:04:16
- */
-@Data
-@AllArgsConstructor
-public class RpcResult {
-    // 0 - 成功 1 - 失败
-    private int code;
-    private String msg;
-    private String jsonData;
-
-    public static  RpcResult success(String jsonData) {
-        return new RpcResult(SUCCESS.getCode(), SUCCESS.getMsg(), jsonData);
-    }
-
-    public static  RpcResult fail(String msg) {
-        return new RpcResult(FAIL.getCode(), msg, null);
-    }
-
-    public static  RpcResult error(String msg) {
-        return new RpcResult(ERROR.getCode(), msg, null);
-    }
-}

+ 0 - 35
common/src/main/java/cn/reghao/autodop/common/msg/sub/SubMsg.java

@@ -1,35 +0,0 @@
-package cn.reghao.autodop.common.msg.sub;
-
-import lombok.Data;
-
-import java.util.UUID;
-
-/**
- * @author reghao
- * @date 2021-09-14 17:42:43
- */
-@Data
-public class SubMsg {
-    private String msgId;
-    private RpcPayload rpcPayload;
-    private RpcResult rpcResult;
-
-    public SubMsg(RpcPayload rpcPayload) {
-        this.msgId = UUID.randomUUID().toString();
-        this.rpcPayload = rpcPayload;
-    }
-
-    public SubMsg(String msgId, RpcPayload rpcPayload, RpcResult rpcResult) {
-        this.msgId = msgId;
-        this.rpcPayload = rpcPayload;
-        this.rpcResult = rpcResult;
-    }
-
-    public static SubMsg callMsg(RpcPayload rpcPayload) {
-        return new SubMsg(rpcPayload);
-    }
-
-    public static SubMsg resultMsg(SubMsg callMsg, RpcResult rpcResult) {
-        return new SubMsg(callMsg.getMsgId(), callMsg.getRpcPayload(), rpcResult);
-    }
-}

+ 0 - 9
common/src/main/java/cn/reghao/autodop/common/msg/sub/clazz/SubClazz.java

@@ -1,9 +0,0 @@
-package cn.reghao.autodop.common.msg.sub.clazz;
-
-/**
- * @author reghao
- * @date 2021-09-14 18:15:05
- */
-public enum SubClazz {
-    AppSubClazz,
-}

+ 7 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;
 import cn.reghao.autodop.common.docker.DockerException;
@@ -13,11 +13,11 @@ import java.util.List;
  * @date 2021-02-22 16:21:28
  */
 public interface AppService {
-    AppStatus deploy(DeployParam deployParam) throws DockerException;
-    AppStatus status(String appId) throws DockerException;
-    AppStatus restart(String appId) throws DockerException;
-    AppStatus stop(String appId) throws DockerException;
-    AppStatus start(String appId) throws DockerException;
+    StatusResult deploy(DeployParam deployParam) throws DockerException;
+    StatusResult status(String appId) throws DockerException;
+    StatusResult restart(String appId) throws DockerException;
+    StatusResult stop(String appId) throws DockerException;
+    StatusResult start(String appId) throws DockerException;
     /**
      * 日志文件列表
      *

+ 9 - 9
dagent/src/main/java/cn/reghao/autodop/dagent/app/DockerAppServiceImpl.java

@@ -1,8 +1,8 @@
 package cn.reghao.autodop.dagent.app;
 
 import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogConfig;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;
@@ -30,7 +30,7 @@ public class DockerAppServiceImpl implements AppService {
     private final long sleep = 10_000;
 
     @Override
-    public AppStatus deploy(DeployParam deployParam) throws DockerException {
+    public StatusResult deploy(DeployParam deployParam) throws DockerException {
         String appId = deployParam.getAppId();
         String packagePath = deployParam.getPackagePath();
         Config dockerConfig = JsonConverter.jsonToObject(deployParam.getStartScript(), Config.class);
@@ -55,8 +55,8 @@ public class DockerAppServiceImpl implements AppService {
         }
     }
 
-    private AppStatus getAppStatus(String appId, ContainerInfo containerInfo) {
-        AppStatus appStatus = new AppStatus();
+    private StatusResult getAppStatus(String appId, ContainerInfo containerInfo) {
+        StatusResult appStatus = new StatusResult();
         appStatus.setMachineId(Machine.ID);
         appStatus.setMachineIpv4(Machine.IPV4);
         appStatus.setAppId(appId);
@@ -75,7 +75,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus status(String appId) throws DockerException {
+    public StatusResult status(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             if (containerId == null) {
@@ -87,7 +87,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus restart(String appId) throws DockerException {
+    public StatusResult restart(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.restart(containerId);
@@ -101,7 +101,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus stop(String appId) throws DockerException {
+    public StatusResult stop(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.stop(containerId);
@@ -112,7 +112,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus start(String appId) throws DockerException {
+    public StatusResult start(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.start(containerId);

+ 7 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;
 import lombok.extern.slf4j.Slf4j;
@@ -16,28 +16,28 @@ import java.util.List;
 @Slf4j
 public class ZipAppServiceImpl implements AppService {
     @Override
-    public AppStatus deploy(DeployParam deployParam) {
+    public StatusResult deploy(DeployParam deployParam) {
         log.info("zip 部署");
         return null;
     }
 
     @Override
-    public AppStatus status(String appId) {
+    public StatusResult status(String appId) {
         return null;
     }
 
     @Override
-    public AppStatus restart(String appId)  {
+    public StatusResult restart(String appId)  {
         return null;
     }
 
     @Override
-    public AppStatus stop(String appId)  {
+    public StatusResult stop(String appId)  {
         return null;
     }
 
     @Override
-    public AppStatus start(String appId) {
+    public StatusResult start(String appId) {
         return null;
     }
 

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeClazzPubImpl.java

@@ -5,7 +5,7 @@ import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
-import cn.reghao.autodop.common.msg.pub.clazz.NodePubClazz;
+import cn.reghao.autodop.common.msg.pub.constant.NodePubClazz;
 import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
 import cn.reghao.jdkutil.machine.data.detail.MachineDetail;
 import cn.reghao.jdkutil.machine.data.stat.MachineStat;
@@ -57,7 +57,7 @@ public class NodeClazzPubImpl {
     }
     
     private void pub(PubMsg pubMsg) {
-        Message message = Message.pubMessage(pubMsg);
+        Message message = Message.pubMsg(pubMsg);
         try {
             String topic = MsgQueue.dmasterTopic();
             mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));

+ 19 - 20
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/RpcListener.java

@@ -3,11 +3,11 @@ package cn.reghao.autodop.dagent.mqttsub;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.sub.RpcPayload;
-import cn.reghao.autodop.common.msg.sub.SubMsg;
-import cn.reghao.autodop.common.msg.sub.RpcResult;
-import cn.reghao.autodop.common.msg.sub.clazz.SubClazz;
-import cn.reghao.autodop.dagent.mqttsub.impl.AppClazzRpcImpl;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.clazz.IAppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.constant.RpcClazz;
+import cn.reghao.autodop.dagent.mqttsub.impl.AppRpcClazzDispatcher;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -29,35 +29,34 @@ import java.lang.management.ManagementFactory;
 public class RpcListener implements MqttCallback {
     private final long startTime;
     private final DefaultMqttClient mqttClient;
-    private final AppClazzRpcImpl appClazz;
+    private final AppRpcClazzDispatcher appRpcClazzDispatcher;
 
-    public RpcListener(DefaultMqttClient mqttClient, AppClazzRpcImpl appClazz) {
+    public RpcListener(DefaultMqttClient mqttClient, AppRpcClazzDispatcher appRpcClazzDispatcher) {
         this.startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
         this.mqttClient = mqttClient;
-        this.appClazz = appClazz;
+        this.appRpcClazzDispatcher = appRpcClazzDispatcher;
     }
 
     @Override
     public void messageArrived(String topic, MqttMessage message) {
         try {
-            String msg = message.toString();
-            Message callMessage = JsonConverter.jsonToObject(msg, Message.class);
-            SubMsg callMsg = callMessage.getRpcMsg();
+            Message msg = JsonConverter.jsonToObject(message.toString(), Message.class);
+            String msgId = msg.getMsgId();
+            long sendTime = msg.getSendTime();
 
-            RpcPayload rpcPayload = callMsg.getRpcPayload();
-            String clazz = rpcPayload.getClazz();
-            RpcResult rpcResult;
-            switch (SubClazz.valueOf(clazz)) {
-                case AppSubClazz:
-                    rpcResult = appClazz.process(rpcPayload);
+            RpcParamMsg paramMsg = msg.getParamMsg();
+            String clazz = paramMsg.getClazz();
+            RpcResultMsg resultMsg;
+            switch (RpcClazz.valueOf(clazz)) {
+                case AppRpcClazz:
+                    resultMsg = appRpcClazzDispatcher.dispatch(paramMsg);
                     break;
                 default:
                     String err = String.format("找不到 %s 类型", clazz);
-                    rpcResult = RpcResult.error(err);
+                    resultMsg = RpcResultMsg.error(err);
             }
 
-            SubMsg resultMsg = SubMsg.resultMsg(callMsg, rpcResult);
-            Message message1 = Message.rpcMessage(resultMsg);
+            Message message1 = Message.resultMsg(resultMsg);
             rpcReply(message1);
         } catch (Exception e) {
             log.error("MQTT message exception -> {}", e.getMessage());

+ 0 - 219
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppClazzRpcImpl.java

@@ -1,219 +0,0 @@
-package cn.reghao.autodop.dagent.mqttsub.impl;
-
-import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
-import cn.reghao.autodop.common.dagent.app.log.LogConfig;
-import cn.reghao.autodop.common.dagent.app.log.LogFile;
-import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.msg.sub.clazz.AppSubClazz;
-import cn.reghao.autodop.common.msg.sub.dto.app.*;
-import cn.reghao.autodop.common.msg.sub.RpcPayload;
-import cn.reghao.autodop.common.msg.sub.RpcResult;
-import cn.reghao.autodop.common.util.ExceptionUtil;
-import cn.reghao.autodop.dagent.app.AppService;
-import cn.reghao.autodop.dagent.app.DockerAppServiceImpl;
-import cn.reghao.autodop.dagent.app.ZipAppServiceImpl;
-import cn.reghao.jdkutil.result.Result;
-import cn.reghao.jdkutil.result.ResultStatus;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2020-12-30 10:26:47
- */
-@Slf4j
-@Component
-public class AppClazzRpcImpl {
-    private final AppService dockerAppServiceImpl;
-    private final AppService zipAppServiceImpl;
-    
-    public AppClazzRpcImpl() {
-        this.dockerAppServiceImpl = new DockerAppServiceImpl();
-        this.zipAppServiceImpl = new ZipAppServiceImpl();
-    }
-    
-    public RpcResult process(RpcPayload rpcPayload) {
-        String method = rpcPayload.getMethod();
-        String jsonParam = rpcPayload.getJsonParam();
-        Object param;
-        switch (AppSubClazz.valueOf(method)) {
-            case deploy:
-                param = JsonConverter.jsonToObject(jsonParam, DeployParam.class);
-                return deploy(param);
-            case status:
-                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
-                return status(param);
-            case start:
-                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
-                return start(param);
-            case stop:
-                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
-                return stop(param);
-            case restart:
-                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
-                return restart(param);
-            default:
-                String err = String.format("找不到 %s 方法", method);
-                return RpcResult.error(err);
-        }
-    }
-    
-    private RpcResult deploy(Object param) {
-        DeployParam deployParam = (DeployParam) param;
-        String buildLogId = deployParam.getBuildLogId();
-        String packType = deployParam.getPackType();
-
-        AppStatus appStatus;
-        DeployResult deployResult = new DeployResult(buildLogId, Machine.ID, Machine.IPV4);
-        try {
-            switch (PackType.valueOf(packType)) {
-                case docker:
-                    appStatus = dockerAppServiceImpl.deploy(deployParam);
-                    deployResult.setResult(Result.result(ResultStatus.SUCCESS));
-                    deployResult.setAppStatus(appStatus);
-                    break;
-                case zip:
-                    appStatus = zipAppServiceImpl.deploy(deployParam);
-                    deployResult.setResult(Result.result(ResultStatus.SUCCESS));
-                    deployResult.setAppStatus(appStatus);
-                    break;
-                default:
-                    String msg = "打包类型 " + deployParam.getPackType() + " 不存在";
-                    deployResult.setResult(Result.result(ResultStatus.ERROR, msg));
-            }
-        } catch (Exception e) {
-            deployResult.setResult(Result.result(ResultStatus.FAIL, ExceptionUtil.errorMsg(e)));
-        }
-        
-        return RpcResult.success(JsonConverter.objectToJson(deployResult));
-    }
-
-    public RpcResult status(Object param) {
-        AppIdParam appIdParam = (AppIdParam) param;
-        String appId = appIdParam.getAppId();
-        String packerType = appIdParam.getPackerType();
-
-        AppStatus appStatus;
-        try {
-            switch (PackType.valueOf(packerType)) {
-                case docker:
-                    appStatus = dockerAppServiceImpl.status(appId);
-                    break;
-                case zip:
-                    appStatus = zipAppServiceImpl.status(appId);
-                    break;
-                default:
-                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
-            }
-        } catch (Exception e) {
-            return RpcResult.fail(ExceptionUtil.errorMsg(e));
-        }
-        
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
-    }
-
-    public RpcResult restart(Object param) {
-        AppIdParam appIdParam = (AppIdParam) param;
-        String appId = appIdParam.getAppId();
-        String packerType = appIdParam.getPackerType();
-
-        AppStatus appStatus;
-        try {
-            switch (PackType.valueOf(packerType)) {
-                case docker:
-                    appStatus = dockerAppServiceImpl.restart(appId);
-                    break;
-                case zip:
-                    appStatus = zipAppServiceImpl.restart(appId);
-                    break;
-                default:
-                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
-            }
-        } catch (Exception e) {
-            return RpcResult.fail(ExceptionUtil.errorMsg(e));
-        }
-        
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
-    }
-
-    public RpcResult stop(Object param) {
-        AppIdParam appIdParam = (AppIdParam) param;
-        String appId = appIdParam.getAppId();
-        String packerType = appIdParam.getPackerType();
-
-        AppStatus appStatus;
-        try {
-            switch (PackType.valueOf(packerType)) {
-                case docker:
-                    appStatus = dockerAppServiceImpl.stop(appId);
-                    break;
-                case zip:
-                    appStatus = zipAppServiceImpl.stop(appId);
-                    break;
-                default:
-                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
-            }
-        } catch (Exception e) {
-            return RpcResult.fail(ExceptionUtil.errorMsg(e));
-        }
-
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
-    }
-
-    public RpcResult start(Object param) {
-        AppIdParam appIdParam = (AppIdParam) param;
-        String appId = appIdParam.getAppId();
-        String packerType = appIdParam.getPackerType();
-
-        AppStatus appStatus;
-        try {
-            switch (PackType.valueOf(packerType)) {
-                case docker:
-                    appStatus = dockerAppServiceImpl.start(appId);
-                    break;
-                case zip:
-                    appStatus = zipAppServiceImpl.start(appId);
-                    break;
-                default:
-                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
-            }
-        } catch (Exception e) {
-            return RpcResult.fail(ExceptionUtil.errorMsg(e));
-        }
-
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
-    }
-
-    public RpcResult log(Object param) {
-        AppLogArgs appLogArgs = (AppLogArgs) param;
-        String packerType = appLogArgs.getPackerType();
-        long count = appLogArgs.getLogConfigs().stream().filter(LogConfig::getIsDir).count();
-        try {
-            switch (PackType.valueOf(packerType)) {
-                case docker:
-                    if (count > 0) {
-                        List<LogFile> logFiles = dockerAppServiceImpl.logFiles(appLogArgs);
-                        return RpcResult.success(JsonConverter.objectToJson(logFiles));
-                    } else {
-                        List<String> logContent = dockerAppServiceImpl.logContent(appLogArgs);
-                        return RpcResult.success(JsonConverter.objectToJson(logContent));
-                    }
-                case zip:
-                    if (count > 0) {
-                        List<LogFile> logFiles = zipAppServiceImpl.logFiles(appLogArgs);
-                        return RpcResult.success(JsonConverter.objectToJson(logFiles));
-                    } else {
-                        List<String> logContent = zipAppServiceImpl.logContent(appLogArgs);
-                        return RpcResult.success(JsonConverter.objectToJson(logContent));
-                    }
-                default:
-                    return RpcResult.error("打包类型 " + appLogArgs.getPackerType() + " 不存在");
-            }
-        } catch (Exception e) {
-            return RpcResult.fail(ExceptionUtil.errorMsg(e));
-        }
-    }
-}

+ 59 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppRpcClazzDispatcher.java

@@ -0,0 +1,59 @@
+package cn.reghao.autodop.dagent.mqttsub.impl;
+
+import cn.reghao.autodop.common.msg.ClazzDispatcher;
+import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.clazz.IAppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.dto.app.*;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2020-12-30 10:26:47
+ */
+@Slf4j
+@Component
+public class AppRpcClazzDispatcher implements ClazzDispatcher<RpcParamMsg, RpcResultMsg> {
+    private final IAppRpcClazz appRpcClazz;
+
+    public AppRpcClazzDispatcher(IAppRpcClazz appRpcClazz) {
+        this.appRpcClazz = appRpcClazz;
+    }
+
+    @Override
+    public RpcResultMsg dispatch(RpcParamMsg paramMsg) {
+        String method = paramMsg.getMethod();
+        String jsonParam = paramMsg.getJsonParam();
+        StatusParam statusParam;
+        StatusResult statusResult;
+        switch (AppRpcClazz.valueOf(method)) {
+            case deploy:
+                DeployParam deployParam = JsonConverter.jsonToObject(jsonParam, DeployParam.class);
+                DeployResult deployResult = appRpcClazz.deploy(deployParam);
+                return RpcResultMsg.success(JsonConverter.objectToJson(deployResult));
+            case status:
+                statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
+                statusResult = appRpcClazz.status(statusParam);
+                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+            case restart:
+                statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
+                statusResult = appRpcClazz.restart(statusParam);
+                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+            case stop:
+                statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
+                statusResult = appRpcClazz.stop(statusParam);
+                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+            case start:
+                statusParam = JsonConverter.jsonToObject(jsonParam, StatusParam.class);
+                statusResult = appRpcClazz.start(statusParam);
+                return RpcResultMsg.success(JsonConverter.objectToJson(statusResult));
+            case log:
+            default:
+                String err = String.format("找不到 %s 方法", method);
+                return RpcResultMsg.error(err);
+        }
+    }
+}

+ 193 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/impl/AppRpcClazzImpl.java

@@ -0,0 +1,193 @@
+package cn.reghao.autodop.dagent.mqttsub.impl;
+
+import cn.reghao.autodop.common.machine.Machine;
+import cn.reghao.autodop.common.msg.rpc.clazz.IAppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.dto.app.*;
+import cn.reghao.autodop.common.util.ExceptionUtil;
+import cn.reghao.autodop.dagent.app.AppService;
+import cn.reghao.autodop.dagent.app.DockerAppServiceImpl;
+import cn.reghao.autodop.dagent.app.ZipAppServiceImpl;
+import cn.reghao.jdkutil.result.Result;
+import cn.reghao.jdkutil.result.ResultStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2020-12-30 10:26:47
+ */
+@Slf4j
+@Component
+public class AppRpcClazzImpl implements IAppRpcClazz {
+    private final AppService dockerAppServiceImpl;
+    private final AppService zipAppServiceImpl;
+
+    public AppRpcClazzImpl() {
+        this.dockerAppServiceImpl = new DockerAppServiceImpl();
+        this.zipAppServiceImpl = new ZipAppServiceImpl();
+    }
+
+    @Override
+    public DeployResult deploy(DeployParam deployParam) {
+        String buildLogId = deployParam.getBuildLogId();
+        String packType = deployParam.getPackType();
+
+        StatusResult appStatus;
+        DeployResult deployResult = new DeployResult(buildLogId, Machine.ID, Machine.IPV4);
+        try {
+            switch (PackType.valueOf(packType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.deploy(deployParam);
+                    deployResult.setResult(Result.result(ResultStatus.SUCCESS));
+                    deployResult.setAppStatus(appStatus);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.deploy(deployParam);
+                    deployResult.setResult(Result.result(ResultStatus.SUCCESS));
+                    deployResult.setAppStatus(appStatus);
+                    break;
+                default:
+                    String msg = "打包类型 " + deployParam.getPackType() + " 不存在";
+                    deployResult.setResult(Result.result(ResultStatus.ERROR, msg));
+            }
+        } catch (Exception e) {
+            deployResult.setResult(Result.result(ResultStatus.FAIL, ExceptionUtil.errorMsg(e)));
+        }
+        
+        //return RpcResultMsg.success(JsonConverter.objectToJson(deployResult));
+        return deployResult;
+    }
+
+    @Override
+    public StatusResult status(StatusParam statusParam) {
+        String appId = statusParam.getAppId();
+        String packerType = statusParam.getPackerType();
+
+        StatusResult statusResult;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    statusResult = dockerAppServiceImpl.status(appId);
+                    break;
+                case zip:
+                    statusResult = zipAppServiceImpl.status(appId);
+                    break;
+                default:
+                    return null;
+                    //return RpcResultMsg.error("打包类型 " + statusParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return null;
+            //return RpcResultMsg.fail(ExceptionUtil.errorMsg(e));
+        }
+        
+        //return RpcResultMsg.success(JsonConverter.objectToJson(appStatus));
+        return statusResult;
+    }
+
+    @Override
+    public StatusResult restart(StatusParam statusParam) {
+        String appId = statusParam.getAppId();
+        String packerType = statusParam.getPackerType();
+
+        /*StatusResult appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.restart(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.restart(appId);
+                    break;
+                default:
+                    return RpcResultMsg.error("打包类型 " + statusParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResultMsg.fail(ExceptionUtil.errorMsg(e));
+        }
+        
+        return RpcResultMsg.success(JsonConverter.objectToJson(appStatus));*/
+        return null;
+    }
+
+    @Override
+    public StatusResult stop(StatusParam statusParam) {
+        String appId = statusParam.getAppId();
+        String packerType = statusParam.getPackerType();
+
+        /*StatusResult appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.stop(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.stop(appId);
+                    break;
+                default:
+                    return RpcResultMsg.error("打包类型 " + statusParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResultMsg.fail(ExceptionUtil.errorMsg(e));
+        }
+
+        return RpcResultMsg.success(JsonConverter.objectToJson(appStatus));*/
+        return null;
+    }
+
+    @Override
+    public StatusResult start(StatusParam statusParam) {
+        String appId = statusParam.getAppId();
+        String packerType = statusParam.getPackerType();
+
+        /*StatusResult appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.start(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.start(appId);
+                    break;
+                default:
+                    return RpcResultMsg.error("打包类型 " + statusParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResultMsg.fail(ExceptionUtil.errorMsg(e));
+        }
+
+        return RpcResultMsg.success(JsonConverter.objectToJson(appStatus));*/
+        return null;
+    }
+
+    @Override
+    public void log() {
+        /*AppLogArgs appLogArgs = (AppLogArgs) param;
+        String packerType = appLogArgs.getPackerType();
+        long count = appLogArgs.getLogConfigs().stream().filter(LogConfig::getIsDir).count();
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    if (count > 0) {
+                        List<LogFile> logFiles = dockerAppServiceImpl.logFiles(appLogArgs);
+                        return RpcResultMsg.success(JsonConverter.objectToJson(logFiles));
+                    } else {
+                        List<String> logContent = dockerAppServiceImpl.logContent(appLogArgs);
+                        return RpcResultMsg.success(JsonConverter.objectToJson(logContent));
+                    }
+                case zip:
+                    if (count > 0) {
+                        List<LogFile> logFiles = zipAppServiceImpl.logFiles(appLogArgs);
+                        return RpcResultMsg.success(JsonConverter.objectToJson(logFiles));
+                    } else {
+                        List<String> logContent = zipAppServiceImpl.logContent(appLogArgs);
+                        return RpcResultMsg.success(JsonConverter.objectToJson(logContent));
+                    }
+                default:
+                    return RpcResultMsg.error("打包类型 " + appLogArgs.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResultMsg.fail(ExceptionUtil.errorMsg(e));
+        }*/
+    }
+}

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/AppConfigPageController.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.controller.page;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.app.model.constant.AppType;
 import cn.reghao.autodop.dmaster.app.model.constant.EnvList;
 import cn.reghao.autodop.dmaster.app.model.po.config.DeployConfig;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/PackerPageController.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.controller.page;
 
 import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.msg.sub.dto.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.app.db.query.config.PackerConfigQuery;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.PackerConfig;
 import cn.reghao.autodop.dmaster.app.model.vo.KeyValue;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppDeploying.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.dmaster.app.model.po.config.AppConfig;
 import cn.reghao.autodop.dmaster.app.model.po.config.DeployConfig;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppRunning.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
 import cn.reghao.autodop.dmaster.app.model.po.config.AppConfig;
 import cn.reghao.autodop.dmaster.app.model.po.config.DeployConfig;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;
@@ -42,7 +42,7 @@ public class AppRunning extends AppProperty {
         return this;
     }
 
-    public AppRunning update(BuildLog buildLog, AppStatus appStatus) {
+    public AppRunning update(BuildLog buildLog, StatusResult appStatus) {
         this.setBuildLogId(buildLog.getId());
         this.setPackagePath(buildLog.getPackagePath());
         this.setIsRunning(appStatus.getIsRunning());
@@ -52,7 +52,7 @@ public class AppRunning extends AppProperty {
         return this;
     }
 
-    public AppRunning update(AppStatus appStatus) {
+    public AppRunning update(StatusResult appStatus) {
         this.setPackagePath(appStatus.getPackagePath());
         this.setIsRunning(appStatus.getIsRunning());
         this.setStartTime(appStatus.getStartTime());

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/config/build/PackerConfig.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po.config.build;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.util.validator.ValidEnum;
 import cn.reghao.autodop.dmaster.util.db.BaseEntity;
 import lombok.*;

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/log/DeployLog.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.model.po.log;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.jdkutil.result.Result;
 import cn.reghao.autodop.dmaster.util.db.BaseDocument;
 import lombok.Data;
@@ -31,7 +31,7 @@ public class DeployLog extends BaseDocument {
         DeployLog deployLog = new DeployLog();
         deployLog.setBuildLogId(deployResult.getBuildLogId());
         deployLog.setResult(deployResult.getResult());
-        AppStatus appStatus = deployResult.getAppStatus();
+        StatusResult appStatus = deployResult.getAppStatus();
         if (appStatus != null) {
             deployLog.setMachineId(appStatus.getMachineId());
             deployLog.setMachineIpv4(appStatus.getMachineIpv4());

+ 5 - 7
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -3,10 +3,9 @@ package cn.reghao.autodop.dmaster.app.service.bd;
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.common.msg.sub.SubMsg;
-import cn.reghao.autodop.common.msg.sub.RpcPayload;
-import cn.reghao.autodop.common.msg.sub.clazz.AppSubClazz;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppConfigQuery;
@@ -48,10 +47,9 @@ public class AppDeployer {
             String startScript = deployConfig.getStartScript();
             deployParam.setStartScript(startScript);
 
-            RpcPayload rpcPayload = RpcPayload.rpcPayload(AppSubClazz.class.getSimpleName(), AppSubClazz.deploy.name(),
+            RpcParamMsg paramMsg = RpcParamMsg.paramMsg(AppRpcClazz.class.getSimpleName(), AppRpcClazz.deploy.name(),
                     JsonConverter.objectToJson(deployParam));
-            SubMsg rpcMsg = SubMsg.callMsg(rpcPayload);
-            Message message = Message.rpcMessage(rpcMsg);
+            Message message = Message.paramMsg(paramMsg);
 
             // TODO 对于需要返回值的 pub,需要做一个记录,pub 和 sub 一一对应
             String topic = MsgQueue.dagentTopic(Machine.ID);

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppIntegrate.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service.bd;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.app.model.constant.build.CompileType;
 import cn.reghao.autodop.dmaster.app.model.constant.build.RepoType;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.CompilerConfig;

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/DeployNotifyMsg.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.service.bd;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;
 import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
 import cn.reghao.jdkutil.converter.DateTimeConverter;
@@ -35,7 +35,7 @@ public class DeployNotifyMsg {
         this.commitTime = DateTimeConverter.format(buildLog.getCommitInfo().getMsCommitTime());
         this.machineIpv4 = deployResult.getMachineIpv4();
         this.deployResult = deployResult.getResult().getMsg();
-        AppStatus appStatus = deployResult.getAppStatus();
+        StatusResult appStatus = deployResult.getAppStatus();
         if (appStatus != null) {
             this.deployTime = DateTimeConverter.format(appStatus.getStartTime());
         } else {

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/tools/packer/ZipPack.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service.bd.tools.packer;
 
-import cn.reghao.autodop.common.msg.sub.dto.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.common.util.compression.ZipFiles;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.LocalBuildDir;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.PackerConfig;

+ 34 - 28
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java

@@ -1,15 +1,15 @@
 package cn.reghao.autodop.dmaster.mqttsub;
 
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgType;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
-import cn.reghao.autodop.common.msg.pub.clazz.PubClazz;
-import cn.reghao.autodop.common.msg.sub.RpcPayload;
-import cn.reghao.autodop.common.msg.sub.SubMsg;
-import cn.reghao.autodop.common.msg.sub.RpcResult;
-import cn.reghao.autodop.common.msg.sub.clazz.SubClazz;
-import cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult.AppRpcClazzResultHandler;
-import cn.reghao.autodop.dmaster.mqttsub.impl.pub.NodePubClazzHandler;
+import cn.reghao.autodop.common.msg.pub.constant.PubClazz;
+import cn.reghao.autodop.common.msg.rpc.RpcParamMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.constant.RpcClazz;
+import cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult.AppRpcClazzResultDispatcher;
+import cn.reghao.autodop.dmaster.mqttsub.impl.pub.NodePubClazzDispatcher;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -26,12 +26,15 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class MessageListener implements MqttCallback {
-    private final NodePubClazzHandler nodePubClazzHandler;
-    private final AppRpcClazzResultHandler appRpcClazzResult;
+    private final NodePubClazzDispatcher nodePubClazzDispatcher;
+    private final AppRpcClazzResultDispatcher appRpcClazzResult;
+    private final DefaultMqttClient defaultMqttClient;
 
-    public MessageListener(NodePubClazzHandler nodePubClazzHandler, AppRpcClazzResultHandler appRpcClazzResult) {
-        this.nodePubClazzHandler = nodePubClazzHandler;
+    public MessageListener(NodePubClazzDispatcher nodePubClazzDispatcher, AppRpcClazzResultDispatcher appRpcClazzResult,
+                           DefaultMqttClient defaultMqttClient) {
+        this.nodePubClazzDispatcher = nodePubClazzDispatcher;
         this.appRpcClazzResult = appRpcClazzResult;
+        this.defaultMqttClient = defaultMqttClient;
     }
 
     @Override
@@ -41,15 +44,16 @@ public class MessageListener implements MqttCallback {
                 log.info("重新发送的消息");
             }
 
-            String msg = message.toString();
-            Message message1 = JsonConverter.jsonToObject(msg, Message.class);
-            String msgType = message1.getMsgType();
+            Message msg = JsonConverter.jsonToObject(message.toString(), Message.class);
+            String msgId = msg.getMsgId();
+            long sendTime = msg.getSendTime();
+            String msgType = msg.getMsgType();
             switch (MsgType.valueOf(msgType)) {
                 case pub:
-                    processPubMsg(message1.getPubMsg());
+                    dispatchPubMsg(msg.getPubMsg());
                     break;
-                case sub:
-                    processRpcResult(message1.getRpcMsg());
+                case rpcResult:
+                    dispatchRpcResult(msgId, msg.getResultMsg());
                     break;
                 default:
             }
@@ -58,28 +62,30 @@ public class MessageListener implements MqttCallback {
         }
     }
 
-    private void processPubMsg(PubMsg pubMsg) {
+    private void dispatchPubMsg(PubMsg pubMsg) {
         String clazz = pubMsg.getClazz();
         String method = pubMsg.getMethod();
         String jsonPayload = pubMsg.getJsonPayload();
         switch (PubClazz.valueOf(clazz)) {
             case NodePubClazz:
-                nodePubClazzHandler.process(method, jsonPayload);
+                nodePubClazzDispatcher.dispatch(pubMsg);
                 break;
             default:
                 ;
         }
     }
 
-    private void processRpcResult(SubMsg rpcMsg) {
-        String msgId = rpcMsg.getMsgId();
-        RpcPayload rpcPayload = rpcMsg.getRpcPayload();
-        String clazz = rpcPayload.getClazz();
-        String method = rpcPayload.getMethod();
-        RpcResult rpcResult = rpcMsg.getRpcResult();
-        switch (SubClazz.valueOf(clazz)) {
-            case AppSubClazz:
-                appRpcClazzResult.process(msgId, method, rpcResult);
+    private void dispatchRpcResult(String msgId, RpcResultMsg resultMsg) {
+        RpcParamMsg paramMsg = defaultMqttClient.getRecord(msgId);
+        if (paramMsg == null) {
+            return;
+        }
+
+        String clazz = paramMsg.getClazz();
+        String method = paramMsg.getMethod();
+        switch (RpcClazz.valueOf(clazz)) {
+            case AppRpcClazz:
+                appRpcClazzResult.process(msgId, method, resultMsg);
                 break;
             default:
                 ;

+ 33 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodePubClazzHandler.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodePubClazzDispatcher.java

@@ -1,6 +1,8 @@
 package cn.reghao.autodop.dmaster.mqttsub.impl.pub;
 
-import cn.reghao.autodop.common.msg.pub.clazz.NodePubClazz;
+import cn.reghao.autodop.common.msg.ClazzDispatcher;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.constant.NodePubClazz;
 import cn.reghao.autodop.common.msg.pub.dto.node.NodeLogDTO;
 import cn.reghao.jdkutil.machine.data.detail.MachineDetail;
 import cn.reghao.jdkutil.machine.data.stat.MachineStat;
@@ -14,13 +16,41 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class NodePubClazzHandler {
+public class NodePubClazzDispatcher implements ClazzDispatcher<PubMsg, Void> {
     private final NodePubClazzImpl nodePubClazz;
 
-    public NodePubClazzHandler(NodePubClazzImpl nodePubClazz) {
+    public NodePubClazzDispatcher(NodePubClazzImpl nodePubClazz) {
         this.nodePubClazz = nodePubClazz;
     }
 
+    @Override
+    public Void dispatch(PubMsg pubMsg) {
+        String method = pubMsg.getMethod();
+        String jsonPayload = pubMsg.getJsonPayload();
+        switch (NodePubClazz.valueOf(method)) {
+            case start:
+                MachineDetail machineDetail = JsonConverter.jsonToObject(jsonPayload, MachineDetail.class);
+                nodePubClazz.start(machineDetail);
+                break;
+            case heartbeat:
+                MachineStat machineStat = JsonConverter.jsonToObject(jsonPayload, MachineStat.class);
+                nodePubClazz.heartbeat(machineStat);
+                break;
+            case shutdown:
+                MachineStat machineStat1 = JsonConverter.jsonToObject(jsonPayload, MachineStat.class);
+                nodePubClazz.shutdown(machineStat1);
+                break;
+            case log:
+                NodeLogDTO nodeLogDTO = JsonConverter.jsonToObject(jsonPayload, NodeLogDTO.class);
+                nodePubClazz.log(nodeLogDTO);
+                break;
+            default:
+                ;
+        }
+
+        return null;
+    }
+
     public void process(String method, String jsonPayload) {
         switch (NodePubClazz.valueOf(method)) {
             case start:

+ 6 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/pub/NodePubClazzImpl.java

@@ -1,5 +1,6 @@
 package cn.reghao.autodop.dmaster.mqttsub.impl.pub;
 
+import cn.reghao.autodop.common.msg.pub.clazz.INodePubClazz;
 import cn.reghao.autodop.common.msg.pub.dto.node.NodeLogDTO;
 import cn.reghao.autodop.common.msg.pub.dto.node.constant.NodeStatus;
 import cn.reghao.autodop.dmaster.machine.db.crud.MachineHostCrud;
@@ -19,7 +20,7 @@ import org.springframework.stereotype.Service;
  * @date 2021-10-15 16:39:48
  */
 @Service
-public class NodePubClazzImpl {
+public class NodePubClazzImpl implements INodePubClazz {
     private final MachineInfoQuery infoQuery;
     private final MachineInfoCrud infoCrud;
     private final MachineHostQuery hostQuery;
@@ -37,6 +38,7 @@ public class NodePubClazzImpl {
     }
 
     // TODO MachineInfo 和 MachineHost 应该在一个事务内完成
+    @Override
     public void start(MachineDetail machineDetail) {
         String machineId = machineDetail.getMachineId();
         MachineInfo machineInfo = infoQuery.findByMachineId(machineId);
@@ -58,6 +60,7 @@ public class NodePubClazzImpl {
     }
 
     // TODO 根据 MachineStat 中的信息检查系统负载,CPU,内存,磁盘等使用情况(暂未实现)
+    @Override
     public void heartbeat(MachineStat machineStat) {
         String machineId = machineStat.getMachineId();
         MachineHost machineHost = hostQuery.findByMachineId(machineId);
@@ -67,6 +70,7 @@ public class NodePubClazzImpl {
         }
     }
 
+    @Override
     public void shutdown(MachineStat machineStat) {
         String machineId = machineStat.getMachineId();
         MachineHost machineHost = hostQuery.findByMachineId(machineId);
@@ -77,6 +81,7 @@ public class NodePubClazzImpl {
         }
     }
 
+    @Override
     public void log(NodeLogDTO nodeLogDTO) {
         nodeLogCrud.save(new NodeLog(nodeLogDTO));
     }

+ 17 - 10
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultHandler.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultDispatcher.java

@@ -1,9 +1,11 @@
 package cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult;
 
-import cn.reghao.autodop.common.msg.sub.RpcResult;
-import cn.reghao.autodop.common.msg.sub.clazz.AppSubClazz;
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.ClazzDispatcher;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.rpc.RpcResultMsg;
+import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.jdkutil.result.ResultStatus;
 import cn.reghao.jdkutil.serializer.JsonConverter;
@@ -11,23 +13,28 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 /**
- * 分发 AppSubClazz 相关的消息
+ * 分发 AppRpcClazz 相关的消息
  *
  * @author reghao
  * @date 2020-12-30 10:26:47
  */
 @Slf4j
 @Component
-public class AppRpcClazzResultHandler {
+public class AppRpcClazzResultDispatcher implements ClazzDispatcher<Message, Void> {
     private final AppRpcClazzResultImpl appRpcClazzResult;
     private final DefaultMqttClient mqttClient;
 
-    public AppRpcClazzResultHandler(AppRpcClazzResultImpl appRpcClazzResult, DefaultMqttClient mqttClient) {
+    public AppRpcClazzResultDispatcher(AppRpcClazzResultImpl appRpcClazzResult, DefaultMqttClient mqttClient) {
         this.appRpcClazzResult = appRpcClazzResult;
         this.mqttClient = mqttClient;
     }
 
-    public void process(String msgId, String method, RpcResult rpcResult) {
+    @Override
+    public Void dispatch(Message message) {
+        return null;
+    }
+
+    public void process(String msgId, String method, RpcResultMsg rpcResult) {
         mqttClient.removeRecord(msgId);
         int code = rpcResult.getCode();
         if (code != ResultStatus.SUCCESS.getCode()) {
@@ -36,7 +43,7 @@ public class AppRpcClazzResultHandler {
             return;
         }
 
-        switch (AppSubClazz.valueOf(method)) {
+        switch (AppRpcClazz.valueOf(method)) {
             case deploy:
                 processDeployResult(rpcResult.getJsonData());
                 break;
@@ -59,7 +66,7 @@ public class AppRpcClazzResultHandler {
     }
 
     private void processStatusResult(String payload) {
-        AppStatus appStatus = JsonConverter.jsonToObject(payload, AppStatus.class);
+        StatusResult appStatus = JsonConverter.jsonToObject(payload, StatusResult.class);
         if (appStatus != null) {
             appRpcClazzResult.statusResult(appStatus);
         }

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/impl/rpcresult/AppRpcClazzResultImpl.java

@@ -2,8 +2,8 @@ package cn.reghao.autodop.dmaster.mqttsub.impl.rpcresult;
 
 import cn.reghao.autodop.common.http.DefaultWebRequest;
 import cn.reghao.autodop.common.http.WebRequest;
-import cn.reghao.autodop.common.msg.sub.dto.app.AppStatus;
-import cn.reghao.autodop.common.msg.sub.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.StatusResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppConfigQuery;
 import cn.reghao.autodop.dmaster.app.db.repository.AppDeployingRepository;
 import cn.reghao.autodop.dmaster.app.db.repository.AppRunningRepository;
@@ -107,7 +107,7 @@ public class AppRpcClazzResultImpl {
         });
     }
 
-    public void statusResult(AppStatus appStatus) {
+    public void statusResult(StatusResult appStatus) {
         String appId = appStatus.getAppId();
         String machineId = appStatus.getMachineId();
         AppRunning appRunning = runningRepository.findByAppIdAndMachineId(appId, machineId);