reghao 5 лет назад
Родитель
Сommit
334f9cdd46

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/app/AppOps.java → common/src/main/java/cn/reghao/autodop/common/dagent/app/AppOps.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq.protocol.app;
+package cn.reghao.autodop.common.dagent.app;
 
 /**
  * @author reghao

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/app/DeployArgs.java → common/src/main/java/cn/reghao/autodop/common/dagent/app/DeployArgs.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq.protocol.app;
+package cn.reghao.autodop.common.dagent.app;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/app/LogArgs.java → common/src/main/java/cn/reghao/autodop/common/dagent/app/LogArgs.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq.protocol.app;
+package cn.reghao.autodop.common.dagent.app;
 
 import lombok.Data;
 

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/app/StatusArgs.java → common/src/main/java/cn/reghao/autodop/common/dagent/app/StatusArgs.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq.protocol.app;
+package cn.reghao.autodop.common.dagent.app;
 
 import lombok.Data;
 

+ 16 - 0
common/src/main/java/cn/reghao/autodop/common/dagent/machine/MachineLog.java

@@ -0,0 +1,16 @@
+package cn.reghao.autodop.common.dagent.machine;
+
+import lombok.Data;
+
+/**
+ * @author reghao
+ * @date 2020-10-22 15:45:29
+ */
+@Data
+public class MachineLog {
+    private long timestamp;
+    private String threadName;
+    private String loggerName;
+    private String level;
+    private String msg;
+}

+ 13 - 0
common/src/main/java/cn/reghao/autodop/common/dagent/machine/MachineShell.java

@@ -0,0 +1,13 @@
+package cn.reghao.autodop.common.dagent.machine;
+
+import lombok.Data;
+
+/**
+ * @author reghao
+ * @date 2021-01-27 19:13:10
+ */
+@Data
+public class MachineShell {
+    private String type;
+    private String content;
+}

+ 30 - 0
common/src/main/java/cn/reghao/autodop/common/dagent/protocol/RPCResult.java

@@ -0,0 +1,30 @@
+package cn.reghao.autodop.common.dagent.protocol;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import static cn.reghao.autodop.common.result.ResultCode.FAIL;
+import static cn.reghao.autodop.common.result.ResultCode.SUCCESS;
+
+/**
+ * @author reghao
+ * @date 2020-09-07 23:04:16
+ */
+@Data
+@AllArgsConstructor
+public class RPCResult {
+    private int statusCode;
+    private String result;
+
+    public static RPCResult success(String data) {
+        return new RPCResult(SUCCESS.getCode(), data);
+    }
+
+    public static RPCResult fail(String data) {
+        return new RPCResult(FAIL.getCode(), data);
+    }
+
+    public static RPCResult error(String data) {
+        return new RPCResult(FAIL.getCode(), data);
+    }
+}

+ 4 - 5
common/src/main/java/cn/reghao/autodop/common/mq/RabbitProducer.java

@@ -1,7 +1,7 @@
 package cn.reghao.autodop.common.mq;
 
+import cn.reghao.autodop.common.dagent.protocol.RPCResult;
 import cn.reghao.autodop.common.mq.protocol.MQMessage;
-import cn.reghao.autodop.common.result.RpcResult;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -25,7 +25,7 @@ public class RabbitProducer {
         rabbitTemplate.convertAndSend(routeKey, msg);
     }
 
-    public RpcResult callRemote(String routeKey, MQMessage mqMessage) {
+    public RPCResult callRemote(String routeKey, MQMessage mqMessage) {
         String msg = JsonConverter.objectToJson(mqMessage);
         // TODO RPC 调用等待超时,若 RPC 服务端业务处理的时间超过设定的超时,则会引发不可知的错误
         rabbitTemplate.setReplyTimeout(10_000);
@@ -33,10 +33,9 @@ public class RabbitProducer {
         // TODO 处理 RPC 服务端未启动时的情况
         String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, msg);
         if (result == null) {
-            RpcResult rpcResult = new RpcResult(1, "RPC 调用超时...");
-            return rpcResult;
+            return RPCResult.fail("RPC 调用超时...");
         } else {
-            return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
+            return (RPCResult) JsonConverter.jsonToObject(result, RPCResult.class);
         }
     }
 }

+ 3 - 2
common/src/main/java/cn/reghao/autodop/common/mq/protocol/MQMessage.java

@@ -9,8 +9,9 @@ import lombok.Data;
 @Data
 public class MQMessage {
     private String machineId;
-    private String msgType;
     private long sendTime;
-    private String payload;
     private boolean isRpc;
+    private String type;
+    private String ops;
+    private String payload;
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/MessageType.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.mq.protocol;
  * @date 2020-12-25 17:49:44
  */
 public enum MessageType {
-    app, machine
+    machineType, appType
 }

+ 0 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/machine/MachineMessage.java

@@ -8,7 +8,6 @@ import lombok.Data;
  */
 @Data
 public class MachineMessage {
-    private String machineId;
     private String machineOps;
     private String message;
 }

+ 4 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/machine/MachineOps.java

@@ -5,5 +5,8 @@ package cn.reghao.autodop.common.mq.protocol.machine;
  * @date 2020-12-25 19:15:00
  */
 public enum MachineOps {
-    machineRegistry, machineHeartbeat, machineLog, machineState, machineShell
+    // 主动发送到 dmaster
+    machineRegistryOps, machineHeartbeatOps, machineLogOps,
+    // dmaster RPC 调用时返回
+    machineShellOps, machineStateOps
 }

+ 5 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/MachineMessageDispatcher.java

@@ -1,6 +1,7 @@
 package cn.reghao.autodop.dmaster.cluster;
 
 import cn.reghao.autodop.common.dagent.machine.MachineInfo;
+import cn.reghao.autodop.common.dagent.machine.MachineLog;
 import cn.reghao.autodop.common.mq.protocol.machine.MachineOps;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
@@ -14,16 +15,17 @@ public class MachineMessageDispatcher implements MessageDispatcher {
     @Override
     public void dispatch(String msgType, String msg) {
         switch (MachineOps.valueOf(msgType)) {
-            case machineRegistry:
+            case machineRegistryOps:
                 log.info("机器注册消息...");
                 MachineInfo machineInfo = (MachineInfo) JsonConverter.jsonToObject(msg, MachineInfo.class);
                 break;
-            case machineHeartbeat:
+            case machineHeartbeatOps:
                 log.info("机器心跳消息...");
                 //MachineHeartbeat machineHeartbeat = (MachineHeartbeat) JsonConverter.jsonToObject(msg, MachineHeartbeat.class);
                 break;
-            case machineLog:
+            case machineLogOps:
                 log.info("dagent 日志消息...");
+                MachineLog machineLog = (MachineLog) JsonConverter.jsonToObject(msg, MachineLog.class);
                 break;
             default:
                 log.info("来自机器的消息...");

+ 4 - 4
dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/RabbitConsumer.java

@@ -28,18 +28,18 @@ public class RabbitConsumer implements MessageListener {
         try {
             MQMessage mqMessage = (MQMessage) JsonConverter.jsonToObject(body, MQMessage.class);
             String machineId = mqMessage.getMachineId();
-            String msgType = mqMessage.getMsgType();
+            String type = mqMessage.getType();
 
             log.info("MQMessage from {}...", machineId);
-            switch (MessageType.valueOf(msgType)) {
-                case machine:
+            switch (MessageType.valueOf(type)) {
+                case machineType:
                     MachineMessage machineMessage =
                             (MachineMessage) JsonConverter.jsonToObject(mqMessage.getPayload(), MachineMessage.class);
                     String machineOps = machineMessage.getMachineOps();
                     String message = machineMessage.getMessage();
                     machineMessageDispatcher.dispatch(machineOps, message);
                     break;
-                case app:
+                case appType:
                     log.info("msg from app...");
                     break;
                 default:

+ 17 - 7
dmaster/src/test/java/cn/reghao/autodop/common/mq/RabbitProducerTest.java

@@ -1,8 +1,12 @@
 package cn.reghao.autodop.common.mq;
 
+import cn.reghao.autodop.common.dagent.machine.MachineShell;
+import cn.reghao.autodop.common.dagent.protocol.RPCResult;
 import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.mq.protocol.MessageType;
-import cn.reghao.autodop.common.result.RpcResult;
+import cn.reghao.autodop.common.mq.protocol.machine.MachineMessage;
+import cn.reghao.autodop.common.mq.protocol.machine.MachineOps;
+import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.DmasterApplication;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
@@ -26,7 +30,7 @@ class RabbitProducerTest {
         String queueName = "dagent";
         MQMessage mqMessage = new MQMessage();
         mqMessage.setMachineId("5d1a727991f34d3a9c1220a1899e6ebd");
-        mqMessage.setMsgType(MessageType.machine.name());
+        mqMessage.setType(MessageType.machineType.name());
         mqMessage.setSendTime(System.currentTimeMillis());
         mqMessage.setPayload("test test");
         mqMessage.setRpc(false);
@@ -40,15 +44,21 @@ class RabbitProducerTest {
 
     @Test
     void callRemote() {
+        MachineShell machineShell = new MachineShell();
+        machineShell.setType("command");
+        machineShell.setContent("cat /etc/nginx/nginx.conf");
+
         String queueName = "dagent";
+        String machineId = "5d1a727991f34d3a9c1220a1899e6ebd";
         MQMessage mqMessage = new MQMessage();
-        mqMessage.setMachineId("5d1a727991f34d3a9c1220a1899e6ebd");
-        mqMessage.setMsgType(MessageType.app.name());
+        mqMessage.setMachineId(machineId);
         mqMessage.setSendTime(System.currentTimeMillis());
-        mqMessage.setPayload("test test");
         mqMessage.setRpc(true);
+        mqMessage.setType(MessageType.machineType.name());
+        mqMessage.setOps(MachineOps.machineShellOps.name());
+        mqMessage.setPayload(JsonConverter.objectToJson(machineShell));
 
-        RpcResult rpcResult = rabbitProducer.callRemote(queueName, mqMessage);
-        System.out.println(rpcResult.getMsg());
+        RPCResult rpcResult = rabbitProducer.callRemote(queueName, mqMessage);
+        System.out.println(rpcResult.getResult());
     }
 }