소스 검색

对应 dagent 的 RPC 调用开发

reghao 5 년 전
부모
커밋
97ea4e92e6

+ 11 - 4
common/src/main/java/cn/reghao/autodop/common/mq/RabbitProducer.java

@@ -3,6 +3,7 @@ package cn.reghao.autodop.common.mq;
 import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.result.RpcResult;
 import cn.reghao.autodop.common.utils.data.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Component;
 
@@ -10,6 +11,7 @@ import org.springframework.stereotype.Component;
  * @author reghao
  * @date 2020-12-30 10:21:23
  */
+@Slf4j
 @Component
 public class RabbitProducer {
     private RabbitTemplate rabbitTemplate;
@@ -20,16 +22,21 @@ public class RabbitProducer {
 
     void send(String routeKey, MQMessage mqMessage) {
         String msg = JsonConverter.objectToJson(mqMessage);
-        msg = "msg from spring rabbitmq...";
         rabbitTemplate.convertAndSend(routeKey, msg);
     }
 
-    public RpcResult callRemote(String routeKey, String msg) {
+    public RpcResult callRemote(String routeKey, MQMessage mqMessage) {
+        String msg = JsonConverter.objectToJson(mqMessage);
         // TODO RPC 调用等待超时,若 RPC 服务端业务处理的时间超过设定的超时,则会引发不可知的错误
-        rabbitTemplate.setReplyTimeout(60_000);
+        rabbitTemplate.setReplyTimeout(10_000);
         // TODO 处理未成功发送到 mq 时的情况
         // TODO 处理 RPC 服务端未启动时的情况
         String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, msg);
-        return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
+        if (result == null) {
+            log.info("RPC 调用超时...");
+            return null;
+        } else {
+            return (RpcResult) JsonConverter.jsonToObject(result, RpcResult.class);
+        }
     }
 }

+ 11 - 2
dmaster/src/test/java/cn/reghao/autodop/common/mq/RabbitProducerTest.java

@@ -2,6 +2,7 @@ package cn.reghao.autodop.common.mq;
 
 import cn.reghao.autodop.common.mq.protocol.MQMessage;
 import cn.reghao.autodop.common.mq.protocol.MessageType;
+import cn.reghao.autodop.common.result.RpcResult;
 import cn.reghao.autodop.dmaster.DmasterApplication;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
@@ -29,15 +30,23 @@ class RabbitProducerTest {
         mqMessage.setSendTime(System.currentTimeMillis());
         mqMessage.setPayload("test test");
 
-        while (true) {
+        /*while (true) {
             Thread.sleep(100);
             rabbitProducer.send(queueName, mqMessage);
             log.info("发送消息...");
-        }
+        }*/
     }
 
     @Test
     void callRemote() {
+        String queueName = "dagent";
+        MQMessage mqMessage = new MQMessage();
+        mqMessage.setMachineId("5d1a727991f34d3a9c1220a1899e6ebd");
+        mqMessage.setMsgType(MessageType.app.name());
+        mqMessage.setSendTime(System.currentTimeMillis());
+        mqMessage.setPayload("test test");
 
+        RpcResult rpcResult = rabbitProducer.callRemote(queueName, mqMessage);
+        System.out.println();
     }
 }