|
|
@@ -2,9 +2,13 @@ package cn.reghao.autodop.common.amqp;
|
|
|
|
|
|
import cn.reghao.autodop.common.utils.serializer.JsonConverter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.core.MessageDeliveryMode;
|
|
|
+import org.springframework.amqp.rabbit.connection.CorrelationData;
|
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
/**
|
|
|
* MQ 消息生产者
|
|
|
*
|
|
|
@@ -18,6 +22,23 @@ public class RabbitProducer {
|
|
|
|
|
|
public RabbitProducer(RabbitTemplate rabbitTemplate) {
|
|
|
this.rabbitTemplate = rabbitTemplate;
|
|
|
+ config();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void config() {
|
|
|
+ // 开启可靠投递
|
|
|
+ rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
|
|
|
+ if (!ack) {
|
|
|
+ log.error("消息发送失败 -> {}", cause + (data != null ? data.toString() : null));
|
|
|
+ } else {
|
|
|
+ log.info("消息发送成功 -> {}", cause + (data != null ? data.getId() : null));
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ rabbitTemplate.setMandatory(true);
|
|
|
+ rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) ->
|
|
|
+ log.info("消息发送 ReturnCallback: {}, {}, {}, {}, {}",
|
|
|
+ message, replyCode, replyText, exchange, routingKey)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -27,9 +48,11 @@ public class RabbitProducer {
|
|
|
*/
|
|
|
public void send(String exchange, String routeKey, MqMessage mqMessage) {
|
|
|
String msg = JsonConverter.objectToJson(mqMessage);
|
|
|
- rabbitTemplate.convertAndSend(exchange, routeKey, msg);
|
|
|
// 默认的 exchange 是 amq.direct
|
|
|
- //rabbitTemplate.convertAndSend(routeKey, msg);
|
|
|
+ rabbitTemplate.convertAndSend(exchange, routeKey, msg, message -> {
|
|
|
+ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
|
|
|
+ return message;
|
|
|
+ }, new CorrelationData(UUID.randomUUID().toString()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -40,6 +63,7 @@ public class RabbitProducer {
|
|
|
* @date 2021-02-23 下午3:57
|
|
|
*/
|
|
|
public RpcResult callRemote(String exchange, String routeKey, MqMessage mqMessage) {
|
|
|
+ // TODO 处理 RPC 调用在超时后才成功的情况
|
|
|
rabbitTemplate.setReplyTimeout(90_000);
|
|
|
String msg = JsonConverter.objectToJson(mqMessage);
|
|
|
String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, msg);
|