|
@@ -1,33 +1,38 @@
|
|
|
package cn.reghao.autodop.dmaster.utils.amqp;
|
|
package cn.reghao.autodop.dmaster.utils.amqp;
|
|
|
|
|
|
|
|
-import cn.reghao.autodop.common.amqp.MqMessage;
|
|
|
|
|
import cn.reghao.autodop.common.amqp.MessageType;
|
|
import cn.reghao.autodop.common.amqp.MessageType;
|
|
|
|
|
+import cn.reghao.autodop.common.amqp.MqMessage;
|
|
|
import cn.reghao.autodop.common.utils.serializer.JsonConverter;
|
|
import cn.reghao.autodop.common.utils.serializer.JsonConverter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.amqp.core.Message;
|
|
|
|
|
-import org.springframework.amqp.core.MessageListener;
|
|
|
|
|
|
|
+import org.springframework.amqp.rabbit.annotation.Exchange;
|
|
|
|
|
+import org.springframework.amqp.rabbit.annotation.Queue;
|
|
|
|
|
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
|
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
|
|
+import org.springframework.messaging.handler.annotation.Payload;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * dmaster 消息分发
|
|
|
|
|
|
|
+ * 消息分发
|
|
|
*
|
|
*
|
|
|
* @author reghao
|
|
* @author reghao
|
|
|
* @date 2020-09-04 11:00:22
|
|
* @date 2020-09-04 11:00:22
|
|
|
*/
|
|
*/
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
-//@Component
|
|
|
|
|
-public class DmasterConsumer implements MessageListener {
|
|
|
|
|
- private MessageDispatcher machineMessageDispatcher;
|
|
|
|
|
|
|
+@Component
|
|
|
|
|
+public class RabbitConsumer {
|
|
|
|
|
+ private MachineDispatcher machineDispatcher;
|
|
|
|
|
|
|
|
- public DmasterConsumer(MessageDispatcher machineMessageDispatcher) {
|
|
|
|
|
- this.machineMessageDispatcher = machineMessageDispatcher;
|
|
|
|
|
|
|
+ public RabbitConsumer(MachineDispatcher machineDispatcher) {
|
|
|
|
|
+ this.machineDispatcher = machineDispatcher;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- @Override
|
|
|
|
|
- public void onMessage(Message msg) {
|
|
|
|
|
- String body = new String(msg.getBody());
|
|
|
|
|
|
|
+ @RabbitListener(bindings = @QueueBinding(
|
|
|
|
|
+ value = @Queue(value = "dmaster", durable = "true"),
|
|
|
|
|
+ exchange = @Exchange(value = "amq.direct"))
|
|
|
|
|
+ )
|
|
|
|
|
+ public void dmasterQueueConsumer(@Payload String msg) {
|
|
|
try {
|
|
try {
|
|
|
- MqMessage mqMessage = (MqMessage) JsonConverter.jsonToObject(body, MqMessage.class);
|
|
|
|
|
|
|
+ MqMessage mqMessage = (MqMessage) JsonConverter.jsonToObject(msg, MqMessage.class);
|
|
|
String machineId = mqMessage.getMachineId();
|
|
String machineId = mqMessage.getMachineId();
|
|
|
long sendTime = mqMessage.getSendTime();
|
|
long sendTime = mqMessage.getSendTime();
|
|
|
boolean isRpc = mqMessage.isRpc();
|
|
boolean isRpc = mqMessage.isRpc();
|
|
@@ -38,7 +43,7 @@ public class DmasterConsumer implements MessageListener {
|
|
|
log.info("MQMessage from {}...", machineId);
|
|
log.info("MQMessage from {}...", machineId);
|
|
|
switch (MessageType.valueOf(type)) {
|
|
switch (MessageType.valueOf(type)) {
|
|
|
case machineType:
|
|
case machineType:
|
|
|
- machineMessageDispatcher.dispatch(ops, payload);
|
|
|
|
|
|
|
+ machineDispatcher.dispatch(ops, payload);
|
|
|
break;
|
|
break;
|
|
|
case appType:
|
|
case appType:
|
|
|
log.info("msg from app...");
|
|
log.info("msg from app...");
|
|
@@ -49,4 +54,12 @@ public class DmasterConsumer implements MessageListener {
|
|
|
log.error(e.getMessage());
|
|
log.error(e.getMessage());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ @RabbitListener(bindings = @QueueBinding(
|
|
|
|
|
+ value = @Queue(value = "log", durable = "true"),
|
|
|
|
|
+ exchange = @Exchange(value = "rabbit.log"))
|
|
|
|
|
+ )
|
|
|
|
|
+ public void logQueueConsumer(@Payload String msg) {
|
|
|
|
|
+ log.info("日志消息 -> {}", msg);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|