Selaa lähdekoodia

微调 dagent

reghao 5 vuotta sitten
vanhempi
commit
193b6abb53

+ 1 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/AppMessageDispatcher.java

@@ -20,6 +20,7 @@ public class AppMessageDispatcher {
     }
 
     public void dispatch(String ops, String payload) {
+        log.info("应用非 RPC 消息分发...");
     }
     
     public RpcResult rpcDispatch(String ops, String payload) {

+ 1 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/MachineMessageDispatcher.java

@@ -20,6 +20,7 @@ public class MachineMessageDispatcher {
     }
 
     public void dispatch(String ops, String payload) {
+        log.info("机器非 RPC 消息分发...");
     }
 
     public RpcResult rpcDispatch(String ops, String payload) {

+ 20 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/MessageDispatcher.java

@@ -1,9 +1,27 @@
 package cn.reghao.autodop.dagent.utils.amqp;
 
+import cn.reghao.autodop.common.amqp.MessageType;
+import cn.reghao.autodop.common.amqp.RpcResult;
+import cn.reghao.autodop.common.dagent.machine.Machine;
+import cn.reghao.autodop.dagent.app.App;
+import org.springframework.stereotype.Component;
+
 /**
  * @author reghao
  * @date 2020-12-30 10:26:47
  */
-public interface MessageDispatcher {
-    void dispatch(String ops, String payload);
+public class MessageDispatcher {
+    private Machine machine;
+    private App app;
+
+    public MessageDispatcher(Machine machine, App app) {
+        this.machine = machine;
+        this.app = app;
+    }
+
+    void dispatch(String type, String ops, String payload) {
+    }
+
+    void rpcDispatch(MessageType type, String ops, String payload) {
+    }
 }

+ 5 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConsumer.java

@@ -50,6 +50,7 @@ public class RabbitConsumer implements ChannelAwareMessageListener {
             String ops = mqMessage.getOps();
             String payload = mqMessage.getPayload();
 
+            // TODO 使用 machineId 作为 routeKey 来确定 dagent 节点
             if (!machineId.equals(Machine.machineId()) || sendTime < startTime) {
                 log.info("忽略不是发送到本机或 dagent 启动前发送到 MQ 中的所有消息...");
                 return;
@@ -63,6 +64,7 @@ public class RabbitConsumer implements ChannelAwareMessageListener {
                         rpcReply(messageProperties, rpcResult);
                     } else {
                         machineMessageDispatcher.dispatch(ops, payload);
+                        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                     }
                     break;
                 case appType:
@@ -71,11 +73,11 @@ public class RabbitConsumer implements ChannelAwareMessageListener {
                         rpcReply(messageProperties, rpcResult);
                     } else {
                         appMessageDispatcher.dispatch(ops, payload);
+                        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                     }
                     break;
                 default:
             }
-            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
         } catch (Exception e) {
             log.error(e.getMessage());
         }
@@ -83,12 +85,14 @@ public class RabbitConsumer implements ChannelAwareMessageListener {
 
     /**
      * RPC 调用响应
+     * RPC 调用不需要返回 ACK
      *
      * @param
      * @return
      * @date 2021-02-22 下午5:43
      */
     private void rpcReply(MessageProperties messageProperties, RpcResult result) {
+        log.info("RPC 调用完成,返回结果...");
         String replyTo = messageProperties.getReplyTo();
         String correlationId = messageProperties.getCorrelationId();
         rabbitTemplate.convertAndSend(replyTo, JsonConverter.objectToJson(result), message -> {