Преглед на файлове

一个 dagent 对应一个 queue

reghao преди 5 години
родител
ревизия
6957a59b6f
променени са 17 файла, в които са добавени 172 реда и са изтрити 113 реда
  1. 16 3
      common/src/main/java/cn/reghao/autodop/common/amqp/MqMessage.java
  2. 20 8
      common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java
  3. 22 0
      common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProperties.java
  4. 5 16
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/HeartbeatJob.java
  5. 1 2
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/AppDispatcher.java
  6. 5 5
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumer.java
  7. 52 0
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumerConfig.java
  8. 11 1
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/MachineDispatcher.java
  9. 0 13
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConfig.java
  10. 0 35
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConsumerConfig.java
  11. 3 3
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/lifecycle/AfterAppStart.java
  12. 7 7
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/deploy/AppDeployer.java
  13. 5 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumer.java
  14. 9 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumerConfig.java
  15. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/MachineDispatcher.java
  16. 5 1
      dmaster/src/main/resources/application.yml
  17. 8 6
      dmaster/src/test/java/cn/reghao/autodop/common/amqp/RabbitProducerTest.java

+ 16 - 3
common/src/main/java/cn/reghao/autodop/common/amqp/MQMessage.java → common/src/main/java/cn/reghao/autodop/common/amqp/MqMessage.java

@@ -8,16 +8,29 @@ import lombok.Data;
  * @date 2020-12-25 17:49:26
  */
 @Data
-public class MQMessage {
+public class MqMessage {
     private String machineId;
     private long sendTime;
     private boolean isRpc;
+    // @MessageType
     private String type;
+    // @MachineOps 或 @AppOps
     private String ops;
     private String payload;
 
-    public static MQMessage mqMessage(String type, String ops, boolean isRpc, String payload) {
-        MQMessage mqMessage = new MQMessage();
+    public static MqMessage mqMessage(String machineId, String type, String ops, boolean isRpc, String payload) {
+        MqMessage mqMessage = new MqMessage();
+        mqMessage.setMachineId(machineId);
+        mqMessage.setSendTime(System.currentTimeMillis());
+        mqMessage.setRpc(isRpc);
+        mqMessage.setType(type);
+        mqMessage.setOps(ops);
+        mqMessage.setPayload(payload);
+        return mqMessage;
+    }
+
+    public static MqMessage mqMessage(String type, String ops, boolean isRpc, String payload) {
+        MqMessage mqMessage = new MqMessage();
         mqMessage.setMachineId(Machine.machineId());
         mqMessage.setSendTime(System.currentTimeMillis());
         mqMessage.setRpc(isRpc);

+ 20 - 8
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java

@@ -6,6 +6,8 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Component;
 
 /**
+ * MQ 消息生产者
+ *
  * @author reghao
  * @date 2020-12-30 10:21:23
  */
@@ -18,17 +20,27 @@ public class RabbitProducer {
         this.rabbitTemplate = rabbitTemplate;
     }
 
-    public void send(String routeKey, MQMessage mqMessage) {
-        String msg = JsonConverter.objectToJson(mqMessage);
-        rabbitTemplate.convertAndSend(routeKey, msg);
+    /**
+     * @param
+     * @return
+     * @date 2021-02-23 下午3:28
+     */
+    public void send(String exchange, String routeKey, MqMessage mqMessage) {
+        rabbitTemplate.convertAndSend(exchange, routeKey, mqMessage);
+        // 默认的 exchange 是 amq.direct
+        //rabbitTemplate.convertAndSend(routeKey, mqMessage);
     }
 
-    public RpcResult callRemote(String routeKey, MQMessage mqMessage) {
-        String msg = JsonConverter.objectToJson(mqMessage);
-        // TODO RPC 调用等待超时,若 RPC 服务端业务处理的时间超过设定的超时,则会引发不可知的错误
+    /**
+     * RPC 调用
+     *
+     * @param
+     * @return
+     * @date 2021-02-23 下午3:57
+     */
+    public RpcResult callRemote(String exchange, String routeKey, MqMessage mqMessage) {
         rabbitTemplate.setReplyTimeout(30_000);
-        // TODO 处理未成功发送到 mq 时的情况
-        String result = (String) rabbitTemplate.convertSendAndReceive(routeKey, msg);
+        String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, mqMessage);
         if (result == null) {
             return RpcResult.fail("RPC timeout...");
         } else {

+ 22 - 0
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProperties.java

@@ -0,0 +1,22 @@
+package cn.reghao.autodop.common.amqp;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2020-09-04 10:57:56
+ */
+/*@Data
+@Component
+@ConfigurationProperties(prefix = "rabbit")*/
+public class RabbitProperties {
+    /*@Value("${rabbit.exchange}")
+    private String exchange;
+    @Value("${rabbit.routeKey}")
+    private String routeKey;
+    @Value("${rabbit.queue}")
+    private String queue;*/
+}

+ 5 - 16
dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/HeartbeatJob.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dagent.machine.timer;
 
-import cn.reghao.autodop.common.amqp.MQMessage;
+import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dagent.machine.Machine;
@@ -8,26 +8,15 @@ import cn.reghao.autodop.common.dagent.machine.api.MachineOps;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.quartz.*;
-import org.springframework.stereotype.Component;
 
 /**
+ * 机器心跳任务
+ *
  * @author reghao
  * @date 2021-02-22 21:30:17
  */
 @Slf4j
-@Component
 public class HeartbeatJob implements Job {
-    private RabbitProducer producer;
-    private Machine machine;
-
-    public HeartbeatJob() {
-    }
-
-    public HeartbeatJob(RabbitProducer producer, Machine machine) {
-        this.producer = producer;
-        this.machine = machine;
-    }
-
     @Override
     public void execute(JobExecutionContext context) throws JobExecutionException {
         JobKey jobKey = context.getJobDetail().getKey();
@@ -36,9 +25,9 @@ public class HeartbeatJob implements Job {
         Machine machine = (Machine) jobDataMap.get("machine");
 
         String payload = JsonConverter.objectToJson(machine.heartbeat());
-        MQMessage mqMessage = MQMessage.mqMessage(MessageType.machineType.name(),
+        MqMessage mqMessage = MqMessage.mqMessage(MessageType.machineType.name(),
                 MachineOps.machineHeartbeatOps.name(), false, payload);
-        rabbitProducer.send("dmaster", mqMessage);
+        rabbitProducer.send("amq.direct", "dmaster", mqMessage);
         log.info("发送机器心跳...");
     }
 }

+ 1 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/AppDispatcher.java

@@ -20,9 +20,8 @@ public class AppDispatcher {
     }
 
     public void dispatch(String ops, String payload) {
-        log.info("应用非 RPC 消息分发...");
     }
-    
+
     public RpcResult rpcDispatch(String ops, String payload) {
         switch (AppOps.valueOf(ops)) {
             case appDeployOps:

+ 5 - 5
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConsumer.java → dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumer.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dagent.utils.amqp;
 
-import cn.reghao.autodop.common.amqp.MQMessage;
+import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.machine.Machine;
@@ -16,19 +16,19 @@ import org.springframework.stereotype.Component;
 import java.lang.management.ManagementFactory;
 
 /**
- * 消息分发
+ * dagent 消息分发
  *
  * @author reghao
  * @date 2020-09-04 11:00:22
  */
 @Slf4j
 @Component
-public class RabbitConsumer implements ChannelAwareMessageListener {
+public class DagentConsumer implements ChannelAwareMessageListener {
     private RabbitTemplate rabbitTemplate;
     private MachineDispatcher machineDispatcher;
     private AppDispatcher appDispatcher;
 
-    public RabbitConsumer(RabbitTemplate rabbitTemplate,
+    public DagentConsumer(RabbitTemplate rabbitTemplate,
                           MachineDispatcher machineDispatcher,
                           AppDispatcher appDispatcher) {
         this.rabbitTemplate = rabbitTemplate;
@@ -42,7 +42,7 @@ public class RabbitConsumer implements ChannelAwareMessageListener {
         long startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
         String body = new String(msg.getBody());
         try {
-            MQMessage mqMessage = (MQMessage) JsonConverter.jsonToObject(body, MQMessage.class);
+            MqMessage mqMessage = (MqMessage) JsonConverter.jsonToObject(body, MqMessage.class);
             String machineId = mqMessage.getMachineId();
             long sendTime = mqMessage.getSendTime();
             boolean isRpc = mqMessage.isRpc();

+ 52 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumerConfig.java

@@ -0,0 +1,52 @@
+package cn.reghao.autodop.dagent.utils.amqp;
+
+import cn.reghao.autodop.common.dagent.machine.Machine;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * dagent 队列消费者
+ *
+ * @author reghao
+ * @date 2020-09-04 10:57:56
+ */
+@Configuration
+public class DagentConsumerConfig {
+    private DagentConsumer dagentConsumer;
+
+    public DagentConsumerConfig(DagentConsumer dagentConsumer) {
+        this.dagentConsumer = dagentConsumer;
+    }
+
+    @Bean
+    DirectExchange directExchange(){
+        return new DirectExchange("amq.direct", true, false);
+    }
+
+    @Bean
+    public Queue dagentQueue() {
+        String queueName = "dagent@" + Machine.machineId();
+        return new Queue(queueName, true);
+    }
+
+    @Bean
+    Binding bindingQueueToExchange(Queue queue, DirectExchange directExchange) {
+        String routeKey = Machine.machineId();
+        return BindingBuilder.bind(queue).to(directExchange).with(routeKey);
+    }
+
+    @Bean
+    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
+        container.setConnectionFactory(connectionFactory);
+        container.setMessageListener(dagentConsumer);
+        container.addQueues(dagentQueue());
+        return container;
+    }
+}

+ 11 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/MachineDispatcher.java

@@ -20,7 +20,17 @@ public class MachineDispatcher {
     }
 
     public void dispatch(String ops, String payload) {
-        log.info("机器非 RPC 消息分发...");
+        switch (MachineOps.valueOf(ops)) {
+            case machineStateOps:
+                log.info("机器状态...");
+                machine.state();
+            case machineShellOps:
+                log.info("执行脚本...");
+                machine.shell(payload);
+            default:
+                log.error("MachineOps 中没有相应类型...");
+                RpcResult.error("MachineOps 中没有相应类型...");
+        }
     }
 
     public RpcResult rpcDispatch(String ops, String payload) {

+ 0 - 13
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConfig.java

@@ -1,13 +0,0 @@
-package cn.reghao.autodop.dagent.utils.amqp;
-
-/**
- * @author reghao
- * @date 2020-09-04 10:57:56
- */
-public class RabbitConfig {
-    private String host;
-    private int port;
-    private String vhost;
-    private String username;
-    private String password;
-}

+ 0 - 35
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/RabbitConsumerConfig.java

@@ -1,35 +0,0 @@
-package cn.reghao.autodop.dagent.utils.amqp;
-
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author reghao
- * @date 2020-09-04 10:57:56
- */
-@Configuration
-public class RabbitConsumerConfig {
-    private RabbitConsumer rabbitConsumer;
-
-    public RabbitConsumerConfig(RabbitConsumer rabbitConsumer) {
-        this.rabbitConsumer = rabbitConsumer;
-    }
-
-    @Bean
-    public Queue dagentQueue() {
-        String queueName = "dagent";
-        return new Queue(queueName, true);
-    }
-
-    @Bean
-    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
-        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
-        container.setConnectionFactory(connectionFactory);
-        container.setMessageListener(rabbitConsumer);
-        container.addQueues(dagentQueue());
-        return container;
-    }
-}

+ 3 - 3
dagent/src/main/java/cn/reghao/autodop/dagent/utils/lifecycle/AfterAppStart.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dagent.utils.lifecycle;
 
-import cn.reghao.autodop.common.amqp.MQMessage;
+import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dagent.machine.Machine;
@@ -49,9 +49,9 @@ public class AfterAppStart implements ApplicationRunner {
 
     private void registry() {
         String payload = JsonConverter.objectToJson(machine.registry());
-        MQMessage mqMessage = MQMessage.mqMessage(MessageType.machineType.name(),
+        MqMessage mqMessage = MqMessage.mqMessage(MessageType.machineType.name(),
                 MachineOps.machineRegistryOps.name(), false, payload);
-        rabbitProducer.send("dmaster", mqMessage);
+        rabbitProducer.send("amq.direct", "dmaster", mqMessage);
         log.info("发送机器注册信息...");
     }
 

+ 7 - 7
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/deploy/AppDeployer.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service.deploy;
 
-import cn.reghao.autodop.common.amqp.MQMessage;
+import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.app.api.AppOps;
@@ -33,12 +33,12 @@ import java.util.concurrent.Future;
 @Slf4j
 @Component
 public class AppDeployer {
-    private final String routeKey = "dagent";
-    private RabbitProducer sender;
+    private final String exchange = "amq.direct";
+    private RabbitProducer rabbitProducer;
     private ExecutorService threadPool = ThreadPoolWrapper.threadPool("deploy");
 
-    public AppDeployer(RabbitProducer sender) {
-        this.sender = sender;
+    public AppDeployer(RabbitProducer rabbitProducer) {
+        this.rabbitProducer = rabbitProducer;
     }
 
     /**
@@ -118,7 +118,7 @@ public class AppDeployer {
 
         @Override
         public RpcResult call() {
-            MQMessage mqMessage = new MQMessage();
+            MqMessage mqMessage = new MqMessage();
             mqMessage.setMachineId(machineId);
             mqMessage.setSendTime(System.currentTimeMillis());
             mqMessage.setRpc(true);
@@ -127,7 +127,7 @@ public class AppDeployer {
             mqMessage.setPayload(JsonConverter.objectToJson(appDeployArgs));
 
             log.info("RPC 调用部署应用...");
-            return sender.callRemote(routeKey, mqMessage);
+            return rabbitProducer.callRemote(exchange, machineId, mqMessage);
         }
     }
 }

+ 5 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/RabbitConsumer.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumer.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.utils.amqp;
 
-import cn.reghao.autodop.common.amqp.MQMessage;
+import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
@@ -9,17 +9,17 @@ import org.springframework.amqp.core.MessageListener;
 import org.springframework.stereotype.Component;
 
 /**
- * 消息分发
+ * dmaster 消息分发
  *
  * @author reghao
  * @date 2020-09-04 11:00:22
  */
 @Slf4j
 @Component
-public class RabbitConsumer implements MessageListener {
+public class DmasterConsumer implements MessageListener {
     private MessageDispatcher machineMessageDispatcher;
 
-    public RabbitConsumer(MessageDispatcher machineMessageDispatcher) {
+    public DmasterConsumer(MessageDispatcher machineMessageDispatcher) {
         this.machineMessageDispatcher = machineMessageDispatcher;
     }
 
@@ -27,7 +27,7 @@ public class RabbitConsumer implements MessageListener {
     public void onMessage(Message msg) {
         String body = new String(msg.getBody());
         try {
-            MQMessage mqMessage = (MQMessage) JsonConverter.jsonToObject(body, MQMessage.class);
+            MqMessage mqMessage = (MqMessage) JsonConverter.jsonToObject(body, MqMessage.class);
             String machineId = mqMessage.getMachineId();
             long sendTime = mqMessage.getSendTime();
             boolean isRpc = mqMessage.isRpc();

+ 9 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/RabbitConsumerConfig.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumerConfig.java

@@ -1,5 +1,6 @@
 package cn.reghao.autodop.dmaster.utils.amqp;
 
+import cn.reghao.autodop.common.dagent.machine.Machine;
 import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -7,20 +8,23 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 /**
+ * dmaster 队列消费者
+ *
  * @author reghao
  * @date 2020-09-04 10:57:56
  */
 @Configuration
-public class RabbitConsumerConfig {
-    private RabbitConsumer rabbitConsumer;
+public class DmasterConsumerConfig {
+    private DmasterConsumer dmasterConsumer;
 
-    public RabbitConsumerConfig(RabbitConsumer rabbitConsumer) {
-        this.rabbitConsumer = rabbitConsumer;
+    public DmasterConsumerConfig(DmasterConsumer dmasterConsumer) {
+        this.dmasterConsumer = dmasterConsumer;
     }
 
     @Bean
     public Queue dmasterQueue() {
         String queueName = "dmaster";
+        //String queueName = "dmaster@" + Machine.machineId();
         return new Queue(queueName, true);
     }
 
@@ -28,7 +32,7 @@ public class RabbitConsumerConfig {
     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);
-        container.setMessageListener(rabbitConsumer);
+        container.setMessageListener(dmasterConsumer);
         container.addQueues(dmasterQueue());
         return container;
     }

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/MachineMessageDispatcher.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/MachineDispatcher.java

@@ -16,12 +16,12 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class MachineMessageDispatcher implements MessageDispatcher {
+public class MachineDispatcher implements MessageDispatcher {
     private MachineCrudService machineCrudService;
     private MachineLogCrudService machineLogCrudService;
 
-    public MachineMessageDispatcher(MachineCrudService machineCrudService,
-                                    MachineLogCrudService machineLogCrudService) {
+    public MachineDispatcher(MachineCrudService machineCrudService,
+                             MachineLogCrudService machineLogCrudService) {
         this.machineCrudService = machineCrudService;
         this.machineLogCrudService = machineLogCrudService;
     }

+ 5 - 1
dmaster/src/main/resources/application.yml

@@ -38,4 +38,8 @@ oss:
   key: jYk8lOKwSCorFEz7
   secret: hzSrcew08V5zk58kVVgInV7OqbHyVc
   bucket: iquizoo
-  folder: eval3.x/alpha/eval_exercises/
+  folder: eval3.x/alpha/eval_exercises/
+#rabbit:
+#  exchange: test
+#  routeKey: test
+#  queue: test

+ 8 - 6
dmaster/src/test/java/cn/reghao/autodop/common/amqp/RabbitProducerTest.java

@@ -25,8 +25,9 @@ class RabbitProducerTest {
 
     @Test
     void send() throws InterruptedException {
-        String queueName = "dagent";
-        MQMessage mqMessage = new MQMessage();
+        String exchange = "amq.direct";
+        String routeKey = "dagent";
+        MqMessage mqMessage = new MqMessage();
         mqMessage.setMachineId("5d1a727991f34d3a9c1220a1899e6ebd");
         mqMessage.setType(MessageType.machineType.name());
         mqMessage.setSendTime(System.currentTimeMillis());
@@ -35,7 +36,7 @@ class RabbitProducerTest {
 
         while (true) {
             Thread.sleep(1000);
-            rabbitProducer.send(queueName, mqMessage);
+            rabbitProducer.send(exchange, routeKey, mqMessage);
             log.info("发送消息...");
         }
     }
@@ -52,9 +53,10 @@ class RabbitProducerTest {
         appDeployArgs.setAppPath("docker.alpha.iquizoo.com/iq3x/dnkt:56d47672");
         appDeployArgs.setRunningDir("");*/
 
-        String queueName = "dagent";
+        String exchange = "amq.direct";
+        String routeKey = "dagent";
         String machineId = "5d1a727991f34d3a9c1220a1899e6ebd";
-        MQMessage mqMessage = new MQMessage();
+        MqMessage mqMessage = new MqMessage();
         mqMessage.setMachineId(machineId);
         mqMessage.setSendTime(System.currentTimeMillis());
         mqMessage.setRpc(true);
@@ -62,7 +64,7 @@ class RabbitProducerTest {
         mqMessage.setOps(MachineOps.machineShellOps.name());
         mqMessage.setPayload(JsonConverter.objectToJson(machineShellArgs));
 
-        RpcResult rpcResult = rabbitProducer.callRemote(queueName, mqMessage);
+        RpcResult rpcResult = rabbitProducer.callRemote(exchange, routeKey, mqMessage);
         MachineShell machineShell = (MachineShell) JsonConverter.jsonToObject(rpcResult.getResult(), MachineShell.class);
         System.out.println(rpcResult.getResult());
     }