reghao преди 5 години
родител
ревизия
596c9f0e1b

+ 5 - 3
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java

@@ -26,9 +26,10 @@ public class RabbitProducer {
      * @date 2021-02-23 下午3:28
      */
     public void send(String exchange, String routeKey, MqMessage mqMessage) {
-        rabbitTemplate.convertAndSend(exchange, routeKey, mqMessage);
+        String msg = JsonConverter.objectToJson(mqMessage);
+        rabbitTemplate.convertAndSend(exchange, routeKey, msg);
         // 默认的 exchange 是 amq.direct
-        //rabbitTemplate.convertAndSend(routeKey, mqMessage);
+        //rabbitTemplate.convertAndSend(routeKey, msg);
     }
 
     /**
@@ -40,7 +41,8 @@ public class RabbitProducer {
      */
     public RpcResult callRemote(String exchange, String routeKey, MqMessage mqMessage) {
         rabbitTemplate.setReplyTimeout(30_000);
-        String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, mqMessage);
+        String msg = JsonConverter.objectToJson(mqMessage);
+        String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, msg);
         if (result == null) {
             return RpcResult.fail("RPC timeout...");
         } else {

+ 14 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumerConfig.java

@@ -1,6 +1,9 @@
 package cn.reghao.autodop.dmaster.utils.amqp;
 
 import cn.reghao.autodop.common.dagent.machine.Machine;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -21,6 +24,11 @@ public class DmasterConsumerConfig {
         this.dmasterConsumer = dmasterConsumer;
     }
 
+    @Bean
+    DirectExchange directExchange(){
+        return new DirectExchange("amq.direct", true, false);
+    }
+
     @Bean
     public Queue dmasterQueue() {
         String queueName = "dmaster";
@@ -28,6 +36,12 @@ public class DmasterConsumerConfig {
         return new Queue(queueName, true);
     }
 
+    @Bean
+    Binding bindingQueueToExchange(Queue queue, DirectExchange directExchange) {
+        String routeKey = Machine.machineId();
+        return BindingBuilder.bind(queue).to(directExchange).with("dmaster");
+    }
+
     @Bean
     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();