Parcourir la source

调整 message-service 中的 websocket 连接, 使用 mq 中间件将 websocket 消息 fanout 到其他 message 集群中的其他节点

reghao il y a 1 an
Parent
commit
6c94176995

+ 27 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitDeclareConfig.java

@@ -0,0 +1,27 @@
+package cn.reghao.tnb.message.app.rabbit;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 15:34:05
+ */
+//@Component
+public class RabbitDeclareConfig {
+    @Bean
+    public FanoutExchange fanoutExchange() {
+        return new FanoutExchange("ws.fanout", true, false);
+    }
+
+    @Bean
+    public Queue wsQueue() {
+        return new Queue("ws.fanout.queue", true);
+    }
+
+    @Bean
+    public Binding binding(Queue queue, Exchange exchange) {
+        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
+    }
+}

+ 37 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListenerConfig.java

@@ -0,0 +1,37 @@
+package cn.reghao.tnb.message.app.rabbit;
+
+import cn.reghao.tnb.message.app.rabbit.listener.WsMessageListener;
+import org.springframework.amqp.core.MessageListener;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 15:42:16
+ */
+//@Component
+public class RabbitListenerConfig {
+    private final ConnectionFactory connectionFactory;
+
+    public RabbitListenerConfig(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public MessageListenerContainer addQueueListener(String queueName, MessageListener listener) {
+        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
+        mlc.addQueueNames(queueName);
+        mlc.setupMessageListener(listener);
+        return mlc;
+    }
+
+    @PostConstruct
+    public void init() {
+        // 监听并处理 tnb.message.websocket 队列的消息
+        MessageListenerContainer mlc = addQueueListener("tnb.message.websocket" , new WsMessageListener());
+        mlc.start();
+    }
+}

+ 54 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java

@@ -0,0 +1,54 @@
+package cn.reghao.tnb.message.app.rabbit;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.message.app.ws.WsConnectionChat;
+import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.msg.EventMessage;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.rabbit.annotation.Exchange;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 14:37:18
+ */
+@Slf4j
+@Component
+public class RabbitListeners {
+    private final WsConnectionChat wsConnectionChat;
+
+    public RabbitListeners(WsConnectionChat wsConnectionChat) {
+        this.wsConnectionChat = wsConnectionChat;
+    }
+
+    @RabbitListener(bindings =@QueueBinding(
+            // 队列名字可以随便定义
+            value = @Queue(value = "tnb.message.chat.queue1",autoDelete = "false"),
+            // 交换机的名字必须和生产者保持一致, fanout 是以广播模式(发布订阅模式)
+            exchange = @Exchange(value = "amq.fanout", type = ExchangeTypes.FANOUT)
+    ))
+    public void messageQueueConsumer(@Payload String msg) {
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(msg).getAsJsonObject();
+        String event = jsonObject.get("event").getAsString();
+        JsonObject data1 = jsonObject.get("data").getAsJsonObject();
+        long senderId = data1.get("senderId").getAsLong();
+        long receiverId = data1.get("receiverId").getAsLong();
+        String content = data1.get("content").getAsString();
+
+        log.info("send {} -> {}", receiverId, content);
+        ChatPayload chatPayload = new ChatPayload(senderId, receiverId, content);
+        EventMessage eventMessage = new EventMessage();
+        eventMessage.setEvent("event_talk");
+        eventMessage.setPayload(chatPayload);
+        wsConnectionChat.sendTextMessage(receiverId, eventMessage);
+    }
+}

+ 7 - 6
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitProducer.java

@@ -17,17 +17,18 @@ public class RabbitProducer {
         this.rabbitTemplate = rabbitTemplate;
     }
 
-    public void send(String msg) {
-        String exchange = "";
-        String routingKey = "tnb.message";
-
+    public void sendChatMessage(String msg) {
+        String exchange = "tnb.message.chat.fanout";
+        exchange = "amq.fanout";
+        String routingKey = "tnb.message.chat.queue";
+        routingKey = "";
         //如果交换机名称写错了。那么confirmCallback 回调失败的。
         //如果设置routingkey 是一个错误,那么returncallback 会进行回调调用
-        rabbitTemplate.convertAndSend(routingKey, msg);
+        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
     }
 
     public void sendMediaProgress(MediaProgress mediaProgress) {
-        String routingKey = "tnb.message";
+        String routingKey = "tnb.content.media";
         String jsonPayload = JsonConverter.objectToJson(mediaProgress);
         rabbitTemplate.convertAndSend(routingKey, jsonPayload);
     }

+ 17 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/listener/WsMessageListener.java

@@ -0,0 +1,17 @@
+package cn.reghao.tnb.message.app.rabbit.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageListener;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 15:45:00
+ */
+@Slf4j
+public class WsMessageListener implements MessageListener {
+    @Override
+    public void onMessage(Message message) {
+        log.info("new message");
+    }
+}

+ 15 - 6
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnectionChat.java

@@ -6,6 +6,8 @@ import cn.reghao.tnb.account.api.dto.AuthedAccount;
 import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.api.model.EventType;
 import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.ws.msg.EventMessage;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
@@ -32,6 +34,11 @@ public class WsConnectionChat {
     private final Map<String, Long> sessionMap = new HashMap<>();
     private final Map<Long, WebSocketSession> userMap = new HashMap<>();
     private final Gson gson = new GsonBuilder().create();
+    private final RabbitProducer rabbitProducer;
+
+    public WsConnectionChat(RabbitProducer rabbitProducer) {
+        this.rabbitProducer = rabbitProducer;
+    }
 
     public void online(WebSocketSession session) throws IOException {
         String query = session.getUri().getQuery();
@@ -93,13 +100,13 @@ public class WsConnectionChat {
                 data.get("receiverId").getAsLong();
                 break;
             case event_talk:
+                rabbitProducer.sendChatMessage(payload);
                 JsonObject data1 = jsonObject.get("data").getAsJsonObject();
                 long senderId = data1.get("senderId").getAsLong();
                 long receiverId = data1.get("receiverId").getAsLong();
                 String content = data1.get("content").getAsString();
-                log.info("send {} -> {}", receiverId, content);
-
-                WebSocketSession webSocketSession = userMap.get(receiverId);
+                //log.info("send {} -> {}", receiverId, content);
+                /*WebSocketSession webSocketSession = userMap.get(receiverId);
                 if (webSocketSession != null) {
                     Map<String, Object> map = new HashMap<>();
                     map.put("senderId", senderId);
@@ -108,7 +115,7 @@ public class WsConnectionChat {
                     sendMessage(webSocketSession, map);
                 } else {
                     log.error("not found {} session", receiverId);
-                }
+                }*/
                 break;
             case event_talk_revoke:
                 log.info("talk_revoke event");
@@ -128,15 +135,17 @@ public class WsConnectionChat {
         session.sendMessage(textMessage);
     }
 
-    public void sendTextMessage(long userId, String text) {
+    public void sendTextMessage(long userId, Object payload) {
         WebSocketSession session = userMap.get(userId);
         if (session != null) {
-            TextMessage textMessage = new TextMessage(text);
+            TextMessage textMessage = new TextMessage(gson.toJson(payload));
             try {
                 session.sendMessage(textMessage);
             } catch (IOException e) {
                 e.printStackTrace();
             }
+        } else {
+            log.info("receiver {} not found", userId);
         }
     }
 }

+ 15 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ChatHandler.java

@@ -1,16 +1,21 @@
 package cn.reghao.tnb.message.app.ws.handler;
 
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.account.api.constant.TokenType;
 import cn.reghao.tnb.account.api.dto.AuthedAccount;
 import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.app.ws.WsConnectionChat;
+import cn.reghao.tnb.message.app.ws.msg.EventMessage;
+import cn.reghao.tnb.message.app.ws.msg.PingPayload;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author reghao
@@ -39,6 +44,12 @@ public class ChatHandler implements WebSocketHandler {
 
         wsConnectionChat.online(webSocketSession);
         log.info("WebSocket 建立连接");
+
+        PingPayload pingPayload = new PingPayload(60, 300);
+        EventMessage eventMessage = new EventMessage();
+        eventMessage.setEvent("connect");
+        eventMessage.setPayload(pingPayload);
+        wsConnectionChat.sendMessage(webSocketSession, eventMessage);
     }
 
     @Override
@@ -47,6 +58,10 @@ public class ChatHandler implements WebSocketHandler {
         if (webSocketMessage instanceof TextMessage) {
             String payload = (String) webSocketMessage.getPayload();
             try {
+                // TODO gson 会将 Long 反序列化为 Double
+                EventMessage eventMessage = JsonConverter.jsonToObject(payload, EventMessage.class);
+                String event = eventMessage.getEvent();
+                Object eventPayload = eventMessage.getPayload();
                 wsConnectionChat.dispatch(webSocketSession, payload);
             } catch (Exception e) {
                 e.printStackTrace();

+ 16 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ChatPayload.java

@@ -0,0 +1,16 @@
+package cn.reghao.tnb.message.app.ws.msg;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2024-11-29 10:39:54
+ */
+@AllArgsConstructor
+@Getter
+public class ChatPayload {
+    private Long senderId;
+    private Long receiverId;
+    private String content;
+}

+ 15 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/EventMessage.java

@@ -0,0 +1,15 @@
+package cn.reghao.tnb.message.app.ws.msg;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 17:57:37
+ */
+@Setter
+@Getter
+public class EventMessage {
+    private String event;
+    private Object payload;
+}

+ 15 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/PingPayload.java

@@ -0,0 +1,15 @@
+package cn.reghao.tnb.message.app.ws.msg;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2024-11-29 09:07:26
+ */
+@AllArgsConstructor
+@Getter
+public class PingPayload {
+    private int interval;
+    private int timeout;
+}