Przeglądaj źródła

message 添加 RabbitMQ 客户端 com.rabbitmq.client 的使用

reghao 1 rok temu
rodzic
commit
8c8c6e685f

+ 35 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/amqp/RabbitConfig.java

@@ -0,0 +1,35 @@
+package cn.reghao.tnb.message.app.amqp;
+
+/**
+ * @author reghao
+ * @date 2020-03-23 15:33:04
+ */
+public class RabbitConfig {
+    private String host;
+    private String username;
+    private String password;
+    private String vhost;
+
+    public RabbitConfig(String host, String username, String password, String vhost) {
+        this.host = host;
+        this.username = username;
+        this.password = password;
+        this.vhost = vhost;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getVhost() {
+        return vhost;
+    }
+}

+ 55 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/amqp/RabbitProducer.java

@@ -0,0 +1,55 @@
+package cn.reghao.tnb.message.app.amqp;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+
+import java.io.IOException;
+
+/**
+ * @author reghao
+ * @date 2021-04-01 21:55:32
+ */
+public class RabbitProducer {
+    private final ConnectionFactory connFactory;
+    private final String exchange = "amq.direct";
+
+    public RabbitProducer(RabbitConfig config) {
+        this.connFactory = new ConnectionFactory();
+        this.connFactory.setHost(config.getHost());
+        this.connFactory.setPort(5672);
+        this.connFactory.setUsername(config.getUsername());
+        this.connFactory.setPassword(config.getPassword());
+        this.connFactory.setVirtualHost(config.getVhost());
+    }
+
+    public void sendObject(String exchange, String routeKey, Object payload) {
+        try (Connection connection = connFactory.newConnection(); Channel channel = connection.createChannel()) {
+            String msg = JsonConverter.objectToJson(payload);
+            send(channel, exchange, routeKey, msg);
+        } catch (Exception e) {
+            System.out.printf("发送消息到 %s 队列失败...\n", routeKey);
+            e.printStackTrace();
+        }
+    }
+
+    public void sendMessage(String routeKey, Object bizMessage) {
+        try (Connection connection = connFactory.newConnection(); Channel channel = connection.createChannel()) {
+            String msg = JsonConverter.objectToJson(bizMessage);
+            send(channel, exchange, routeKey, msg);
+        } catch (Exception e) {
+            System.out.printf("发送消息到 %s 队列失败...\n", routeKey);
+            e.printStackTrace();
+        }
+    }
+
+    private void send(Channel channel, String exchange, String routeKey, String msg) throws IOException {
+        String msgBus = "MsgQueue.messageBus()";
+        // 发送消息前先声明队列
+        channel.queueDeclare(msgBus, true, false, false, null);
+        channel.queueBind(msgBus, exchange, routeKey);
+        channel.basicPublish(exchange, routeKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
+    }
+}

+ 53 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/amqp/RpcListener.java

@@ -0,0 +1,53 @@
+package cn.reghao.tnb.message.app.amqp;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+/**
+ * @author reghao
+ * @date 2021-08-24 17:43:14
+ */
+public class RpcListener implements Runnable {
+    private final String controlBus = "MsgQueue.controlBus(AppId.crawler.name(), )";
+    private final ConnectionFactory connFactory;
+    private final Object monitor = new Object();
+
+    public RpcListener(RabbitConfig config) {
+        this.connFactory = new ConnectionFactory();
+        this.connFactory.setHost(config.getHost());
+        this.connFactory.setPort(5672);
+        this.connFactory.setUsername(config.getUsername());
+        this.connFactory.setPassword(config.getPassword());
+        this.connFactory.setVirtualHost(config.getVhost());
+    }
+
+    @Override
+    public void run() {
+        try (Connection connection = connFactory.newConnection(); Channel channel = connection.createChannel()) {
+            // TODO 设置队列中消息的 TTL
+            channel.queueDeclare(controlBus, true, false, false, null);
+            channel.queuePurge(controlBus);
+            channel.basicQos(1);
+            System.out.printf("RPC server 在 %s 线程中等待 RPC 请求...\n", Thread.currentThread().getName());
+
+            DeliverCallback deliverCallback = new RpcListenerCallback(channel, monitor);
+            channel.basicConsume(controlBus, false, deliverCallback, (consumerTag -> {}));
+
+            // Wait and be prepared to consume the message from RPC client.
+            while (!Thread.interrupted()) {
+                synchronized (monitor) {
+                    try {
+                        // 等待 consumer 线程执行完成
+                        monitor.wait();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 65 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/amqp/RpcListenerCallback.java

@@ -0,0 +1,65 @@
+package cn.reghao.tnb.message.app.amqp;
+
+import cn.reghao.jutil.jdk.result.Result;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author reghao
+ * @date 2021-08-24 17:53:29
+ */
+public class RpcListenerCallback implements DeliverCallback  {
+    private final Channel channel;
+    private final Object monitor;
+
+    public RpcListenerCallback(Channel channel, Object monitor) {
+        this.channel = channel;
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void handle(String consumerTag, Delivery delivery) throws IOException {
+        Result rpcResult = null;
+        try {
+            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
+        } catch (RuntimeException e) {
+            System.out.println(e.getMessage());
+            rpcResult = Result.error("rpc 调用失败: " + e.getMessage());
+        } finally {
+            if (rpcResult == null) {
+                rpcResult = Result.error("rpc 调用失败");
+            }
+            rpcReply(delivery, rpcResult);
+
+            synchronized (monitor) {
+                // 通知 server 线程 consumer 线程执行完成
+                monitor.notify();
+            }
+        }
+    }
+
+    /**
+     * RPC 调用响应
+     *
+     * @param
+     * @return
+     * @date 2021-08-28 下午8:45
+     */
+    private void rpcReply(Delivery delivery, Result result) throws IOException {
+        String correlationId = delivery.getProperties().getCorrelationId();
+        String replyTo = delivery.getProperties().getReplyTo();
+        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
+                .Builder()
+                .correlationId(correlationId)
+                .build();
+        byte[] bytes = JsonConverter.objectToJson(result).getBytes(StandardCharsets.UTF_8);
+        channel.basicPublish("", replyTo, true, replyProps, bytes);
+        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+    }
+}