Browse Source

RPC over MQTT 开发中

reghao 4 years ago
parent
commit
6ff918eb42
76 changed files with 1003 additions and 782 deletions
  1. 0 6
      common/pom.xml
  2. 9 20
      common/src/main/java/cn/reghao/autodop/common/dagent/machine/Machine.java
  3. 2 3
      common/src/main/java/cn/reghao/autodop/common/log/Appenders.java
  4. 23 22
      common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java
  5. 43 0
      common/src/main/java/cn/reghao/autodop/common/msg/Message.java
  6. 17 0
      common/src/main/java/cn/reghao/autodop/common/msg/MsgQueue.java
  7. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/MsgType.java
  8. 26 0
      common/src/main/java/cn/reghao/autodop/common/msg/pub/PubMsg.java
  9. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/NodeClazz.java
  10. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/PubClazz.java
  11. 5 3
      common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/NodeAppDTO.java
  12. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/NodeAppLogDTO.java
  13. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/constant/AppId.java
  14. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/constant/AppStatus.java
  15. 35 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java
  16. 6 6
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcPayload.java
  17. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResult.java
  18. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/AppClazz.java
  19. 1 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/DockerClazz.java
  20. 9 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/MachineClazz.java
  21. 2 2
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/RpcClazz.java
  22. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/AppIdParam.java
  23. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/AppStatus.java
  24. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployParam.java
  25. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployResult.java
  26. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/PackType.java
  27. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuInfo.java
  28. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuStat.java
  29. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuStats.java
  30. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/disk/DiskInfo.java
  31. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/disk/DiskStat.java
  32. 1 1
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/mem/MemoryInfo.java
  33. 0 26
      common/src/main/java/cn/reghao/autodop/common/node/msg/NodeMsg.java
  34. 0 11
      common/src/main/java/cn/reghao/autodop/common/node/msg/constant/NodeOps.java
  35. 0 42
      common/src/main/java/cn/reghao/autodop/common/node/rpc/RpcMsg.java
  36. 0 15
      common/src/main/java/cn/reghao/autodop/common/node/rpc/RpcQueue.java
  37. 0 10
      common/src/main/java/cn/reghao/autodop/common/node/rpc/clazz/MachineClazz.java
  38. 140 0
      common/src/main/java/cn/reghao/autodop/common/util/thread/ThreadFactoryBuilder.java
  39. 48 0
      common/src/main/java/cn/reghao/autodop/common/util/thread/ThreadPoolWrapper.java
  40. 5 5
      dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java
  41. 2 2
      dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java
  42. 2 2
      dagent/src/main/java/cn/reghao/autodop/dagent/app/DockerAppServiceImpl.java
  43. 2 2
      dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java
  44. 0 38
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/DagentHeartbeatJob.java
  45. 0 64
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/MachineScheduler.java
  46. 112 0
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeOpsImpl.java
  47. 0 83
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java
  48. 0 71
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java
  49. 0 17
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java
  50. 0 54
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java
  51. 33 17
      dagent/src/main/java/cn/reghao/autodop/dagent/rpc/RpcListener.java
  52. 179 42
      dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/AppClazzImpl.java
  53. 33 0
      dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/DockerClazzImpl.java
  54. 21 11
      dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/MachineClazzImpl.java
  55. 20 43
      dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java
  56. 6 0
      dmaster/pom.xml
  57. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/AppConfigPageController.java
  58. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/BuildConfigPageController.java
  59. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppDeploying.java
  60. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppRunning.java
  61. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/config/build/PackerConfig.java
  62. 2 2
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/log/DeployLog.java
  63. 2 2
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppOpsResultService.java
  64. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java
  65. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java
  66. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  67. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppIntegrate.java
  68. 2 2
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/DeployNotifyMsg.java
  69. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/machine/service/ssh/WebSshImpl.java
  70. 43 17
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java
  71. 100 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java
  72. 0 68
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/RpcListener.java
  73. 2 2
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/AppOpsProcessor.java
  74. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/NotifyService.java
  75. 17 13
      dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java
  76. 0 30
      dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/thread/ThreadPoolWrapper.java

+ 0 - 6
common/pom.xml

@@ -53,11 +53,5 @@
             <artifactId>httpmime</artifactId>
             <version>4.5.8</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.quartz-scheduler</groupId>
-            <artifactId>quartz</artifactId>
-            <version>2.3.2</version>
-        </dependency>
     </dependencies>
 </project>

+ 9 - 20
common/src/main/java/cn/reghao/autodop/common/dagent/machine/Machine.java

@@ -1,12 +1,10 @@
 package cn.reghao.autodop.common.dagent.machine;
 
-import cn.reghao.autodop.common.dagent.machine.shell.MachineShell;
 import cn.reghao.autodop.common.dagent.machine.cpu.Cpu;
 import cn.reghao.autodop.common.dagent.machine.disk.Disk;
 import cn.reghao.autodop.common.dagent.machine.memory.Memory;
 import cn.reghao.autodop.common.dagent.machine.network.Network;
 import cn.reghao.autodop.common.dagent.machine.os.Os;
-import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.util.MachineId;
 import org.springframework.stereotype.Component;
 
@@ -19,21 +17,20 @@ import java.time.LocalDateTime;
  */
 @Component
 public class Machine {
-    private Cpu cpu;
-    private Memory memory;
-    private Disk disk;
-    private Network network;
-    private Os os;
-    private String machineId;
-
-    public Machine(MachineId machineId, Cpu cpu, Memory memory, Disk disk, Network network, Os os)
-            throws IOException {
+    private final Cpu cpu;
+    private final Memory memory;
+    private final Disk disk;
+    private final Network network;
+    private final Os os;
+    private final String machineId;
+
+    public Machine(Cpu cpu, Memory memory, Disk disk, Network network, Os os) throws IOException {
         this.cpu = cpu;
         this.memory = memory;
         this.disk = disk;
         this.network = network;
         this.os = os;
-        this.machineId = machineId.id();
+        this.machineId = MachineId.id();
     }
 
     public MachineInfo registry() {
@@ -56,12 +53,4 @@ public class Machine {
         machineStat.setDiskInfos(disk.info());
         return machineStat;
     }
-
-    public CallResult<MachineStat> stat() {
-        return CallResult.success(heartbeat());
-    }
-
-    public CallResult<MachineShell> shell(String script) {
-        return CallResult.success(null);
-    }
 }

+ 2 - 3
common/src/main/java/cn/reghao/autodop/common/log/Appenders.java

@@ -16,14 +16,13 @@ import org.slf4j.LoggerFactory;
 public class Appenders {
     private static LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
 
-    public static Appender<ILoggingEvent> mqttAppender(String machineId, String machineIpv4, String appId,
-                                                DefaultMqttClient mqttClient) {
+    public static Appender<ILoggingEvent> mqttAppender(String machineId, String appId, DefaultMqttClient mqttClient) {
         PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
         layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
         layoutEncoder.setContext(loggerContext);
         layoutEncoder.start();
 
-        MqttAppender mqttAppender = new MqttAppender(machineId, machineIpv4, appId, mqttClient);
+        MqttAppender mqttAppender = new MqttAppender(machineId, appId, mqttClient);
         mqttAppender.setContext(loggerContext);
         mqttAppender.start();
         return mqttAppender;

+ 23 - 22
common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java

@@ -6,6 +6,11 @@ import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.DagentOps;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.MsgQueue;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.clazz.NodeClazz;
+import cn.reghao.autodop.common.msg.pub.dto.node.NodeAppLogDTO;
 import cn.reghao.jdkutil.converter.DateTimeConverter;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -17,14 +22,14 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  */
 public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
     private final String machineId;
-    private final String machineIpv4;
     private final String appId;
+    private final String topic;
     private final DefaultMqttClient mqttClient;
 
-    public MqttAppender(String machineId, String machineIpv4, String appId, DefaultMqttClient mqttClient) {
+    public MqttAppender(String machineId, String appId, DefaultMqttClient mqttClient) {
         this.machineId = machineId;
-        this.machineIpv4 = machineIpv4;
         this.appId = appId;
+        this.topic = MsgQueue.dmasterTopic();
         this.mqttClient = mqttClient;
     }
 
@@ -40,30 +45,26 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
 
     @Override
     protected void append(ILoggingEvent event) {
-        String json = JsonConverter.objectToJson(runtimeLog(event));
-        AsyncMsg asyncMsg = AsyncMsg.asyncMsg(machineId, MessageType.dagentType.name(), DagentOps.dagentLog.name(), json);
-        String payload = JsonConverter.objectToJson(asyncMsg);
-
-        MqttMessage message = new MqttMessage();
-        message.setQos(0);
-        message.setPayload(payload.getBytes());
+        NodeAppLogDTO nodeAppLogDTO = nodeAppLog(event);
+        String jsonPayload = JsonConverter.objectToJson(nodeAppLogDTO);
+        PubMsg pubMsg = PubMsg.pubMsg(NodeClazz.class.getSimpleName(), NodeClazz.log.name(), jsonPayload);
+        Message message = Message.pubMessage("", "", pubMsg);
         try {
-            mqttClient.pub("dmaster", 1, payload);
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
         } catch (MqttException e) {
             e.printStackTrace();
         }
     }
 
-    private RuntimeLog runtimeLog(ILoggingEvent event) {
-        RuntimeLog runtimeLog = new RuntimeLog();
-        runtimeLog.setMachineId(machineId);
-        runtimeLog.setMachineIpv4(machineIpv4);
-        runtimeLog.setAppId(appId);
-        runtimeLog.setLogTime(DateTimeConverter.localDateTime(event.getTimeStamp()));
-        runtimeLog.setLevel(event.getLevel().toString());
-        runtimeLog.setThreadName(event.getThreadName());
-        runtimeLog.setLoggerName(event.getLoggerName());
-        runtimeLog.setMessage(event.getFormattedMessage());
-        return runtimeLog;
+    private NodeAppLogDTO nodeAppLog(ILoggingEvent event) {
+        NodeAppLogDTO nodeAppLogDTO = new NodeAppLogDTO();
+        nodeAppLogDTO.setNodeId(machineId);
+        nodeAppLogDTO.setAppId(appId);
+        nodeAppLogDTO.setTimestamp(event.getTimeStamp());
+        nodeAppLogDTO.setLevel(event.getLevel().toString());
+        nodeAppLogDTO.setThreadName(event.getThreadName());
+        nodeAppLogDTO.setLoggerName(event.getLoggerName());
+        nodeAppLogDTO.setMessage(event.getFormattedMessage());
+        return nodeAppLogDTO;
     }
 }

+ 43 - 0
common/src/main/java/cn/reghao/autodop/common/msg/Message.java

@@ -0,0 +1,43 @@
+package cn.reghao.autodop.common.msg;
+
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author reghao
+ * @date 2021-08-25 17:19:41
+ */
+@Data
+public class Message implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String msgType;
+    private String destination;
+    private String originator;
+    private PubMsg pubMsg;
+    private RpcMsg rpcMsg;
+    private long sendTime;
+
+    public static Message pubMessage(String destination, String originator, PubMsg pubMsg) {
+        Message message = new Message();
+        message.setMsgType(MsgType.pub.name());
+        message.setDestination(destination);
+        message.setOriginator(originator);
+        message.setPubMsg(pubMsg);
+        message.setSendTime(System.currentTimeMillis());
+        return message;
+    }
+
+    public static Message rpcMessage(String destination, String originator, RpcMsg rpcMsg) {
+        Message message = new Message();
+        message.setMsgType(MsgType.rpc.name());
+        message.setDestination(destination);
+        message.setOriginator(originator);
+        message.setRpcMsg(rpcMsg);
+        message.setSendTime(System.currentTimeMillis());
+        return message;
+    }
+}

+ 17 - 0
common/src/main/java/cn/reghao/autodop/common/msg/MsgQueue.java

@@ -0,0 +1,17 @@
+package cn.reghao.autodop.common.msg;
+
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
+
+/**
+ * @author reghao
+ * @date 2021-09-14 17:10:31
+ */
+public class MsgQueue {
+    public static String dmasterTopic() {
+        return String.format("sub.%s", AppId.dmaster.name());
+    }
+
+    public static String dagentTopic(String nodeId) {
+        return String.format("rpc.%s.%s", nodeId, AppId.dagent.name());
+    }
+}

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/MsgType.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg;
+
+/**
+ * @author reghao
+ * @date 2021-09-15 09:01:17
+ */
+public enum MsgType {
+    pub, rpc
+}

+ 26 - 0
common/src/main/java/cn/reghao/autodop/common/msg/pub/PubMsg.java

@@ -0,0 +1,26 @@
+package cn.reghao.autodop.common.msg.pub;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author reghao
+ * @date 2021-09-15 08:57:55
+ */
+@Data
+public class PubMsg implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String clazz;
+    private String method;
+    private String jsonPayload;
+
+    public static PubMsg pubMsg(String clazz, String method, String jsonPayload) {
+        PubMsg pubMsg = new PubMsg();
+        pubMsg.setClazz(clazz);
+        pubMsg.setMethod(method);
+        pubMsg.setJsonPayload(jsonPayload);
+        return pubMsg;
+    }
+}

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/NodeClazz.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg.pub.clazz;
+
+/**
+ * @author reghao
+ * @date 2021-09-15 09:19:14
+ */
+public enum NodeClazz {
+    start, shutdown, heartbeat, log,
+}

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/pub/clazz/PubClazz.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg.pub.clazz;
+
+/**
+ * @author reghao
+ * @date 2021-09-15 11:51:17
+ */
+public enum PubClazz {
+    NodeClazz,
+}

+ 5 - 3
common/src/main/java/cn/reghao/autodop/common/node/msg/dto/NodeAppDTO.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/NodeAppDTO.java

@@ -1,5 +1,7 @@
-package cn.reghao.autodop.common.node.msg.dto;
+package cn.reghao.autodop.common.msg.pub.dto.node;
 
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppStatus;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -19,12 +21,12 @@ public class NodeAppDTO implements Serializable {
     private String appStatus;
     private long heartbeatTime;
 
-    /*public NodeAppDTO(String nodeId, AppId appId, String publicIp, String privateIp, AppStatus appStatus) {
+    public NodeAppDTO(String nodeId, AppId appId, String publicIp, String privateIp, AppStatus appStatus) {
         this.nodeId = nodeId;
         this.appId = appId.name();
         this.publicIp = publicIp;
         this.privateIp = privateIp;
         this.appStatus = appStatus.name();
         this.heartbeatTime = System.currentTimeMillis();
-    }*/
+    }
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/msg/dto/NodeAppLogDTO.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/NodeAppLogDTO.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.msg.dto;
+package cn.reghao.autodop.common.msg.pub.dto.node;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/msg/constant/AppId.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/constant/AppId.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.msg.constant;
+package cn.reghao.autodop.common.msg.pub.dto.node.constant;
 
 /**
  * @author reghao

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/msg/constant/AppStatus.java → common/src/main/java/cn/reghao/autodop/common/msg/pub/dto/node/constant/AppStatus.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.msg.constant;
+package cn.reghao.autodop.common.msg.pub.dto.node.constant;
 
 /**
  * @author reghao

+ 35 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java

@@ -0,0 +1,35 @@
+package cn.reghao.autodop.common.msg.rpc;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * @author reghao
+ * @date 2021-09-14 17:42:43
+ */
+@Data
+public class RpcMsg {
+    private String msgId;
+    private RpcPayload rpcPayload;
+    private RpcResult<?> rpcResult;
+
+    public RpcMsg(RpcPayload rpcPayload) {
+        this.msgId = UUID.randomUUID().toString();
+        this.rpcPayload = rpcPayload;
+    }
+
+    public RpcMsg(String msgId, RpcPayload rpcPayload, RpcResult<?> rpcResult) {
+        this.msgId = msgId;
+        this.rpcPayload = rpcPayload;
+        this.rpcResult = rpcResult;
+    }
+
+    public static RpcMsg callMsg(RpcPayload rpcPayload) {
+        return new RpcMsg(rpcPayload);
+    }
+
+    public static RpcMsg resultMsg(RpcMsg callMsg, RpcResult<?> rpcResult) {
+        return new RpcMsg(callMsg.getMsgId(), callMsg.getRpcPayload(), rpcResult);
+    }
+}

+ 6 - 6
common/src/main/java/cn/reghao/autodop/common/node/rpc/JsonRpc.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcPayload.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc;
+package cn.reghao.autodop.common.msg.rpc;
 
 import lombok.Data;
 
@@ -9,22 +9,22 @@ import java.io.Serializable;
  * @date 2021-08-25 17:19:41
  */
 @Data
-public class JsonRpc implements Serializable {
+public class RpcPayload implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private String clazz;
     private String method;
     private Object param;
 
-    public static  JsonRpc jsonRpc(String clazz, String method) {
-        JsonRpc nodeMsg = new JsonRpc();
+    public static RpcPayload rpcPayload(String clazz, String method) {
+        RpcPayload nodeMsg = new RpcPayload();
         nodeMsg.setClazz(clazz);
         nodeMsg.setMethod(method);
         return nodeMsg;
     }
 
-    public static  JsonRpc jsonRpc(String clazz, String method, Object param) {
-        JsonRpc nodeMsg = new JsonRpc();
+    public static RpcPayload rpcPayload(String clazz, String method, Object param) {
+        RpcPayload nodeMsg = new RpcPayload();
         nodeMsg.setClazz(clazz);
         nodeMsg.setMethod(method);
         nodeMsg.setParam(param);

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/RpcResult.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResult.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc;
+package cn.reghao.autodop.common.msg.rpc;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/clazz/AppClazz.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/AppClazz.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.clazz;
+package cn.reghao.autodop.common.msg.rpc.clazz;
 
 /**
  * @author reghao

+ 1 - 2
common/src/main/java/cn/reghao/autodop/common/node/rpc/clazz/DockerClazz.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/DockerClazz.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.clazz;
+package cn.reghao.autodop.common.msg.rpc.clazz;
 
 /**
  * @author reghao
@@ -6,5 +6,4 @@ package cn.reghao.autodop.common.node.rpc.clazz;
  */
 public enum DockerClazz {
     dockerImage,
-    dockerImageResult,
 }

+ 9 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/MachineClazz.java

@@ -0,0 +1,9 @@
+package cn.reghao.autodop.common.msg.rpc.clazz;
+
+/**
+ * @author reghao
+ * @date 2020-12-25 19:15:00
+ */
+public enum MachineClazz {
+    shell, stat,
+}

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/node/rpc/clazz/Clazz.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/clazz/RpcClazz.java

@@ -1,9 +1,9 @@
-package cn.reghao.autodop.common.node.rpc.clazz;
+package cn.reghao.autodop.common.msg.rpc.clazz;
 
 /**
  * @author reghao
  * @date 2021-09-14 18:15:05
  */
-public enum Clazz {
+public enum RpcClazz {
     AppClazz, MachineClazz, DockerClazz,
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/dagent/app/AppIdParam.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/AppIdParam.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.dagent.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/dagent/app/AppStatus.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/AppStatus.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.dagent.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/dagent/app/DeployParam.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployParam.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.dagent.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/dagent/app/DeployResult.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/DeployResult.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.dagent.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 import cn.reghao.jdkutil.result.Result;
 import lombok.Data;

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/dagent/app/PackType.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/app/PackType.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.dagent.app;
+package cn.reghao.autodop.common.msg.rpc.dto.app;
 
 /**
  * 打包方式类型

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/cpu/CpuInfo.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuInfo.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.cpu;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.cpu;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/cpu/CpuStat.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuStat.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.cpu;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.cpu;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/cpu/CpuStats.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/cpu/CpuStats.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.cpu;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.cpu;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/disk/DiskInfo.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/disk/DiskInfo.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.disk;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.disk;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/disk/DiskStat.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/disk/DiskStat.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.disk;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.disk;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/node/rpc/dto/machine/mem/MemoryInfo.java → common/src/main/java/cn/reghao/autodop/common/msg/rpc/dto/machine/mem/MemoryInfo.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.node.rpc.dto.machine.mem;
+package cn.reghao.autodop.common.msg.rpc.dto.machine.mem;
 
 import lombok.Data;
 

+ 0 - 26
common/src/main/java/cn/reghao/autodop/common/node/msg/NodeMsg.java

@@ -1,26 +0,0 @@
-package cn.reghao.autodop.common.node.msg;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2021-08-25 17:19:41
- */
-@Data
-public class NodeMsg implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String nodeOps;
-    private String jsonPayload;
-    private long sendTime;
-
-    public static NodeMsg nodeMsg(String nodeOps, String jsonPayload) {
-        NodeMsg nodeMsg = new NodeMsg();
-        nodeMsg.setNodeOps(nodeOps);
-        nodeMsg.setJsonPayload(jsonPayload);
-        nodeMsg.setSendTime(System.currentTimeMillis());
-        return nodeMsg;
-    }
-}

+ 0 - 11
common/src/main/java/cn/reghao/autodop/common/node/msg/constant/NodeOps.java

@@ -1,11 +0,0 @@
-package cn.reghao.autodop.common.node.msg.constant;
-
-/**
- * 节点应用操作(只发布消息)
- *
- * @author reghao
- * @date 2021-08-25 17:19:41
- */
-public enum NodeOps {
-    nodeStart, nodeShutdown, nodeHeartbeat, nodeLog,
-}

+ 0 - 42
common/src/main/java/cn/reghao/autodop/common/node/rpc/RpcMsg.java

@@ -1,42 +0,0 @@
-package cn.reghao.autodop.common.node.rpc;
-
-import lombok.Data;
-
-import java.util.UUID;
-
-/**
- * @author reghao
- * @date 2021-09-14 17:42:43
- */
-@Data
-public class RpcMsg {
-    private String destination;
-    private String originator;
-    private JsonRpc jsonRpc;
-    private RpcResult<?> rpcResult;
-    private String msgId;
-    private long sendTime;
-
-    public RpcMsg() {
-        this.msgId = UUID.randomUUID().toString();
-        this.sendTime = System.currentTimeMillis();
-    }
-
-    public static RpcMsg callMsg(String destination, String originator, JsonRpc jsonRpc) {
-        RpcMsg rpcMsg = new RpcMsg();
-        rpcMsg.setDestination(destination);
-        rpcMsg.setOriginator(originator);
-        rpcMsg.setJsonRpc(jsonRpc);
-        return rpcMsg;
-    }
-
-    public static RpcMsg resultMsg(RpcMsg callMsg, RpcResult<?> rpcResult) {
-        RpcMsg rpcMsg = new RpcMsg();
-        rpcMsg.setDestination(callMsg.getOriginator());
-        rpcMsg.setOriginator(callMsg.getDestination());
-        rpcMsg.setJsonRpc(callMsg.getJsonRpc());
-        rpcMsg.setRpcResult(rpcResult);
-        rpcMsg.setMsgId(callMsg.getMsgId());
-        return rpcMsg;
-    }
-}

+ 0 - 15
common/src/main/java/cn/reghao/autodop/common/node/rpc/RpcQueue.java

@@ -1,15 +0,0 @@
-package cn.reghao.autodop.common.node.rpc;
-
-/**
- * @author reghao
- * @date 2021-09-14 17:10:31
- */
-public class RpcQueue {
-    public static String rpcQueue(String nodeId, String appId) {
-        return String.format("rpc.%s.%s", nodeId, appId);
-    }
-
-    public static String rpcTopic(String nodeId, String appId) {
-        return String.format("rpc.%s.%s", nodeId, appId);
-    }
-}

+ 0 - 10
common/src/main/java/cn/reghao/autodop/common/node/rpc/clazz/MachineClazz.java

@@ -1,10 +0,0 @@
-package cn.reghao.autodop.common.node.rpc.clazz;
-
-/**
- * @author reghao
- * @date 2020-12-25 19:15:00
- */
-public enum MachineClazz {
-    shellMethod, statMethod,
-    shellResult, statResult,
-}

+ 140 - 0
common/src/main/java/cn/reghao/autodop/common/util/thread/ThreadFactoryBuilder.java

@@ -0,0 +1,140 @@
+package cn.reghao.autodop.common.util.thread;
+
+import java.util.Locale;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * com.google.common.util.concurrent.ThreadFactoryBuilder
+ *
+ * @author reghao
+ * @date 2021-09-15 10:57:44
+ */
+public class ThreadFactoryBuilder {
+    private String nameFormat = null;
+    private Boolean daemon = null;
+    private Integer priority = null;
+    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler = null;
+    private ThreadFactory backingThreadFactory = null;
+
+    /**
+     * Creates a new {@link ThreadFactory} builder.
+     */
+    public ThreadFactoryBuilder() {}
+
+    /**
+     * Sets the naming format to use when naming threads ({@link Thread#setName}) which are created
+     * with this ThreadFactory.
+     *
+     * @param nameFormat a {@link String#format(String, Object...)}-compatible format String, to which
+     *     a unique integer (0, 1, etc.) will be supplied as the single parameter. This integer will
+     *     be unique to the built instance of the ThreadFactory and will be assigned sequentially. For
+     *     example, {@code "rpc-pool-%d"} will generate thread names like {@code "rpc-pool-0"},
+     *     {@code "rpc-pool-1"}, {@code "rpc-pool-2"}, etc.
+     * @return this for the builder pattern
+     */
+    public ThreadFactoryBuilder setNameFormat(String nameFormat) {
+        String unused = format(nameFormat, 0); // fail fast if the format is bad or null
+        this.nameFormat = nameFormat;
+        return this;
+    }
+
+    /**
+     * Sets daemon or not for new threads created with this ThreadFactory.
+     *
+     * @param daemon whether or not new Threads created with this ThreadFactory will be daemon threads
+     * @return this for the builder pattern
+     */
+    public ThreadFactoryBuilder setDaemon(boolean daemon) {
+        this.daemon = daemon;
+        return this;
+    }
+
+    /**
+     * Sets the priority for new threads created with this ThreadFactory.
+     *
+     * @param priority the priority for new Threads created with this ThreadFactory
+     * @return this for the builder pattern
+     */
+    public ThreadFactoryBuilder setPriority(int priority) {
+        // Thread#setPriority() already checks for validity. These error messages
+        // are nicer though and will fail-fast.
+        this.priority = priority;
+        return this;
+    }
+
+    /**
+     * Sets the {@link Thread.UncaughtExceptionHandler} for new threads created with this ThreadFactory.
+     *
+     * @param uncaughtExceptionHandler the uncaught exception handler for new Threads created with
+     *     this ThreadFactory
+     * @return this for the builder pattern
+     */
+    public ThreadFactoryBuilder setUncaughtExceptionHandler(
+            Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+        return this;
+    }
+
+    /**
+     * Sets the backing {@link ThreadFactory} for new threads created with this ThreadFactory. Threads
+     * will be created by invoking #newThread(Runnable) on this backing {@link ThreadFactory}.
+     *
+     * @param backingThreadFactory the backing {@link ThreadFactory} which will be delegated to during
+     *     thread creation.
+     * @return this for the builder pattern
+     *
+     *
+     */
+    public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) {
+        this.backingThreadFactory = backingThreadFactory;
+        return this;
+    }
+
+    /**
+     * Returns a new thread factory using the options supplied during the building process. After
+     * building, it is still possible to change the options used to build the ThreadFactory and/or
+     * build again. State is not shared amongst built instances.
+     *
+     * @return the fully constructed {@link ThreadFactory}
+     */
+    public ThreadFactory build() {
+        return build(this);
+    }
+
+    private static ThreadFactory build(ThreadFactoryBuilder builder) {
+        final String nameFormat = builder.nameFormat;
+        final Boolean daemon = builder.daemon;
+        final Integer priority = builder.priority;
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
+        final ThreadFactory backingThreadFactory =
+                (builder.backingThreadFactory != null)
+                        ? builder.backingThreadFactory
+                        : Executors.defaultThreadFactory();
+        final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
+        return new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable runnable) {
+                Thread thread = backingThreadFactory.newThread(runnable);
+                if (nameFormat != null) {
+                    thread.setName(format(nameFormat, count.getAndIncrement()));
+                }
+                if (daemon != null) {
+                    thread.setDaemon(daemon);
+                }
+                if (priority != null) {
+                    thread.setPriority(priority);
+                }
+                if (uncaughtExceptionHandler != null) {
+                    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+                }
+                return thread;
+            }
+        };
+    }
+
+    private static String format(String format, Object... args) {
+        return String.format(Locale.ROOT, format, args);
+    }
+}

+ 48 - 0
common/src/main/java/cn/reghao/autodop/common/util/thread/ThreadPoolWrapper.java

@@ -0,0 +1,48 @@
+package cn.reghao.autodop.common.util.thread;
+
+import java.util.concurrent.*;
+
+/**
+ * 线程池包装类
+ *
+ * @author reghao
+ * @date 2019-11-12 16:39:45
+ */
+public class ThreadPoolWrapper {
+    /**
+     * 获取一个线程池
+     *
+     * @param name 线程池名字
+     * @return 线程池
+     * @date 2019-11-12 下午4:42
+     */
+    public static ExecutorService threadPool(String name) {
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").build();
+
+        return new ThreadPoolExecutor(16, 32, 200L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(32),
+                namedThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+    }
+
+    public static ScheduledExecutorService scheduledThreadPool(String name, int size) {
+        ThreadFactory namedThreadFactory =
+                new ThreadFactoryBuilder().setNameFormat(name + "-scheduled-pool-%d").build();
+        return new ScheduledThreadPoolExecutor(size, namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+    }
+
+    public static void shutdownScheduler(ScheduledExecutorService scheduler) {
+        scheduler.shutdown();
+        int count = 1;
+        int max = 30;
+        while (count < max && !scheduler.isTerminated()) {
+            try {
+                Thread.sleep(1_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            count++;
+        }
+    }
+}

+ 5 - 5
dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java

@@ -1,10 +1,10 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.AppIdParam;
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
-import cn.reghao.autodop.common.dagent.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppIdParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogConfig;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;
 import cn.reghao.autodop.common.docker.exception.DockerException;

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/app/DockerAppServiceImpl.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogConfig;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.log.LogFile;
 import lombok.extern.slf4j.Slf4j;

+ 0 - 38
dagent/src/main/java/cn/reghao/autodop/dagent/machine/DagentHeartbeatJob.java

@@ -1,38 +0,0 @@
-package cn.reghao.autodop.dagent.machine;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.message.ops.DagentOps;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.spring.DagentLifecycle;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.quartz.*;
-
-/**
- * dagent 心跳任务
- *
- * @author reghao
- * @date 2021-02-22 21:30:17
- */
-@Slf4j
-public class DagentHeartbeatJob implements Job {
-    @Override
-    public void execute(JobExecutionContext context) {
-        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-
-        DefaultMqttClient mqttClient = (DefaultMqttClient) jobDataMap.get("mqttClient");
-        Machine machine = (Machine) jobDataMap.get("machine");
-
-        String payload = JsonConverter.objectToJson(machine.heartbeat());
-        AsyncMsg asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.dagentType.name(),
-                DagentOps.dagentHeartbeat.name(), payload);
-        try {
-            mqttClient.pub("dmaster", 0, JsonConverter.objectToJson(asyncMsg));
-        } catch (MqttException e) {
-            log.error("{}", e.getMessage());
-        }
-    }
-}

+ 0 - 64
dagent/src/main/java/cn/reghao/autodop/dagent/machine/MachineScheduler.java

@@ -1,64 +0,0 @@
-package cn.reghao.autodop.dagent.machine;
-
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import lombok.extern.slf4j.Slf4j;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
-import org.springframework.stereotype.Component;
-
-/**
- * @author reghao
- * @date 2020-03-20 17:04:48
- */
-@Slf4j
-@Component
-public class MachineScheduler {
-    private Scheduler scheduler;
-    private DefaultMqttClient mqttClient;
-    private Machine machine;
-
-    public MachineScheduler(DefaultMqttClient mqttClient, Machine machine) throws SchedulerException {
-        this.scheduler = StdSchedulerFactory.getDefaultScheduler();
-        this.mqttClient = mqttClient;
-        this.machine = machine;
-    }
-
-    public void add(Class<? extends Job> clazz, String jobId, String cronExp) throws SchedulerException {
-        JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put("mqttClient", mqttClient);
-        jobDataMap.put("machine", machine);
-
-        JobDetail jobDetail = JobBuilder.newJob(clazz)
-                .withIdentity(jobId)
-                // job 需要使用的数据
-                .usingJobData(jobDataMap)
-                .build();
-
-        TriggerBuilder<CronTrigger> triggerBuilder = TriggerBuilder
-                .newTrigger()
-                .withIdentity(jobId + "@trigger")
-                .withSchedule(CronScheduleBuilder.cronSchedule(cronExp));
-        CronTrigger cronTrigger = triggerBuilder.build();
-
-        if (scheduler.getJobDetail(jobDetail.getKey()) == null) {
-            scheduler.scheduleJob(jobDetail, cronTrigger);
-            log.info("添加新定时任务 {}...", jobId);
-        }
-    }
-
-    public void remove() {
-    }
-
-    public void start() throws SchedulerException {
-        if (!scheduler.isShutdown()) {
-            scheduler.start();
-        }
-    }
-
-    public void pause() throws SchedulerException {
-        if (!scheduler.isShutdown()) {
-            scheduler.pauseAll();
-        }
-    }
-}

+ 112 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/machine/NodeOpsImpl.java

@@ -0,0 +1,112 @@
+package cn.reghao.autodop.dagent.machine;
+
+import cn.reghao.autodop.common.http.DefaultWebRequest;
+import cn.reghao.autodop.common.http.WebRequest;
+import cn.reghao.autodop.common.http.WebResponse;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.MsgQueue;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.clazz.NodeClazz;
+import cn.reghao.autodop.common.msg.pub.dto.node.NodeAppDTO;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppStatus;
+import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
+import cn.reghao.jdkutil.MachineId;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2021-09-03 09:22:42
+ */
+@Slf4j
+@Component
+public class NodeOpsImpl {
+    private final String nodeId;
+    private final AppId appId;
+    private final String topic;
+    private final ScheduledExecutorService heartbeatScheduler;
+    private final WebRequest webRequest;
+    private final DefaultMqttClient mqttClient;
+    
+    public NodeOpsImpl(DefaultMqttClient mqttClient) throws IOException {
+        this.nodeId = MachineId.id();
+        this.appId = AppId.dagent;
+        this.topic = MsgQueue.dmasterTopic();
+        this.heartbeatScheduler = ThreadPoolWrapper.scheduledThreadPool("heartbeat", 1);
+        this.webRequest = new DefaultWebRequest();
+        this.mqttClient = mqttClient;
+    }
+    
+    public void nodeStart() {
+        NodeAppDTO nodeAppDTO = nodeAppDTO(AppStatus.online);
+        String jsonPayload = JsonConverter.objectToJson(nodeAppDTO);
+        PubMsg pubMsg = PubMsg.pubMsg(NodeClazz.class.getSimpleName(), NodeClazz.start.name(), jsonPayload);
+        Message message = Message.pubMessage("", "", pubMsg);
+        try {
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+        } catch (MqttException e) {
+            log.error("{}", e.getMessage());
+        }
+    }
+
+    public void nodeShutdown() {
+        ThreadPoolWrapper.shutdownScheduler(heartbeatScheduler);
+        NodeAppDTO nodeAppDTO = nodeAppDTO(AppStatus.offline);
+        String jsonPayload = JsonConverter.objectToJson(nodeAppDTO);
+        PubMsg pubMsg = PubMsg.pubMsg(NodeClazz.class.getSimpleName(), NodeClazz.shutdown.name(), jsonPayload);
+        Message message = Message.pubMessage("", "", pubMsg);
+        try {
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+        } catch (MqttException e) {
+            log.error("{}", e.getMessage());
+        }
+    }
+
+    public void nodeHeartbeat() {
+        // 每小时发送一次心跳
+        heartbeatScheduler.scheduleAtFixedRate(new Heartbeat(), 10, 10, TimeUnit.SECONDS);
+    }
+
+    public void nodeLog() {
+    }
+
+    private NodeAppDTO nodeAppDTO(AppStatus appStatus) {
+        String publicIp = getPublicIp();
+        String privateIp = MachineId.ipv4();
+        return new NodeAppDTO(nodeId, appId, publicIp, privateIp, appStatus);
+    }
+
+    private String getPublicIp() {
+        WebResponse webResponse = webRequest.get("http://ip.reghao.cn");
+        if (webResponse.getStatusCode() != 200) {
+            return "0.0.0.0";
+        }
+
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(webResponse.getBody()).getAsJsonObject();
+        return jsonObject.get("ip").getAsString();
+    }
+
+    class Heartbeat implements Runnable {
+        @Override
+        public void run() {
+            NodeAppDTO nodeAppDTO = nodeAppDTO(AppStatus.online);
+            String jsonPayload = JsonConverter.objectToJson(nodeAppDTO);
+            PubMsg pubMsg = PubMsg.pubMsg(NodeClazz.class.getSimpleName(), NodeClazz.heartbeat.name(), jsonPayload);
+            Message message = Message.pubMessage("", "", pubMsg);
+            try {
+                mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
+            } catch (MqttException e) {
+                log.error("{}", e.getMessage());
+            }
+        }
+    }
+}

+ 0 - 83
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java

@@ -1,83 +0,0 @@
-package cn.reghao.autodop.dagent.mqttsub;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.mqttsub.processor.app.AppOpsProcessor;
-import cn.reghao.autodop.dagent.mqttsub.processor.machine.MachineOpsProcessor;
-import cn.reghao.autodop.dagent.spring.DagentLifecycle;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.stereotype.Component;
-
-import java.lang.management.ManagementFactory;
-
-/**
- * 分发 dmaster 发送的消息
- *
- * @author reghao
- * @date 2021-05-24 09:24:03
- */
-@Slf4j
-@Component
-public class DmasterMsgDispatcher implements MqttCallback {
-    private long startTime;
-    private MachineOpsProcessor machineOpsProcessor;
-    private AppOpsProcessor appOpsProcessor;
-
-    public DmasterMsgDispatcher(MachineOpsProcessor machineOpsProcessor, AppOpsProcessor appOpsProcessor) {
-        this.startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
-        this.machineOpsProcessor = machineOpsProcessor;
-        this.appOpsProcessor = appOpsProcessor;
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) {
-        try {
-            String msg = message.toString();
-            AsyncMsg asyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String msgId = asyncMsg.getMsgId();
-            String type = asyncMsg.getType();
-            String ops = asyncMsg.getOps();
-            String payload = asyncMsg.getPayload();
-
-            if (!asyncMsg.getMachineId().equals(DagentLifecycle.MACHINE_ID) || asyncMsg.getSendTime() < startTime) {
-                log.info("忽略不是发送到本节点或节点启动前发送的消息...");
-                return;
-            }
-
-            switch (MessageType.valueOf(type)) {
-                case machineType:
-                    machineOpsProcessor.process(msgId, ops, payload);
-                    break;
-                case appType:
-                    appOpsProcessor.process(msgId, ops, payload);
-                    break;
-                case dagentType:
-                    break;
-                case dockerType:
-                    break;
-                default:
-            }
-        } catch (Exception e) {
-            log.error("MQTT message exception -> {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        if (!token.isComplete()) {
-            log.error("token -> {}", token);
-        } else {
-            // 由于使用 MQTT 记录日志,而每条消息发送成功后都会调用此方法,因此会形成循环引用,导致超高 CPU 使用率
-            // log.info("token -> {}", token);
-        }
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("MQTT connection lost -> {}", cause.getMessage());
-    }
-}

+ 0 - 71
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java

@@ -1,71 +0,0 @@
-package cn.reghao.autodop.dagent.mqttsub.processor.app;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.CallResult;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.message.ops.AppOps;
-import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.app.App;
-import cn.reghao.autodop.dagent.spring.DagentLifecycle;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.springframework.stereotype.Component;
-
-/**
- * @author reghao
- * @date 2020-12-30 10:26:47
- */
-@Slf4j
-@Component
-public class AppOpsProcessor implements OpsProcessor {
-    private final DefaultMqttClient mqttClient;
-    private final App app;
-    
-    public AppOpsProcessor(DefaultMqttClient mqttClient, App app) {
-        this.mqttClient = mqttClient;
-        this.app = app;
-    }
-
-    @Override
-    public void process(String msgId, String ops, String payload) throws MqttException {
-        AsyncMsg asyncMsg;
-        switch (AppOps.valueOf(ops)) {
-            case appDeploy:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appDeployResult.name(), JsonConverter.objectToJson(app.deploy(payload)));
-                log.info("部署应用完成...");
-                break;
-            case appStatus:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appStatusResult.name(), JsonConverter.objectToJson(app.status(payload)));
-                break;
-            case appRestart:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appRestartResult.name(), JsonConverter.objectToJson(app.restart(payload)));
-                break;
-            case appStop:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appStopResult.name(), JsonConverter.objectToJson(app.stop(payload)));
-                break;
-            case appStart:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appStartResult.name(), JsonConverter.objectToJson(app.start(payload)));
-                break;
-            case appLog:
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appLogResult.name(), JsonConverter.objectToJson(app.log(payload)));
-                break;
-            default:
-                log.error("AppOps 中没有相应类型...");
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.appType.name(),
-                        AppOps.appDeployResult.name(),
-                        JsonConverter.objectToJson(CallResult.error("AppOps 中没有相应类型...")));
-                break;
-        }
-
-        asyncMsg.setMsgId(msgId);
-        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
-    }
-}

+ 0 - 17
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java

@@ -1,17 +0,0 @@
-package cn.reghao.autodop.dagent.mqttsub.processor.docker;
-
-import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-/**
- * @author reghao
- * @date 2020-12-30 10:26:47
- */
-@Slf4j
-@Component
-public class DockerOpsProcessor implements OpsProcessor {
-    @Override
-    public void process(String msgId, String ops, String payload) {
-    }
-}

+ 0 - 54
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java

@@ -1,54 +0,0 @@
-package cn.reghao.autodop.dagent.mqttsub.processor.machine;
-
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.message.*;
-import cn.reghao.autodop.common.message.ops.MachineOps;
-import cn.reghao.autodop.common.message.ops.OpsProcessor;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.spring.DagentLifecycle;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.springframework.stereotype.Component;
-
-/**
- * @author reghao
- * @date 2020-12-30 10:26:47
- */
-@Slf4j
-@Component
-public class MachineOpsProcessor implements OpsProcessor {
-    private DefaultMqttClient mqttClient;
-    private Machine machine;
-
-    public MachineOpsProcessor(DefaultMqttClient mqttClient, Machine machine) {
-        this.mqttClient = mqttClient;
-        this.machine = machine;
-    }
-
-    @Override
-    public void process(String msgId, String ops, String payload) throws MqttException {
-        AsyncMsg asyncMsg;
-        switch (MachineOps.valueOf(ops)) {
-            case machineStat:
-                log.info("机器状态...");
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.machineType.name(),
-                        MachineOps.machineStatResult.name(), JsonConverter.objectToJson(machine.stat()));
-                break;
-            case machineShell:
-                log.info("执行脚本...");
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.machineType.name(),
-                        MachineOps.machineShellResult.name(), JsonConverter.objectToJson(machine.shell(payload)));
-                break;
-            default:
-                log.error("MachineOps 中没有相应类型...");
-                asyncMsg = AsyncMsg.asyncMsg(DagentLifecycle.MACHINE_ID, MessageType.machineType.name(),
-                        MachineOps.machineShellResult.name(),
-                        JsonConverter.objectToJson(CallResult.error("MachineOps 中没有相应类型...")));
-                break;
-        }
-
-        asyncMsg.setMsgId(msgId);
-        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
-    }
-}

+ 33 - 17
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/RpcListener.java

@@ -1,11 +1,14 @@
 package cn.reghao.autodop.dagent.rpc;
 
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import cn.reghao.autodop.common.node.rpc.JsonRpc;
-import cn.reghao.autodop.common.node.rpc.RpcMsg;
-import cn.reghao.autodop.common.node.rpc.RpcResult;
-import cn.reghao.autodop.common.node.rpc.clazz.Clazz;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.rpc.RpcPayload;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
+import cn.reghao.autodop.common.msg.rpc.clazz.RpcClazz;
 import cn.reghao.autodop.dagent.rpc.impl.AppClazzImpl;
+import cn.reghao.autodop.dagent.rpc.impl.DockerClazzImpl;
+import cn.reghao.autodop.dagent.rpc.impl.MachineClazzImpl;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -27,41 +30,58 @@ import java.lang.management.ManagementFactory;
 public class RpcListener implements MqttCallback {
     private final long startTime;
     private final DefaultMqttClient mqttClient;
-    private AppClazzImpl appClazz;
+    private final AppClazzImpl appClazz;
+    private final MachineClazzImpl machineClazz;
+    private final DockerClazzImpl dockerClazz;
 
-    public RpcListener(DefaultMqttClient mqttClient, AppClazzImpl appClazz) {
+    public RpcListener(DefaultMqttClient mqttClient, AppClazzImpl appClazz, MachineClazzImpl machineClazz,
+                       DockerClazzImpl dockerClazz) {
         this.startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
         this.mqttClient = mqttClient;
         this.appClazz = appClazz;
+        this.machineClazz = machineClazz;
+        this.dockerClazz = dockerClazz;
     }
 
     @Override
     public void messageArrived(String topic, MqttMessage message) {
         try {
             String msg = message.toString();
-            RpcMsg callMsg = JsonConverter.jsonToObject(msg, RpcMsg.class);
+            Message callMessage = JsonConverter.jsonToObject(msg, Message.class);
+            String originator = callMessage.getOriginator();
+            String destination = callMessage.getDestination();
+            RpcMsg callMsg = callMessage.getRpcMsg();
 
-            JsonRpc jsonRpc = callMsg.getJsonRpc();
-            String clazz = jsonRpc.getClazz();
-            RpcResult<?> rpcResult = null;
-            switch (Clazz.valueOf(clazz)) {
+            RpcPayload rpcPayload = callMsg.getRpcPayload();
+            String clazz = rpcPayload.getClazz();
+            RpcResult<?> rpcResult;
+            switch (RpcClazz.valueOf(clazz)) {
                 case AppClazz:
-                    rpcResult = appClazz.process(jsonRpc);
+                    rpcResult = appClazz.process(rpcPayload);
                     break;
                 case MachineClazz:
+                    rpcResult = machineClazz.process(rpcPayload);
                     break;
                 case DockerClazz:
+                    rpcResult = dockerClazz.process(rpcPayload);
                     break;
                 default:
+                    String err = String.format("找不到 %s 类型", clazz);
+                    rpcResult = RpcResult.error(err);
             }
 
             RpcMsg resultMsg = RpcMsg.resultMsg(callMsg, rpcResult);
-            rpcReply(resultMsg);
+            Message message1 = Message.rpcMessage(originator, destination, resultMsg);
+            rpcReply(message1);
         } catch (Exception e) {
             log.error("MQTT message exception -> {}", e.getMessage());
         }
     }
 
+    private void rpcReply(Message message) throws MqttException {
+        mqttClient.pub(message.getDestination(), 1, JsonConverter.objectToJson(message));
+    }
+
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
         if (!token.isComplete()) {
@@ -76,8 +96,4 @@ public class RpcListener implements MqttCallback {
     public void connectionLost(Throwable cause) {
         log.error("MQTT connection lost -> {}", cause.getMessage());
     }
-
-    private void rpcReply(RpcMsg rpcMsg) throws MqttException {
-        mqttClient.pub(rpcMsg.getDestination(), 1, JsonConverter.objectToJson(rpcMsg));
-    }
 }

+ 179 - 42
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/AppClazzImpl.java

@@ -1,13 +1,12 @@
 package cn.reghao.autodop.dagent.rpc.impl;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
-import cn.reghao.autodop.common.dagent.app.PackType;
-import cn.reghao.autodop.common.docker.exception.DockerException;
-import cn.reghao.autodop.common.node.rpc.JsonRpc;
-import cn.reghao.autodop.common.node.rpc.RpcResult;
-import cn.reghao.autodop.common.node.rpc.clazz.AppClazz;
+import cn.reghao.autodop.common.dagent.app.log.AppLogArgs;
+import cn.reghao.autodop.common.dagent.app.log.LogConfig;
+import cn.reghao.autodop.common.dagent.app.log.LogFile;
+import cn.reghao.autodop.common.msg.rpc.dto.app.*;
+import cn.reghao.autodop.common.msg.rpc.RpcPayload;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
+import cn.reghao.autodop.common.msg.rpc.clazz.AppClazz;
 import cn.reghao.autodop.common.util.ExceptionUtil;
 import cn.reghao.autodop.dagent.app.AppService;
 import cn.reghao.autodop.dagent.app.DockerAppServiceImpl;
@@ -18,6 +17,8 @@ import cn.reghao.jdkutil.result.ResultStatus;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * @author reghao
  * @date 2020-12-30 10:26:47
@@ -33,58 +34,194 @@ public class AppClazzImpl {
         this.zipAppServiceImpl = new ZipAppServiceImpl();
     }
     
-    public RpcResult<?> process(JsonRpc jsonRpc) throws DockerException {
-        String method = jsonRpc.getMethod();
-        Object param = jsonRpc.getParam();
+    public RpcResult<?> process(RpcPayload rpcPayload) {
+        String method = rpcPayload.getMethod();
+        Object param = rpcPayload.getParam();
         switch (AppClazz.valueOf(method)) {
             case deploy:
                 if (!(param instanceof DeployParam)) {
-                    return RpcResult.error("不是 AppDeployArgs 对象");
+                    return RpcResult.error("不是 DeployParam 对象");
                 }
                 return deploy(param);
             case status:
-                break;
+                if (!(param instanceof AppIdParam)) {
+                    return RpcResult.error("不是 AppIdParam 对象");
+                }
+                return status(param);
             case start:
-                break;
+                if (!(param instanceof AppIdParam)) {
+                    return RpcResult.error("不是 AppIdParam 对象");
+                }
+                return start(param);
             case stop:
-                break;
+                if (!(param instanceof AppIdParam)) {
+                    return RpcResult.error("不是 AppIdParam 对象");
+                }
+                return stop(param);
             case restart:
-                break;
+                if (!(param instanceof AppIdParam)) {
+                    return RpcResult.error("不是 AppIdParam 对象");
+                }
+                return restart(param);
             default:
-                ;
+                String err = String.format("找不到 %s 方法", method);
+                return RpcResult.error(err);
         }
-
-        return null;
     }
     
-    private RpcResult<?> deploy(Object param) throws DockerException {
-        DeployParam deployArgs = (DeployParam) param;
-        String buildLogId = deployArgs.getBuildLogId();
-        String packType = deployArgs.getPackType();
+    private RpcResult<?> deploy(Object param) {
+        DeployParam deployParam = (DeployParam) param;
+        String buildLogId = deployParam.getBuildLogId();
+        String packType = deployParam.getPackType();
 
         AppStatus appStatus;
-        DeployResult deployResult = new DeployResult(buildLogId,
-                DagentLifecycle.MACHINE_ID, DagentLifecycle.MACHINE_IPV4);
-        switch (PackType.valueOf(packType)) {
-            case docker:
-                try {
-                    appStatus = dockerAppServiceImpl.deploy(deployArgs);
+        DeployResult deployResult = new DeployResult(buildLogId, DagentLifecycle.MACHINE_ID, DagentLifecycle.MACHINE_IPV4);
+        try {
+            switch (PackType.valueOf(packType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.deploy(deployParam);
                     deployResult.setResult(Result.result(ResultStatus.SUCCESS));
                     deployResult.setAppStatus(appStatus);
-                } catch (DockerException e) {
-                    deployResult.setResult(Result.result(ResultStatus.FAIL, ExceptionUtil.errorMsg(e)));
-                }
-                break;
-            case zip:
-                appStatus = zipAppServiceImpl.deploy(deployArgs);
-                deployResult.setResult(Result.result(ResultStatus.SUCCESS));
-                deployResult.setAppStatus(appStatus);
-                break;
-            default:
-                String msg = "打包类型 " + deployArgs.getPackType() + " 不存在";
-                deployResult.setResult(Result.result(ResultStatus.ERROR, msg));
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.deploy(deployParam);
+                    deployResult.setResult(Result.result(ResultStatus.SUCCESS));
+                    deployResult.setAppStatus(appStatus);
+                    break;
+                default:
+                    String msg = "打包类型 " + deployParam.getPackType() + " 不存在";
+                    deployResult.setResult(Result.result(ResultStatus.ERROR, msg));
+            }
+        } catch (Exception e) {
+            deployResult.setResult(Result.result(ResultStatus.FAIL, ExceptionUtil.errorMsg(e)));
         }
-
+        
         return RpcResult.success(deployResult);
     }
+
+    public RpcResult<?> status(Object param) {
+        AppIdParam appIdParam = (AppIdParam) param;
+        String appId = appIdParam.getAppId();
+        String packerType = appIdParam.getPackerType();
+
+        AppStatus appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.status(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.status(appId);
+                    break;
+                default:
+                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResult.fail(ExceptionUtil.errorMsg(e));
+        }
+        
+        return RpcResult.success(appStatus);
+    }
+
+    public RpcResult<AppStatus> restart(Object param) {
+        AppIdParam appIdParam = (AppIdParam) param;
+        String appId = appIdParam.getAppId();
+        String packerType = appIdParam.getPackerType();
+
+        AppStatus appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.restart(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.restart(appId);
+                    break;
+                default:
+                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResult.fail(ExceptionUtil.errorMsg(e));
+        }
+        
+        return RpcResult.success(appStatus);
+    }
+
+    public RpcResult<AppStatus> stop(Object param) {
+        AppIdParam appIdParam = (AppIdParam) param;
+        String appId = appIdParam.getAppId();
+        String packerType = appIdParam.getPackerType();
+
+        AppStatus appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.stop(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.stop(appId);
+                    break;
+                default:
+                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResult.fail(ExceptionUtil.errorMsg(e));
+        }
+
+        return RpcResult.success(appStatus);
+    }
+
+    public RpcResult<AppStatus> start(Object param) {
+        AppIdParam appIdParam = (AppIdParam) param;
+        String appId = appIdParam.getAppId();
+        String packerType = appIdParam.getPackerType();
+
+        AppStatus appStatus;
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    appStatus = dockerAppServiceImpl.start(appId);
+                    break;
+                case zip:
+                    appStatus = zipAppServiceImpl.start(appId);
+                    break;
+                default:
+                    return RpcResult.error("打包类型 " + appIdParam.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResult.fail(ExceptionUtil.errorMsg(e));
+        }
+
+        return RpcResult.success(appStatus);
+    }
+
+    public RpcResult<?> log(Object param) {
+        AppLogArgs appLogArgs = (AppLogArgs) param;
+        String packerType = appLogArgs.getPackerType();
+        long count = appLogArgs.getLogConfigs().stream().filter(LogConfig::getIsDir).count();
+        try {
+            switch (PackType.valueOf(packerType)) {
+                case docker:
+                    if (count > 0) {
+                        List<LogFile> logFiles = dockerAppServiceImpl.logFiles(appLogArgs);
+                        return RpcResult.success(logFiles);
+                    } else {
+                        List<String> logContent = dockerAppServiceImpl.logContent(appLogArgs);
+                        return RpcResult.success(logContent);
+                    }
+                case zip:
+                    if (count > 0) {
+                        List<LogFile> logFiles = zipAppServiceImpl.logFiles(appLogArgs);
+                        return RpcResult.success(logFiles);
+                    } else {
+                        List<String> logContent = zipAppServiceImpl.logContent(appLogArgs);
+                        return RpcResult.success(logContent);
+                    }
+                default:
+                    return RpcResult.error("打包类型 " + appLogArgs.getPackerType() + " 不存在");
+            }
+        } catch (Exception e) {
+            return RpcResult.fail(ExceptionUtil.errorMsg(e));
+        }
+    }
 }

+ 33 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/DockerClazzImpl.java

@@ -0,0 +1,33 @@
+package cn.reghao.autodop.dagent.rpc.impl;
+
+import cn.reghao.autodop.common.docker.Docker;
+import cn.reghao.autodop.common.msg.rpc.RpcPayload;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
+import cn.reghao.autodop.common.msg.rpc.clazz.DockerClazz;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2020-12-30 10:26:47
+ */
+@Slf4j
+@Component
+public class DockerClazzImpl {
+    private Docker docker;
+
+    public DockerClazzImpl() {
+        //this.docker = new Docker();
+    }
+    
+    public RpcResult<?> process(RpcPayload rpcPayload) {
+        String method = rpcPayload.getMethod();
+        Object param = rpcPayload.getParam();
+        switch (DockerClazz.valueOf(method)) {
+            case dockerImage:
+            default:
+        }
+
+        return null;
+    }
+}

+ 21 - 11
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/MachineClazzImpl.java

@@ -1,10 +1,11 @@
 package cn.reghao.autodop.dagent.rpc.impl;
 
 import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.docker.exception.DockerException;
-import cn.reghao.autodop.common.node.rpc.JsonRpc;
-import cn.reghao.autodop.common.node.rpc.RpcResult;
-import cn.reghao.autodop.common.node.rpc.clazz.MachineClazz;
+import cn.reghao.autodop.common.dagent.machine.MachineStat;
+import cn.reghao.autodop.common.dagent.machine.shell.MachineShell;
+import cn.reghao.autodop.common.msg.rpc.RpcPayload;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
+import cn.reghao.autodop.common.msg.rpc.clazz.MachineClazz;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
@@ -15,22 +16,31 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class MachineClazzImpl {
-    private Machine machine;
+    private final Machine machine;
 
     public MachineClazzImpl(Machine machine) {
         this.machine = machine;
     }
     
-    public RpcResult<?> process(JsonRpc jsonRpc) {
-        String method = jsonRpc.getMethod();
-        Object param = jsonRpc.getParam();
+    public RpcResult<?> process(RpcPayload rpcPayload) {
+        String method = rpcPayload.getMethod();
+        Object param = rpcPayload.getParam();
         switch (MachineClazz.valueOf(method)) {
-            case statMethod:
+            case stat:
+                break;
+            case shell:
                 break;
             default:
-                ;
         }
-
         return null;
     }
+
+
+    public RpcResult<MachineStat> stat() {
+        return RpcResult.success(machine.heartbeat());
+    }
+
+    public RpcResult<MachineShell> shell(String script) {
+        return RpcResult.success(null);
+    }
 }

+ 20 - 43
dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java

@@ -3,20 +3,15 @@ package cn.reghao.autodop.dagent.spring;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import cn.reghao.autodop.common.log.Appenders;
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.message.ops.DagentOps;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.msg.MsgQueue;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
 import cn.reghao.autodop.common.util.MachineId;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.mqttsub.DmasterMsgDispatcher;
-import cn.reghao.autodop.dagent.machine.DagentHeartbeatJob;
-import cn.reghao.autodop.dagent.machine.MachineScheduler;
+import cn.reghao.autodop.dagent.machine.NodeOpsImpl;
+import cn.reghao.autodop.dagent.rpc.RpcListener;
 import cn.reghao.autodop.common.log.LoggerConfig;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttException;
-import org.quartz.SchedulerException;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -38,27 +33,23 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     public static String MACHINE_ID;
     public static String MACHINE_IPV4;
 
-    private final DmasterMsgDispatcher dmasterMsgDispatcher;
     private final DefaultMqttClient mqttClient;
-    private final MachineScheduler machineScheduler;
-    private final Machine machine;
+    private final NodeOpsImpl nodeOps;
+    private final RpcListener rpcListener;
 
-    public DagentLifecycle(DmasterMsgDispatcher dmasterMsgDispatcher,
-                           DefaultMqttClient mqttClient,
-                           MachineScheduler machineScheduler,
-                           Machine machine) throws IOException {
+    public DagentLifecycle(DefaultMqttClient mqttClient, NodeOpsImpl nodeOps, RpcListener rpcListener)
+            throws IOException {
         MACHINE_ID = MachineId.id();
         MACHINE_IPV4 = MachineId.ipv4();
-        this.dmasterMsgDispatcher = dmasterMsgDispatcher;
         this.mqttClient = mqttClient;
-        this.machineScheduler = machineScheduler;
-        this.machine = machine;
+        this.nodeOps = nodeOps;
+        this.rpcListener = rpcListener;
         initLogger();
     }
 
     private void initLogger() {
         List<Appender<ILoggingEvent>> appenders = new ArrayList<>();
-        appenders.add(Appenders.mqttAppender(MACHINE_ID, MACHINE_IPV4, "dagent", mqttClient));
+        appenders.add(Appenders.mqttAppender(MACHINE_ID, AppId.dagent.name(), mqttClient));
         LoggerConfig.initLogger(appenders);
     }
 
@@ -70,39 +61,25 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     @Override
-    public void destroy() throws MqttException {
+    public void destroy() {
         pubDagentShutdown();
     }
 
     private void subTopic() throws MqttException {
-        String topic = "dagent/" + DagentLifecycle.MACHINE_ID;
-        mqttClient.sub(topic, dmasterMsgDispatcher);
+        String topic = MsgQueue.dagentTopic(MACHINE_ID);
+        mqttClient.sub(topic, rpcListener);
     }
 
-    private void pubDagentStart() throws MqttException {
-        String payload = JsonConverter.objectToJson(machine.registry());
-        AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MACHINE_ID, MessageType.dagentType.name(),
-                DagentOps.dagentStart.name(), payload);
-        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
-
-        payload = JsonConverter.objectToJson(machine.heartbeat());
-        asyncMsg = AsyncMsg.asyncMsg(MACHINE_ID, MessageType.dagentType.name(),
-                DagentOps.dagentHeartbeat.name(), payload);
-        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
+    private void pubDagentStart() {
+        nodeOps.nodeStart();
     }
 
-    private void pubDagentHeartbeat() throws SchedulerException {
-        String jobId = DagentHeartbeatJob.class.getSimpleName();
-        String cronExp = "0/10 * * * * ?";
-        machineScheduler.add(DagentHeartbeatJob.class, jobId, cronExp);
-        machineScheduler.start();
+    private void pubDagentHeartbeat() {
+        nodeOps.nodeHeartbeat();
     }
 
-    private void pubDagentShutdown() throws MqttException {
-        String payload = JsonConverter.objectToJson(machine.heartbeat());
-        AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MACHINE_ID, MessageType.dagentType.name(),
-                DagentOps.dagnetShutdown.name(), payload);
-        mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
+    private void pubDagentShutdown() {
+        nodeOps.nodeShutdown();
         log.info("Dagent 停止...");
     }
 }

+ 6 - 0
dmaster/pom.xml

@@ -172,6 +172,12 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+            <version>2.3.2</version>
+        </dependency>
     </dependencies>
 
     <profiles>

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/AppConfigPageController.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.controller.page;
 
-import cn.reghao.autodop.common.dagent.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.app.model.constant.AppType;
 import cn.reghao.autodop.dmaster.app.model.constant.EnvList;
 import cn.reghao.autodop.dmaster.machine.db.crud.MachineQuery;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/page/BuildConfigPageController.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.controller.page;
 
-import cn.reghao.autodop.common.dagent.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.common.dagent.machine.disk.Disk;
 import cn.reghao.autodop.dmaster.app.model.constant.build.CompileType;
 import cn.reghao.autodop.dmaster.app.model.constant.build.RepoAuthType;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppDeploying.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po;
 
-import cn.reghao.autodop.common.dagent.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.dmaster.app.model.po.config.AppOrchestration;
 import cn.reghao.autodop.dmaster.app.model.po.config.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/AppRunning.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
 import cn.reghao.autodop.dmaster.app.model.po.config.AppOrchestration;
 import cn.reghao.autodop.dmaster.app.model.po.config.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/config/build/PackerConfig.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.model.po.config.build;
 
-import cn.reghao.autodop.common.dagent.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.util.validator.ValidEnum;
 import cn.reghao.autodop.dmaster.util.db.BaseEntity;
 import lombok.Data;

+ 2 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/model/po/log/DeployLog.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.model.po.log;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.jdkutil.result.Result;
 import cn.reghao.autodop.dmaster.util.db.BaseDocument;
 import lombok.Data;

+ 2 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppOpsResultService.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.service;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.common.http.DefaultWebRequest;
 import cn.reghao.autodop.common.http.WebRequest;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppQuery;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -3,7 +3,7 @@ package cn.reghao.autodop.dmaster.app.service;
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.AppOps;
-import cn.reghao.autodop.common.dagent.app.AppIdParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppIdParam;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.model.constant.StatusOps;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java

@@ -16,7 +16,7 @@ import cn.reghao.autodop.dmaster.app.service.bd.BuildNotifyMsg;
 import cn.reghao.autodop.dmaster.notification.model.po.NotifyGroup;
 import cn.reghao.autodop.dmaster.notification.model.po.NotifyType;
 import cn.reghao.autodop.dmaster.notification.service.NotifyService;
-import cn.reghao.autodop.dmaster.spring.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
 import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttException;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -3,7 +3,7 @@ package cn.reghao.autodop.dmaster.app.service.bd;
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.message.ops.AppOps;
-import cn.reghao.autodop.common.dagent.app.DeployParam;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppQuery;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppIntegrate.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service.bd;
 
-import cn.reghao.autodop.common.dagent.app.PackType;
+import cn.reghao.autodop.common.msg.rpc.dto.app.PackType;
 import cn.reghao.autodop.dmaster.app.model.constant.build.CompileType;
 import cn.reghao.autodop.dmaster.app.model.constant.build.RepoType;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.CompilerConfig;

+ 2 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/DeployNotifyMsg.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.app.service.bd;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;
 import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
 import cn.reghao.jdkutil.converter.DateTimeConverter;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/machine/service/ssh/WebSshImpl.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.machine.service.ssh;
 
 import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dmaster.spring.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
 import cn.reghao.autodop.dmaster.machine.entity.SshConnData;
 import cn.reghao.autodop.dmaster.machine.entity.SshConnInfo;
 import com.jcraft.jsch.Channel;

+ 43 - 17
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java

@@ -1,7 +1,11 @@
 package cn.reghao.autodop.dmaster.mqttsub;
 
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.message.AsyncMsg;
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.MsgType;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.clazz.NodeClazz;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.mqttsub.processor.AppOpsProcessor;
 import cn.reghao.autodop.dmaster.mqttsub.processor.DagentOpsProcessor;
@@ -19,7 +23,7 @@ import org.springframework.stereotype.Component;
  * @date 2021-05-24 09:24:03
  */
 @Slf4j
-@Component
+@Deprecated
 public class DagentMsgDispatcher implements MqttCallback {
     private DagentOpsProcessor dagentOpsProcessor;
     private MachineOpsProcessor machineOpsProcessor;
@@ -41,23 +45,19 @@ public class DagentMsgDispatcher implements MqttCallback {
             }
 
             String msg = message.toString();
-            AsyncMsg asyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String msgId = asyncMsg.getMsgId();
-            String type = asyncMsg.getType();
-            String ops = asyncMsg.getOps();
-            String payload = asyncMsg.getPayload();
-
-            switch (MessageType.valueOf(type)) {
-                case dagentType:
-                    dagentOpsProcessor.process(msgId, ops, payload);
-                    break;
-                case machineType:
-                    machineOpsProcessor.process(msgId, ops, payload);
+            Message nodeMsg = JsonConverter.jsonToObject(msg, Message.class);
+            String msgType = nodeMsg.getMsgType();
+            switch (MsgType.valueOf(msgType)) {
+                case pub:
+                    PubMsg pubMsg = nodeMsg.getPubMsg();
+                    dispatchPubMsg(pubMsg);
                     break;
-                case appType:
-                    appOpsProcessor.process(msgId, ops, payload);
+                case rpc:
+                    RpcMsg rpcMsg = nodeMsg.getRpcMsg();
+                    dispatchRpcMsg(rpcMsg);
                     break;
                 default:
+                    log.error("");
             }
         } catch (Exception e) {
             log.error("MQTT message exception -> {}", e.getMessage());
@@ -73,4 +73,30 @@ public class DagentMsgDispatcher implements MqttCallback {
     public void connectionLost(Throwable cause) {
         log.error("MQTT connection lost -> {}", cause.getMessage());
     }
+
+    private void dispatchPubMsg(PubMsg pubMsg) {
+        String clazz = pubMsg.getClazz();
+        String method = pubMsg.getMethod();
+        String jsonPayload = pubMsg.getJsonPayload();
+
+        if (clazz.equals(NodeClazz.class.getSimpleName())) {
+            switch (NodeClazz.valueOf(method)) {
+                case start:
+                    break;
+                case heartbeat:
+                    break;
+                case shutdown:
+                    break;
+                case log:
+                    break;
+                default:
+                    ;
+            }
+        }
+    }
+
+    private void dispatchRpcMsg(RpcMsg rpcMsg) {
+        String msgId = rpcMsg.getMsgId();
+        RpcResult<?> rpcResult = rpcMsg.getRpcResult();
+    }
 }

+ 100 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java

@@ -0,0 +1,100 @@
+package cn.reghao.autodop.dmaster.mqttsub;
+
+import cn.reghao.autodop.common.msg.Message;
+import cn.reghao.autodop.common.msg.MsgType;
+import cn.reghao.autodop.common.msg.pub.PubMsg;
+import cn.reghao.autodop.common.msg.pub.clazz.NodeClazz;
+import cn.reghao.autodop.common.msg.pub.clazz.PubClazz;
+import cn.reghao.autodop.common.msg.rpc.RpcPayload;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
+import cn.reghao.autodop.common.msg.rpc.RpcResult;
+import cn.reghao.autodop.dmaster.mqttsub.processor.AppOpsProcessor;
+import cn.reghao.autodop.dmaster.mqttsub.processor.DagentOpsProcessor;
+import cn.reghao.autodop.dmaster.mqttsub.processor.MachineOpsProcessor;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+/**
+ * 分发 dagent 发送的消息
+ *
+ * @author reghao
+ * @date 2021-05-24 09:24:03
+ */
+@Slf4j
+@Component
+public class MessageListener implements MqttCallback {
+    private DagentOpsProcessor dagentOpsProcessor;
+    private MachineOpsProcessor machineOpsProcessor;
+    private AppOpsProcessor appOpsProcessor;
+
+    public MessageListener(DagentOpsProcessor dagentOpsProcessor,
+                           MachineOpsProcessor machineOpsProcessor,
+                           AppOpsProcessor appOpsProcessor) {
+        this.dagentOpsProcessor = dagentOpsProcessor;
+        this.machineOpsProcessor = machineOpsProcessor;
+        this.appOpsProcessor = appOpsProcessor;
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) {
+        try {
+            if (message.isDuplicate()) {
+                log.info("重新发送的消息...");
+            }
+
+            String msg = message.toString();
+            Message message1 = JsonConverter.jsonToObject(msg, Message.class);
+            String msgType = message1.getMsgType();
+            switch (MsgType.valueOf(msgType)) {
+                case pub:
+                    processPubMsg(message1.getPubMsg());
+                    break;
+                case rpc:
+                    processRpcMsg(message1.getRpcMsg());
+                    break;
+                default:
+            }
+        } catch (Exception e) {
+            log.error("MQTT message exception -> {}", e.getMessage());
+        }
+    }
+
+    private void processPubMsg(PubMsg pubMsg) {
+        String clazz = pubMsg.getClazz();
+        String method = pubMsg.getMethod();
+        String jsonPayload = pubMsg.getJsonPayload();
+        if (clazz.equals(NodeClazz.class.getSimpleName())) {
+            switch (NodeClazz.valueOf(method)) {
+                case start:
+                    break;
+                case heartbeat:
+                    break;
+                case shutdown:
+                    break;
+                case log:
+                    break;
+            }
+        }
+    }
+
+    private void processRpcMsg(RpcMsg rpcMsg) {
+        String msgId = rpcMsg.getMsgId();
+        RpcPayload rpcPayload = rpcMsg.getRpcPayload();
+        RpcResult<?> rpcResult = rpcMsg.getRpcResult();
+
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        log.info("token -> {}", token);
+    }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("MQTT connection lost -> {}", cause.getMessage());
+    }
+}

+ 0 - 68
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/RpcListener.java

@@ -1,68 +0,0 @@
-package cn.reghao.autodop.dmaster.mqttsub;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.node.rpc.JsonRpc;
-import cn.reghao.autodop.common.node.rpc.RpcMsg;
-import cn.reghao.autodop.common.node.rpc.RpcResult;
-import cn.reghao.autodop.dmaster.mqttsub.processor.AppOpsProcessor;
-import cn.reghao.autodop.dmaster.mqttsub.processor.DagentOpsProcessor;
-import cn.reghao.autodop.dmaster.mqttsub.processor.MachineOpsProcessor;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.stereotype.Component;
-
-/**
- * 分发 dagent 发送的消息
- *
- * @author reghao
- * @date 2021-05-24 09:24:03
- */
-@Slf4j
-@Component
-public class RpcListener implements MqttCallback {
-    private DagentOpsProcessor dagentOpsProcessor;
-    private MachineOpsProcessor machineOpsProcessor;
-    private AppOpsProcessor appOpsProcessor;
-
-    public RpcListener(DagentOpsProcessor dagentOpsProcessor,
-                       MachineOpsProcessor machineOpsProcessor,
-                       AppOpsProcessor appOpsProcessor) {
-        this.dagentOpsProcessor = dagentOpsProcessor;
-        this.machineOpsProcessor = machineOpsProcessor;
-        this.appOpsProcessor = appOpsProcessor;
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) {
-        try {
-            if (message.isDuplicate()) {
-                log.info("重新发送的消息...");
-            }
-
-            String msg = message.toString();
-            RpcMsg resultMsg = JsonConverter.jsonToObject(msg, RpcMsg.class);
-
-            String msgId = resultMsg.getMsgId();
-            JsonRpc jsonRpc = resultMsg.getJsonRpc();
-            RpcResult<?> rpcResult = resultMsg.getRpcResult();
-
-
-        } catch (Exception e) {
-            log.error("MQTT message exception -> {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        log.info("token -> {}", token);
-    }
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        log.error("MQTT connection lost -> {}", cause.getMessage());
-    }
-}

+ 2 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/AppOpsProcessor.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.dmaster.mqttsub.processor;
 
-import cn.reghao.autodop.common.dagent.app.AppStatus;
-import cn.reghao.autodop.common.dagent.app.DeployResult;
+import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
 import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.message.ops.AppOps;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/NotifyService.java

@@ -3,7 +3,7 @@ package cn.reghao.autodop.dmaster.notification.service;
 import cn.reghao.autodop.common.http.DefaultWebRequest;
 import cn.reghao.autodop.common.http.WebRequest;
 import cn.reghao.autodop.dmaster.notification.model.po.*;
-import cn.reghao.autodop.dmaster.spring.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
 import cn.reghao.autodop.dmaster.notification.db.repository.DingAccountRepository;
 import cn.reghao.autodop.dmaster.notification.db.repository.EmailAccountRepository;
 import cn.reghao.autodop.dmaster.notification.db.repository.NotifyGroupRepository;

+ 17 - 13
dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java

@@ -4,11 +4,13 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import cn.reghao.autodop.common.log.LoggerConfig;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.msg.MsgQueue;
+import cn.reghao.autodop.common.msg.pub.dto.node.constant.AppId;
 import cn.reghao.autodop.common.util.MachineId;
 import cn.reghao.autodop.dmaster.app.db.crud.config.build.BuildDirCrudService;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.BuildDir;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.BuildDirLocal;
-import cn.reghao.autodop.dmaster.mqttsub.DagentMsgDispatcher;
+import cn.reghao.autodop.dmaster.mqttsub.MessageListener;
 import cn.reghao.autodop.dmaster.sys.db.AppRuntimeLogCrudService;
 import cn.reghao.autodop.dmaster.util.log.Appenders;
 import lombok.extern.slf4j.Slf4j;
@@ -32,18 +34,21 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     public static String MACHINE_ID;
     public static String MACHINE_IPV4;
 
-    private DagentMsgDispatcher dagentMsgDispatcher;
-    private DefaultMqttClient mqttClient;
-    private BuildDirCrudService buildDirCrudService;
+    private final AppId appId;
+    private final String topic;
+    private final MessageListener messageListener;
+    private final DefaultMqttClient mqttClient;
+    private final BuildDirCrudService buildDirCrudService;
 
-    public DmasterLifecycle(MachineId machineId,
-                            AppRuntimeLogCrudService logCrudService,
-                            DagentMsgDispatcher dagentMsgDispatcher,
+    public DmasterLifecycle(AppRuntimeLogCrudService logCrudService,
+                            MessageListener messageListener,
                             BuildDirCrudService buildDirCrudService,
                             DefaultMqttClient mqttClient) throws IOException {
-        MACHINE_ID = machineId.id();
-        MACHINE_IPV4 = machineId.ipv4();
-        this.dagentMsgDispatcher = dagentMsgDispatcher;
+        MACHINE_ID = MachineId.id();
+        MACHINE_IPV4 = MachineId.ipv4();
+        this.appId = AppId.dmaster;
+        this.topic = MsgQueue.dmasterTopic();
+        this.messageListener = messageListener;
         this.mqttClient = mqttClient;
         this.buildDirCrudService = buildDirCrudService;
         initLogger(logCrudService);
@@ -51,7 +56,7 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
 
     private void initLogger(AppRuntimeLogCrudService logCrudService) {
         List<Appender<ILoggingEvent>> appenders = new ArrayList<>();
-        appenders.add(Appenders.mongoAppender("dmaster", logCrudService));
+        appenders.add(Appenders.mongoAppender(appId.name(), logCrudService));
         LoggerConfig.initLogger(appenders);
     }
 
@@ -68,8 +73,7 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     private void subTopic() throws MqttException {
-        String topic = "dmaster";
-        mqttClient.sub(topic, dagentMsgDispatcher);
+        mqttClient.sub(topic, messageListener);
     }
 
     private void initBuildDir() {

+ 0 - 30
dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/thread/ThreadPoolWrapper.java

@@ -1,30 +0,0 @@
-package cn.reghao.autodop.dmaster.spring.thread;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.concurrent.*;
-
-/**
- * 线程池包装类
- *
- * @author reghao
- * @date 2019-11-12 16:39:45
- */
-public class ThreadPoolWrapper {
-    /**
-     * 获取一个线程池
-     *
-     * @param name 线程池名字
-     * @return 线程池
-     * @date 2019-11-12 下午4:42
-     */
-    public static ExecutorService threadPool(String name) {
-        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").build();
-
-        return new ThreadPoolExecutor(16, 32, 200L,
-                TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>(32),
-                namedThreadFactory,
-                new ThreadPoolExecutor.AbortPolicy());
-    }
-}