Explorar el Código

对应 dagent 的修改

reghao hace 5 años
padre
commit
c370177ab7

+ 2 - 17
common/src/main/java/cn/reghao/autodop/common/mq/RabbitProducer.java

@@ -4,15 +4,9 @@ import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.result.RpcResult;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.AmqpException;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.MessagePostProcessor;
-import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Component;
 
-import java.util.UUID;
-
 /**
  * @author reghao
  * @date 2020-12-30 10:21:23
@@ -37,20 +31,11 @@ public class RabbitProducer {
         rabbitTemplate.setReplyTimeout(10_000);
         // TODO 处理未成功发送到 mq 时的情况
         // TODO 处理 RPC 服务端未启动时的情况
-        /*String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, (Object) msg, new MessagePostProcessor() {
-            @Override
-            public Message postProcessMessage(Message message) throws AmqpException {
-                MessageProperties props = message.getMessageProperties();
-                props.setCorrelationId(correlationId);
-                return message;
-            }
-        });*/
         String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, msg);
         if (result == null) {
-            log.info("RPC 调用超时...");
-            return null;
+            RpcResult rpcResult = new RpcResult(1, "RPC 调用超时...");
+            return rpcResult;
         } else {
-            log.info("RPC 调用成功 -> {}", result);
             return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
         }
     }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/protocol/machine/MachineOps.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.mq.protocol.machine;
  * @date 2020-12-25 19:15:00
  */
 public enum MachineOps {
-    machineRegistry, machineHeartbeat, machineState, machineShell
+    machineRegistry, machineHeartbeat, machineLog, machineState, machineShell
 }

+ 3 - 0
common/src/main/java/cn/reghao/autodop/common/mq/protocol/machine/MachineRegistry.java

@@ -2,6 +2,8 @@ package cn.reghao.autodop.common.mq.protocol.machine;
 
 import lombok.Data;
 
+import java.util.List;
+
 /**
  * @author reghao
  * @date 2020-12-25 17:53:15
@@ -11,4 +13,5 @@ public class MachineRegistry {
     private String bootTime;
     private String osType;
     private String osArch;
+    private List<String> ipAddr;
 }

+ 5 - 3
common/src/main/java/cn/reghao/autodop/common/mq/MachineMessageDispatcher.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/MachineMessageDispatcher.java

@@ -1,11 +1,10 @@
-package cn.reghao.autodop.common.mq;
+package cn.reghao.autodop.dmaster.cluster;
 
 import cn.reghao.autodop.common.mq.protocol.machine.MachineHeartbeat;
 import cn.reghao.autodop.common.mq.protocol.machine.MachineOps;
 import cn.reghao.autodop.common.mq.protocol.machine.MachineRegistry;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 
 /**
  * @author reghao
@@ -24,8 +23,11 @@ public class MachineMessageDispatcher implements MessageDispatcher {
                 log.info("机器心跳消息...");
                 MachineHeartbeat machineHeartbeat = (MachineHeartbeat) JsonConverter.jsonToObject(msg, MachineHeartbeat.class);
                 break;
+            case machineLog:
+                log.info("dagent 日志消息...");
+                break;
             default:
-                log.info("msg from machine...");
+                log.info("来自机器的消息...");
         }
     }
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/MessageDispatcher.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/MessageDispatcher.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq;
+package cn.reghao.autodop.dmaster.cluster;
 
 /**
  * @author reghao

+ 4 - 9
common/src/main/java/cn/reghao/autodop/common/mq/RabbitConsumer.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/RabbitConsumer.java

@@ -1,9 +1,7 @@
-package cn.reghao.autodop.common.mq;
+package cn.reghao.autodop.dmaster.cluster;
 
 import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.mq.protocol.machine.MachineMessage;
-import cn.reghao.autodop.common.mq.protocol.machine.MachineOps;
-import cn.reghao.autodop.common.mq.protocol.machine.MachineState;
 import cn.reghao.autodop.common.mq.protocol.MessageType;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
@@ -18,10 +16,10 @@ import org.springframework.amqp.core.MessageListener;
  */
 @Slf4j
 public class RabbitConsumer implements MessageListener {
-    private MessageDispatcher messageDispatcher;
+    private MessageDispatcher machineMessageDispatcher;
 
     public RabbitConsumer() {
-        this.messageDispatcher = new MachineMessageDispatcher();
+        this.machineMessageDispatcher = new MachineMessageDispatcher();
     }
 
     @Override
@@ -36,7 +34,7 @@ public class RabbitConsumer implements MessageListener {
                             (MachineMessage) JsonConverter.jsonToObject(mqMessage.getPayload(), MachineMessage.class);
                     String machineOps = machineMessage.getMachineOps();
                     String message = machineMessage.getMessage();
-                    messageDispatcher.dispatch(machineOps, message);
+                    machineMessageDispatcher.dispatch(machineOps, message);
                     break;
                 case app:
                     log.info("msg from app...");
@@ -47,7 +45,4 @@ public class RabbitConsumer implements MessageListener {
             log.error(e.getMessage());
         }
     }
-
-    void receive(String queue) {
-    }
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/mq/RabbitConsumerConfig.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/cluster/RabbitConsumerConfig.java

@@ -1,4 +1,4 @@
-package cn.reghao.autodop.common.mq;
+package cn.reghao.autodop.dmaster.cluster;
 
 import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;

+ 1 - 1
dmaster/src/test/java/cn/reghao/autodop/common/mq/RabbitProducerTest.java

@@ -49,6 +49,6 @@ class RabbitProducerTest {
         mqMessage.setRpc(true);
 
         RpcResult rpcResult = rabbitProducer.callRemote(queueName, mqMessage);
-        System.out.println();
+        System.out.println(rpcResult.getMsg());
     }
 }