Просмотр исходного кода

dmaster 通过 RabbitMQ 对 dagent 进行 RPC 调用

reghao 5 лет назад
Родитель
Сommit
f32b1de7e4

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/mq/RabbitConsumerConfig.java

@@ -13,7 +13,7 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 public class RabbitConsumerConfig {
     @Bean
-    public Queue dagentQueue() {
+    public Queue dmasterQueue() {
         String queueName = "dmaster";
         return new Queue(queueName, true);
     }
@@ -23,7 +23,7 @@ public class RabbitConsumerConfig {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);
         container.setMessageListener(new RabbitConsumer());
-        container.addQueues(dagentQueue());
+        container.addQueues(dmasterQueue());
         return container;
     }
 }

+ 17 - 1
common/src/main/java/cn/reghao/autodop/common/mq/RabbitProducer.java

@@ -4,9 +4,15 @@ import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.result.RpcResult;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessagePostProcessor;
+import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Component;
 
+import java.util.UUID;
+
 /**
  * @author reghao
  * @date 2020-12-30 10:21:23
@@ -31,12 +37,22 @@ public class RabbitProducer {
         rabbitTemplate.setReplyTimeout(10_000);
         // TODO 处理未成功发送到 mq 时的情况
         // TODO 处理 RPC 服务端未启动时的情况
+        /*String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, (Object) msg, new MessagePostProcessor() {
+            @Override
+            public Message postProcessMessage(Message message) throws AmqpException {
+                MessageProperties props = message.getMessageProperties();
+                props.setCorrelationId(correlationId);
+                return message;
+            }
+        });*/
         String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, msg);
         if (result == null) {
             log.info("RPC 调用超时...");
             return null;
         } else {
-            return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
+            log.info("RPC 调用成功 -> {}", result);
+            return null;
+            //return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
         }
     }
 }

+ 2 - 3
dagent/src/main/java/cn/reghao/autodop/dagent/utils/mq/RabbitConfig.java

@@ -24,13 +24,12 @@ public class RabbitConfig {
 
     @Bean
     public Queue dagentQueue() throws SocketException {
-        String address = NetworkUtil.hostAddr().get(0).getIpv4();
-        String queueName = "dagent.rpc@" + address;
+        String queueName = "dagent";
         return new Queue(queueName, true);
     }
 
     @Bean
-    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) throws SocketException {
+    public SimpleMessageListenerContainer messageListenerContainers(ConnectionFactory connectionFactory) throws SocketException {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);
         container.setMessageListener(new RemoteCallDispatcher(rabbitTemplate));

+ 7 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/utils/mq/RemoteCallDispatcher.java

@@ -33,8 +33,11 @@ public class RemoteCallDispatcher implements ChannelAwareMessageListener {
     // TODO dagent 启动时如何处理 mq 中积压的消息?
     @Override
     public void onMessage(Message msg, Channel channel) {
+        log.info("RPC 调用...");
         MessageProperties messageProperties = msg.getMessageProperties();
-        String body = new String(msg.getBody());
+        sendResult(messageProperties, RpcResult.success());
+
+        /*String body = new String(msg.getBody());
         JsonObject jsonObject = new JsonParser().parse(body).getAsJsonObject();
         long sendTime = jsonObject.get("sendTime").getAsLong();
         long startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
@@ -62,13 +65,15 @@ public class RemoteCallDispatcher implements ChannelAwareMessageListener {
             sendResult(messageProperties, RpcResult.success());
         } catch (Exception e) {
             sendResult(messageProperties, RpcResult.fail(e.getMessage()));
-        }
+        }*/
         //channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
     }
 
     private void sendResult(MessageProperties messageProperties, String result) {
         String replyTo = messageProperties.getReplyTo();
         String correlationId = messageProperties.getCorrelationId();
+        log.info("replyTo -> {}", replyTo);
+        log.info("correlationId -> {}", correlationId);
         rabbitTemplate.convertAndSend(replyTo, result, message -> {
             message.getMessageProperties().setCorrelationId(correlationId);
             return message;

+ 8 - 8
dagent/src/main/resources/application-dev.yml

@@ -4,13 +4,13 @@ dmaster:
   api: /api/node/start
 spring:
   rabbitmq:
-    host: mq.srv.iquizoo.com
+#    host: mq.srv.iquizoo.com
+#    port: 5672
+#    username: iquizoo-mq1
+#    password: z2pT1PXR
+#    virtual-host: /
+    host: localhost
     port: 5672
-    username: iquizoo-mq1
-    password: z2pT1PXR
     virtual-host: /
-#    host: localhost
-#    port: 5672
-#    username: reghao
-#    password: 12345678
-#    virtual-host: /
+    username: dev
+    password: Dev@12345678