Browse Source

更新 message-service 中 WebSocketHandler 实现的处理逻辑

reghao 1 year ago
parent
commit
48b9a222a5
15 changed files with 440 additions and 464 deletions
  1. 1 0
      message/message-api/src/main/java/cn/reghao/tnb/message/api/model/EventType.java
  2. 5 0
      message/message-service/pom.xml
  3. 20 23
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java
  4. 1 2
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitProducer.java
  5. 2 9
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WebSocketConfig.java
  6. 78 10
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnection.java
  7. 0 151
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnectionChat.java
  8. 62 48
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ChatHandler.java
  9. 0 98
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/DeviceHandler.java
  10. 102 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ProgressHandler.java
  11. 0 123
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/VideoProgressHandler.java
  12. 16 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/EventPayload.java
  13. 8 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ProgressEvent.java
  14. 93 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ChatService.java
  15. 52 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ProgressService.java

+ 1 - 0
message/message-api/src/main/java/cn/reghao/tnb/message/api/model/EventType.java

@@ -6,6 +6,7 @@ package cn.reghao.tnb.message.api.model;
  */
 public enum EventType {
     heartbeat,
+    media_progress,
     event_login,
     event_contact_apply,
     event_talk_keyboard, event_talk, event_talk_revoke, event_talk_join_group,

+ 5 - 0
message/message-service/pom.xml

@@ -38,6 +38,11 @@
             <artifactId>admin-api</artifactId>
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>cn.reghao.tnb.user</groupId>
+            <artifactId>user-api</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
 
         <dependency>
             <groupId>mysql</groupId>

+ 20 - 23
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java

@@ -1,10 +1,8 @@
 package cn.reghao.tnb.message.app.rabbit;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.message.app.ws.WsConnectionChat;
+import cn.reghao.tnb.message.app.ws.WsConnection;
 import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
-import cn.reghao.tnb.message.app.ws.msg.EventMessage;
-import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.ExchangeTypes;
 import org.springframework.amqp.rabbit.annotation.Exchange;
@@ -14,8 +12,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.stereotype.Component;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
 
 /**
  * @author reghao
@@ -24,31 +21,31 @@ import java.util.Map;
 @Slf4j
 @Component
 public class RabbitListeners {
-    private final WsConnectionChat wsConnectionChat;
+    private final WsConnection wsConnection;
 
-    public RabbitListeners(WsConnectionChat wsConnectionChat) {
-        this.wsConnectionChat = wsConnectionChat;
+    public RabbitListeners(WsConnection wsConnection) {
+        this.wsConnection = wsConnection;
     }
 
     @RabbitListener(bindings =@QueueBinding(
             // 队列名字可以随便定义
-            value = @Queue(value = "tnb.message.chat.queue1",autoDelete = "false"),
+            value = @Queue(autoDelete = "false"),
             // 交换机的名字必须和生产者保持一致, fanout 是以广播模式(发布订阅模式)
             exchange = @Exchange(value = "amq.fanout", type = ExchangeTypes.FANOUT)
     ))
-    public void messageQueueConsumer(@Payload String msg) {
-        JsonObject jsonObject = JsonConverter.jsonToJsonElement(msg).getAsJsonObject();
-        String event = jsonObject.get("event").getAsString();
-        JsonObject data1 = jsonObject.get("data").getAsJsonObject();
-        long senderId = data1.get("senderId").getAsLong();
-        long receiverId = data1.get("receiverId").getAsLong();
-        String content = data1.get("content").getAsString();
-
-        log.info("send {} -> {}", receiverId, content);
-        ChatPayload chatPayload = new ChatPayload(senderId, receiverId, content);
-        EventMessage eventMessage = new EventMessage();
-        eventMessage.setEvent("event_talk");
-        eventMessage.setPayload(chatPayload);
-        wsConnectionChat.sendTextMessage(receiverId, eventMessage);
+    public void chatMessageConsumer(@Payload String msg) {
+        ChatPayload chatPayload = JsonConverter.jsonToObject(msg, ChatPayload.class);
+        long receiverId = chatPayload.getReceiverId();
+        String content = chatPayload.getContent();
+        if (wsConnection.getUserSession(receiverId) != null) {
+            log.info("send {} -> {}", receiverId, content);
+            try {
+                wsConnection.sendText(receiverId+"", msg);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            log.info("{} 的 session 不在本节点上", receiverId);
+        }
     }
 }

+ 1 - 2
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitProducer.java

@@ -18,8 +18,7 @@ public class RabbitProducer {
     }
 
     public void sendChatMessage(String msg) {
-        String exchange = "tnb.message.chat.fanout";
-        exchange = "amq.fanout";
+        String exchange = exchange = "amq.fanout";
         String routingKey = "tnb.message.chat.queue";
         routingKey = "";
         //如果交换机名称写错了。那么confirmCallback 回调失败的。

+ 2 - 9
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WebSocketConfig.java

@@ -17,16 +17,13 @@ public class WebSocketConfig implements WebSocketConfigurer {
     private final WebSocketInterceptor webSocketInterceptor;
     private final WebSocketHandler videoProgressHandler;
     private final WebSocketHandler chatHandler;
-    private final WebSocketHandler deviceHandler;
 
     public WebSocketConfig(WebSocketInterceptor webSocketInterceptor,
-                           @Qualifier("videoProgressHandler") WebSocketHandler videoProgressHandler,
-                           @Qualifier("chatHandler") WebSocketHandler chatHandler,
-                           @Qualifier("deviceHandler") WebSocketHandler deviceHandler) {
+                           @Qualifier("progressHandler") WebSocketHandler videoProgressHandler,
+                           @Qualifier("chatHandler") WebSocketHandler chatHandler) {
         this.webSocketInterceptor = webSocketInterceptor;
         this.videoProgressHandler = videoProgressHandler;
         this.chatHandler = chatHandler;
-        this.deviceHandler = deviceHandler;
     }
 
     @Override
@@ -38,9 +35,5 @@ public class WebSocketConfig implements WebSocketConfigurer {
         registry.addHandler(chatHandler, "/ws/chat")
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
-
-        registry.addHandler(deviceHandler, "/ws/device")
-                .addInterceptors(webSocketInterceptor)
-                .setAllowedOrigins("*");
     }
 }

+ 78 - 10
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnection.java

@@ -2,11 +2,19 @@ package cn.reghao.tnb.message.app.ws;
 
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.account.api.constant.TokenType;
+import cn.reghao.tnb.account.api.dto.AuthedAccount;
+import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.api.dto.PushMsg;
 import cn.reghao.tnb.message.api.dto.event.EventMsg;
 import cn.reghao.tnb.message.api.model.EventType;
 import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.msg.EventMessage;
+import cn.reghao.tnb.message.app.ws.msg.PingPayload;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.BinaryMessage;
 import org.springframework.web.socket.TextMessage;
@@ -24,9 +32,31 @@ import java.util.Map;
 @Slf4j
 @Component
 public class WsConnection {
+    @DubboReference(check = false)
+    private AccountQuery accountQuery;
+    private final RabbitProducer rabbitProducer;
+    // userId -> session
     private final Map<String, WebSocketSession> userSessions = new HashMap<>();
+    // sessionId -> userId
     private final Map<String, String> sessionUsers = new HashMap<>();
 
+    public WsConnection(RabbitProducer rabbitProducer) {
+        this.rabbitProducer = rabbitProducer;
+    }
+
+    public boolean addSession(WebSocketSession webSocketSession) {
+        String query = webSocketSession.getUri().getQuery();
+        String jwtToken = query.replace("token=", "");
+        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
+        if (authedAccount != null) {
+            long userId = authedAccount.getUserId();
+            addUserSession(userId+"", webSocketSession);
+            return true;
+        }
+
+        return false;
+    }
+
     public void addUserSession(String userId, WebSocketSession webSocketSession) {
         String sessionId = webSocketSession.getId();
         userSessions.put(userId, webSocketSession);
@@ -44,6 +74,26 @@ public class WsConnection {
         return sessionUsers.get(sessionId);
     }
 
+    public WebSocketSession getUserSession(long userId) {
+        return userSessions.get(userId+"");
+    }
+
+    public void sendPingPayload(WebSocketSession session) throws IOException {
+        PingPayload pingPayload = new PingPayload(60, 300);
+        EventMessage eventMessage = new EventMessage();
+        eventMessage.setEvent("connect");
+        eventMessage.setPayload(pingPayload);
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(pingPayload));
+        session.sendMessage(textMessage);
+    }
+
+    public void sendHeartbeatPong(WebSocketSession session) throws IOException {
+        EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+        session.sendMessage(textMessage);
+    }
+
     public void sendTextMsg(PushMsg pushMsg) throws IOException {
         String userId = pushMsg.getReceiverId();
         EventMsg eventMsg = pushMsg.getEventMsg();
@@ -58,10 +108,10 @@ public class WsConnection {
         webSocketSession.sendMessage(textMessage);
     }
 
-    public void send(String userId, String content) throws IOException {
-        WebSocketSession webSocketSession = userSessions.get(userId);
+    public void send(String receiverId, String content) throws IOException {
+        WebSocketSession webSocketSession = userSessions.get(receiverId);
         if (webSocketSession == null) {
-            log.error("{} 不在线", userId);
+            log.error("{} 不在线", receiverId);
             return;
         }
 
@@ -70,10 +120,10 @@ public class WsConnection {
         webSocketSession.sendMessage(textMessage);
     }
 
-    public void send1(String userId, Object object) throws IOException {
-        WebSocketSession webSocketSession = userSessions.get(userId);
+    public void send1(String receiverId, Object object) throws IOException {
+        WebSocketSession webSocketSession = userSessions.get(receiverId);
         if (webSocketSession == null) {
-            log.error("{} 不在线", userId);
+            log.error("{} 不在线", receiverId);
             return;
         }
 
@@ -81,8 +131,8 @@ public class WsConnection {
         webSocketSession.sendMessage(textMessage);
     }
 
-    public void send(String dest, Object message) throws IOException {
-        WebSocketSession session = userSessions.get(dest);
+    public void send(String receiverId, Object message) throws IOException {
+        WebSocketSession session = userSessions.get(receiverId);
         if (session != null) {
             byte[] bytes = JdkSerializer.serialize(message);
             BinaryMessage binaryMessage = new BinaryMessage(bytes);
@@ -90,8 +140,8 @@ public class WsConnection {
         }
     }
 
-    public void sendText(String dest, Object message) throws IOException {
-        WebSocketSession session = userSessions.get(dest);
+    public void sendText(String receiverId, String message) throws IOException {
+        WebSocketSession session = userSessions.get(receiverId);
         if (session != null) {
             String json = JsonConverter.objectToJson(message);
             TextMessage textMessage = new TextMessage(json);
@@ -99,6 +149,20 @@ public class WsConnection {
         }
     }
 
+    public void sendChatMessage(String receiverId, ChatPayload chatPayload) throws IOException {
+        WebSocketSession session = userSessions.get(receiverId);
+        if (session == null) {
+            return;
+        }
+
+        EventMessage eventMessage = new EventMessage();
+        eventMessage.setEvent("event_talk");
+        eventMessage.setPayload(chatPayload);
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventMessage));
+        session.sendMessage(textMessage);
+    }
+
     public void unicast() {
     }
 
@@ -122,4 +186,8 @@ public class WsConnection {
             }
         }
     }
+
+    public void clusterBroadcast(String payload) {
+        rabbitProducer.sendChatMessage(payload);
+    }
 }

+ 0 - 151
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnectionChat.java

@@ -1,151 +0,0 @@
-package cn.reghao.tnb.message.app.ws;
-
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.account.api.constant.TokenType;
-import cn.reghao.tnb.account.api.dto.AuthedAccount;
-import cn.reghao.tnb.account.api.iface.AccountQuery;
-import cn.reghao.tnb.message.api.model.EventType;
-import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
-import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
-import cn.reghao.tnb.message.app.ws.msg.EventMessage;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.stereotype.Component;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author reghao
- * @date 2022-04-21 10:32:37
- */
-@Slf4j
-@Component
-public class WsConnectionChat {
-    @DubboReference(check = false, retries = 0)
-    private AccountQuery accountQuery;
-
-    private final Map<String, Long> sessionMap = new HashMap<>();
-    private final Map<Long, WebSocketSession> userMap = new HashMap<>();
-    private final Gson gson = new GsonBuilder().create();
-    private final RabbitProducer rabbitProducer;
-
-    public WsConnectionChat(RabbitProducer rabbitProducer) {
-        this.rabbitProducer = rabbitProducer;
-    }
-
-    public void online(WebSocketSession session) throws IOException {
-        String query = session.getUri().getQuery();
-        String jwtToken = query.replace("token=", "");
-        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
-        if (authedAccount != null) {
-            long userId = authedAccount.getUserId();
-            loginEvent(userId, true);
-            sessionMap.put(session.getId(), userId);
-            userMap.put(userId, session);
-            log.info("{} 在线", userId);
-        }
-    }
-
-    public void offline(WebSocketSession session) throws IOException {
-        String sessionId = session.getId();
-        Long userId = sessionMap.get(sessionId);
-        if (userId != null) {
-            loginEvent(userId, false);
-            sessionMap.remove(sessionId);
-            userMap.remove(userId);
-            log.info("{} 离线", userId);
-        }
-    }
-
-    private void loginEvent(long userId, boolean online) {
-        /*userContactService.setOnline(userId, online);
-        EvtLoginResp evtLoginResp = new EvtLoginResp(userId, online);
-        EventMessageResp<EvtLoginResp> resp = new EventMessageResp<>(EventType.event_login, evtLoginResp);
-        TextMessage textMessage = new TextMessage(gson.toJson(resp));
-        List<Long> friendIds = userContactService.getOnlineFriends(userId);
-        for (Long friendId : friendIds) {
-            WebSocketSession session = userMap.get(friendId);
-            if (session != null) {
-                try {
-                    session.sendMessage(textMessage);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }*/
-    }
-
-    public void dispatch(WebSocketSession session, String payload) throws IOException {
-        JsonObject jsonObject = JsonConverter.jsonToJsonElement(payload).getAsJsonObject();
-        String event = jsonObject.get("event").getAsString();
-        switch (EventType.valueOf(event)) {
-            case heartbeat:
-                log.info("heartbeat event");
-                EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
-                sendMessage(session, resp);
-                break;
-            case event_contact_apply:
-                log.info("contact_apply event");
-                break;
-            case event_talk_keyboard:
-                JsonObject data = jsonObject.get("data").getAsJsonObject();
-                data.get("senderId").getAsLong();
-                data.get("receiverId").getAsLong();
-                break;
-            case event_talk:
-                rabbitProducer.sendChatMessage(payload);
-                JsonObject data1 = jsonObject.get("data").getAsJsonObject();
-                long senderId = data1.get("senderId").getAsLong();
-                long receiverId = data1.get("receiverId").getAsLong();
-                String content = data1.get("content").getAsString();
-                //log.info("send {} -> {}", receiverId, content);
-                /*WebSocketSession webSocketSession = userMap.get(receiverId);
-                if (webSocketSession != null) {
-                    Map<String, Object> map = new HashMap<>();
-                    map.put("senderId", senderId);
-                    map.put("receiverId", receiverId);
-                    map.put("payload", content);
-                    sendMessage(webSocketSession, map);
-                } else {
-                    log.error("not found {} session", receiverId);
-                }*/
-                break;
-            case event_talk_revoke:
-                log.info("talk_revoke event");
-                break;
-            case event_talk_join_group:
-                log.info("talk_join_group event");
-                break;
-            case event_error:
-                log.info("error event");
-                break;
-            default:
-        }
-    }
-
-    public void sendMessage(WebSocketSession session, Object payload) throws IOException {
-        TextMessage textMessage = new TextMessage(gson.toJson(payload));
-        session.sendMessage(textMessage);
-    }
-
-    public void sendTextMessage(long userId, Object payload) {
-        WebSocketSession session = userMap.get(userId);
-        if (session != null) {
-            TextMessage textMessage = new TextMessage(gson.toJson(payload));
-            try {
-                session.sendMessage(textMessage);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        } else {
-            log.info("receiver {} not found", userId);
-        }
-    }
-}

+ 62 - 48
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ChatHandler.java

@@ -2,20 +2,15 @@ package cn.reghao.tnb.message.app.ws.handler;
 
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.account.api.constant.TokenType;
-import cn.reghao.tnb.account.api.dto.AuthedAccount;
-import cn.reghao.tnb.account.api.iface.AccountQuery;
-import cn.reghao.tnb.message.app.ws.WsConnectionChat;
-import cn.reghao.tnb.message.app.ws.msg.EventMessage;
-import cn.reghao.tnb.message.app.ws.msg.PingPayload;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.app.ws.WsConnection;
+import cn.reghao.tnb.message.app.ws.service.ChatService;
+import cn.reghao.tnb.message.app.ws.msg.EventPayload;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * @author reghao
@@ -24,48 +19,33 @@ import java.util.Map;
 @Slf4j
 @Component
 public class ChatHandler implements WebSocketHandler {
-    @DubboReference(check = false)
-    private AccountQuery accountQuery;
-    private final WsConnectionChat wsConnectionChat;
+    private final WsConnection wsConnection;
+    private final ChatService chatService;
 
-    public ChatHandler(WsConnectionChat wsConnectionChat) {
-        this.wsConnectionChat = wsConnectionChat;
+    public ChatHandler(WsConnection wsConnection, ChatService chatService) {
+        this.wsConnection = wsConnection;
+        this.chatService = chatService;
     }
 
     @Override
     public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        String query = webSocketSession.getUri().getQuery();
-        String jwtToken = query.replace("token=", "");
-        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
-        if (authedAccount == null) {
+        boolean ret = wsConnection.addSession(webSocketSession);
+        if (ret) {
+            log.info("WebSocket 建立连接");
+            wsConnection.sendPingPayload(webSocketSession);
+            String userId = wsConnection.getUserId(webSocketSession.getId());
+            chatService.onLoginEvent(Long.parseLong(userId), true);
+        } else {
+            log.info("没有认证信息, 关闭 WebSocket 连接");
             webSocketSession.close(CloseStatus.NO_STATUS_CODE);
-            return;
         }
-
-        wsConnectionChat.online(webSocketSession);
-        log.info("WebSocket 建立连接");
-
-        PingPayload pingPayload = new PingPayload(60, 300);
-        EventMessage eventMessage = new EventMessage();
-        eventMessage.setEvent("connect");
-        eventMessage.setPayload(pingPayload);
-        wsConnectionChat.sendMessage(webSocketSession, eventMessage);
     }
 
     @Override
     public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
             throws IOException {
         if (webSocketMessage instanceof TextMessage) {
-            String payload = (String) webSocketMessage.getPayload();
-            try {
-                // TODO gson 会将 Long 反序列化为 Double
-                EventMessage eventMessage = JsonConverter.jsonToObject(payload, EventMessage.class);
-                String event = eventMessage.getEvent();
-                Object eventPayload = eventMessage.getPayload();
-                wsConnectionChat.dispatch(webSocketSession, payload);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            dispatchTextMessage(webSocketSession, (TextMessage) webSocketMessage);
         } else if (webSocketMessage instanceof BinaryMessage) {
             log.info("接收到 WebSocket 二进制消息");
             BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
@@ -82,25 +62,59 @@ public class ChatHandler implements WebSocketHandler {
     @Override
     public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
         log.error("WebSocket 数据传输错误");
-        try {
-            wsConnectionChat.offline(webSocketSession);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        handleOnClose(webSocketSession);
     }
 
     @Override
     public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
         log.info("WebSocket 断开连接");
-        try {
-            wsConnectionChat.offline(webSocketSession);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        handleOnClose(webSocketSession);
     }
 
     @Override
     public boolean supportsPartialMessages() {
         return false;
     }
+
+    private void dispatchTextMessage(WebSocketSession webSocketSession, TextMessage textMessage) {
+        String sessionId = webSocketSession.getId();
+        String payload = textMessage.getPayload();
+        try {
+            // TODO gson 会将 Long 反序列化为 Double
+            EventPayload eventPayload = JsonConverter.jsonToObject(payload, EventPayload.class);
+            String event = eventPayload.getEvent();
+            switch (EventType.valueOf(event)) {
+                case heartbeat:
+                    wsConnection.sendHeartbeatPong(webSocketSession);
+                    break;
+                case event_contact_apply:
+                    log.info("contact_apply event");
+                    break;
+                case event_talk_keyboard:
+                    log.info("talk_keyboard event");
+                    break;
+                case event_talk:
+                    chatService.onEventTalk(payload);
+                    break;
+                case event_talk_revoke:
+                    log.info("talk_revoke event");
+                    break;
+                case event_talk_join_group:
+                    log.info("talk_join_group event");
+                    break;
+                case event_error:
+                    log.info("error event");
+                    break;
+                default:
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void handleOnClose(WebSocketSession webSocketSession) {
+        String userId = wsConnection.getUserId(webSocketSession.getId());
+        chatService.onLoginEvent(Long.parseLong(userId), false);
+        wsConnection.removeUserSession(webSocketSession);
+    }
 }

+ 0 - 98
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/DeviceHandler.java

@@ -1,98 +0,0 @@
-package cn.reghao.tnb.message.app.ws.handler;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import cn.reghao.tnb.account.api.constant.TokenType;
-import cn.reghao.tnb.account.api.dto.AccountInfo;
-import cn.reghao.tnb.account.api.iface.AccountQuery;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.stereotype.Component;
-import org.springframework.web.socket.*;
-
-import java.io.IOException;
-
-/**
- * 处理硬件设备发送的消息
- *
- * @author reghao
- * @date 2024-02-05 10:53:03
- */
-@Slf4j
-@Component
-public class DeviceHandler implements WebSocketHandler {
-    @DubboReference(check = false)
-    private AccountQuery accountQuery;
-
-    private final WsConnection wsConnection;
-
-    public DeviceHandler(WsConnection wsConnection) {
-        this.wsConnection = wsConnection;
-    }
-
-    @Override
-    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        String query = webSocketSession.getUri().getQuery();
-        String jwtToken = query.replace("token=", "");
-        accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
-        //deviceService.setState(jwtToken, true);
-        /*if (jwtToken.equals("1234567890")) {
-            userId = "10001";
-        } else {
-            userId = getUserId(jwtToken);
-        }
-
-        if (userId == null) {
-            webSocketSession.close(CloseStatus.NO_STATUS_CODE);
-            return;
-        }*/
-
-        wsConnection.addUserSession(jwtToken, webSocketSession);
-        log.info("WebSocket 建立连接");
-    }
-
-    private String getUserId(String jwtToken) {
-        log.info("jwt-token -> {}", jwtToken);
-        return null;
-    }
-
-    @Override
-    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
-            throws IOException {
-        String sessionId = webSocketSession.getId();
-        if (webSocketMessage instanceof TextMessage) {
-            String payload = (String) webSocketMessage.getPayload();
-            webSocketSession.sendMessage(webSocketMessage);
-            log.info("接收到 text 消息 {}", payload);
-        } else if (webSocketMessage instanceof BinaryMessage) {
-            log.info("接收到 WebSocket 二进制消息");
-            BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
-            Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
-        } else if (webSocketMessage instanceof PingMessage) {
-            log.info("接收到 WebSocket PingMessage");
-        } else if (webSocketMessage instanceof PongMessage) {
-            log.info("接收到 WebSocket PongMessage");
-        } else {
-            log.error("接收到未知类型的 WebSocket 消息");
-        }
-    }
-
-    @Override
-    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
-        wsConnection.removeUserSession(webSocketSession);
-        log.error("WebSocket 数据传输错误");
-    }
-
-    @Override
-    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
-        String sessionId = webSocketSession.getId();
-        wsConnection.removeUserSession(webSocketSession);
-        //deviceService.setState(sessionId, false);
-        log.info("WebSocket 断开连接");
-    }
-
-    @Override
-    public boolean supportsPartialMessages() {
-        return false;
-    }
-}

+ 102 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ProgressHandler.java

@@ -0,0 +1,102 @@
+package cn.reghao.tnb.message.app.ws.handler;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.message.api.dto.MediaProgress;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.app.ws.WsConnection;
+import cn.reghao.tnb.message.app.ws.msg.EventPayload;
+import cn.reghao.tnb.message.app.ws.service.ProgressService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.*;
+
+import java.io.IOException;
+
+/**
+ * 媒体进度
+ *
+ * @author reghao
+ * @date 2022-03-11 16:45:52
+ */
+@Slf4j
+@Component
+public class ProgressHandler implements WebSocketHandler {
+    private final WsConnection wsConnection;
+    private final ProgressService progressService;
+
+    public ProgressHandler(WsConnection wsConnection, ProgressService progressService) {
+        this.wsConnection = wsConnection;
+        this.progressService = progressService;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        boolean ret = wsConnection.addSession(webSocketSession);
+        if (ret) {
+            log.info("WebSocket 建立连接");
+        } else {
+            log.info("没有认证信息, 关闭 WebSocket 连接");
+            webSocketSession.close(CloseStatus.NO_STATUS_CODE);
+        }
+    }
+
+    @Override
+    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
+            throws IOException {
+        if (webSocketMessage instanceof TextMessage) {
+            dispatchTextMessage(webSocketSession, (TextMessage) webSocketMessage);
+        } else if (webSocketMessage instanceof BinaryMessage) {
+            log.info("接收到 WebSocket 二进制消息");
+        } else if (webSocketMessage instanceof PingMessage) {
+            log.info("接收到 WebSocket PingMessage");
+        } else if (webSocketMessage instanceof PongMessage) {
+            log.info("接收到 WebSocket PongMessage");
+        } else {
+            log.error("接收到未知类型的 WebSocket 消息");
+        }
+    }
+
+    @Override
+    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
+        log.error("WebSocket 数据传输错误");
+        handleOnClose(webSocketSession);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
+        log.info("WebSocket 断开连接");
+        handleOnClose(webSocketSession);
+    }
+
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+
+    private void dispatchTextMessage(WebSocketSession webSocketSession, TextMessage textMessage) {
+        String sessionId = webSocketSession.getId();
+        String payload = textMessage.getPayload();
+        try {
+            EventPayload eventPayload = JsonConverter.jsonToObject(payload, EventPayload.class);
+            String event = eventPayload.getEvent();
+            switch (EventType.valueOf(event)) {
+                case heartbeat:
+                    wsConnection.sendHeartbeatPong(webSocketSession);
+                    break;
+                case media_progress:
+                    String jsonPayload = eventPayload.getPayload();
+                    MediaProgress mediaProgress = JsonConverter.jsonToObject(jsonPayload, MediaProgress.class);
+                    progressService.putMediaProgress(sessionId, mediaProgress);
+                    break;
+                default:
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void handleOnClose(WebSocketSession webSocketSession) {
+        progressService.removeMediaProgress(webSocketSession.getId());
+        wsConnection.removeUserSession(webSocketSession);
+    }
+}

+ 0 - 123
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/VideoProgressHandler.java

@@ -1,123 +0,0 @@
-package cn.reghao.tnb.message.app.ws.handler;
-
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.account.api.constant.TokenType;
-import cn.reghao.tnb.account.api.dto.AuthedAccount;
-import cn.reghao.tnb.account.api.iface.AccountQuery;
-import cn.reghao.tnb.message.api.dto.MediaProgress;
-import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import com.google.gson.JsonObject;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.stereotype.Component;
-import org.springframework.web.socket.*;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * 媒体进度
- *
- * @author reghao
- * @date 2022-03-11 16:45:52
- */
-@Slf4j
-@Component
-public class VideoProgressHandler implements WebSocketHandler {
-    @DubboReference(check = false)
-    private AccountQuery accountQuery;
-
-    private final WsConnection wsConnection;
-    private final Map<String, MediaProgress> map = new HashMap<>();
-    private final RabbitProducer rabbitProducer;
-
-    public VideoProgressHandler(WsConnection wsConnection, RabbitProducer rabbitProducer) {
-        this.wsConnection = wsConnection;
-        this.rabbitProducer = rabbitProducer;
-    }
-
-    @Override
-    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        String query = webSocketSession.getUri().getQuery();
-        String jwtToken = query.replace("token=", "");
-        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
-        if (authedAccount != null) {
-            long userId = authedAccount.getUserId();
-            wsConnection.addUserSession(userId+"", webSocketSession);
-        }
-
-        log.info("WebSocket 建立连接");
-    }
-
-    @Override
-    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
-            throws IOException {
-        String sessionId = webSocketSession.getId();
-        if (webSocketMessage instanceof TextMessage) {
-            try {
-                String payload = (String) webSocketMessage.getPayload();
-                JsonObject jsonObject = JsonConverter.jsonToJsonElement(payload).getAsJsonObject();
-                String event = jsonObject.get("event").getAsString();
-                String payload1 = jsonObject.get("payload").getAsString();
-                if ("progress".equals(event)) {
-                    MediaProgress mediaProgress = JsonConverter.jsonToObject(payload1, MediaProgress.class);
-                    if (mediaProgress.getEnded()) {
-                        persistProgress(sessionId, mediaProgress);
-                    } else {
-                        map.put(sessionId, mediaProgress);
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        } else if (webSocketMessage instanceof BinaryMessage) {
-            log.info("接收到 WebSocket 二进制消息");
-        } else if (webSocketMessage instanceof PingMessage) {
-            log.info("接收到 WebSocket PingMessage");
-        } else if (webSocketMessage instanceof PongMessage) {
-            log.info("接收到 WebSocket PongMessage");
-        } else {
-            log.error("接收到未知类型的 WebSocket 消息");
-        }
-    }
-
-    @Override
-    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
-        persistProgress(webSocketSession.getId());
-        wsConnection.removeUserSession(webSocketSession);
-        log.error("WebSocket 数据传输错误");
-    }
-
-    @Override
-    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
-        try {
-            persistProgress(webSocketSession.getId());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        wsConnection.removeUserSession(webSocketSession);
-        log.info("WebSocket 断开连接");
-    }
-
-    private void persistProgress(String sessionId) {
-        MediaProgress mediaProgress = map.get(sessionId);
-        if (mediaProgress != null) {
-            long userId = Long.parseLong(wsConnection.getUserId(sessionId));
-            mediaProgress.setUserId(userId);
-            rabbitProducer.sendMediaProgress(mediaProgress);
-        }
-    }
-
-    private void persistProgress(String sessionId, MediaProgress mediaProgress) {
-        long userId = Long.parseLong(wsConnection.getUserId(sessionId));
-        mediaProgress.setUserId(userId);
-        rabbitProducer.sendMediaProgress(mediaProgress);
-    }
-
-    @Override
-    public boolean supportsPartialMessages() {
-        return false;
-    }
-}

+ 16 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/EventPayload.java

@@ -0,0 +1,16 @@
+package cn.reghao.tnb.message.app.ws.msg;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2024-12-02 11:07:53
+ */
+@Setter
+@Getter
+public class EventPayload {
+    private String event;
+    // JSON 字符串
+    private String payload;
+}

+ 8 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ProgressEvent.java

@@ -0,0 +1,8 @@
+package cn.reghao.tnb.message.app.ws.msg;
+
+/**
+ * @author reghao
+ * @date 2024-12-02 10:16:51
+ */
+public class ProgressEvent {
+}

+ 93 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ChatService.java

@@ -0,0 +1,93 @@
+package cn.reghao.tnb.message.app.ws.service;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.account.api.constant.TokenType;
+import cn.reghao.tnb.account.api.dto.AuthedAccount;
+import cn.reghao.tnb.account.api.iface.AccountQuery;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.api.model.resp.EvtLoginResp;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.ws.WsConnection;
+import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.msg.EventMessage;
+import cn.reghao.tnb.user.api.iface.UserContactService;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2022-04-21 10:32:37
+ */
+@Slf4j
+@Component
+public class ChatService {
+    @DubboReference(check = false, retries = 0)
+    private AccountQuery accountQuery;
+    @DubboReference(check = false, retries = 0)
+    private UserContactService userContactService;
+
+    private final Map<String, Long> sessionMap = new HashMap<>();
+    private final Map<Long, WebSocketSession> userMap = new HashMap<>();
+    private final Gson gson = new GsonBuilder().create();
+    private final WsConnection wsConnection;
+    private RabbitProducer rabbitProducer;
+
+    public ChatService(WsConnection wsConnection) {
+        this.wsConnection = wsConnection;
+    }
+
+    public void online(WebSocketSession session) throws IOException {
+        String query = session.getUri().getQuery();
+        String jwtToken = query.replace("token=", "");
+        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
+        if (authedAccount != null) {
+            long userId = authedAccount.getUserId();
+            onLoginEvent(userId, true);
+            sessionMap.put(session.getId(), userId);
+            userMap.put(userId, session);
+            log.info("{} 在线", userId);
+        }
+    }
+
+    public void onLoginEvent(long userId, boolean online) {
+        EvtLoginResp evtLoginResp = new EvtLoginResp(userId, online);
+        EventMessageResp<EvtLoginResp> resp = new EventMessageResp<>(EventType.event_login, evtLoginResp);
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+
+        userContactService.setOnline(userId, online);
+        List<Long> friendIds = userContactService.getOnlineFriends(userId);
+        for (Long friendId : friendIds) {
+            WebSocketSession session = userMap.get(friendId);
+            if (session != null) {
+                try {
+                    session.sendMessage(textMessage);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    public void onEventTalk(String payload) throws IOException {
+        ChatPayload chatPayload = JsonConverter.jsonToObject(payload, ChatPayload.class);
+        long receiverId = chatPayload.getReceiverId();
+        if (wsConnection.getUserSession(receiverId) == null) {
+            wsConnection.clusterBroadcast(payload);
+            return;
+        }
+
+        wsConnection.sendText(receiverId+"", payload);
+    }
+}

+ 52 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ProgressService.java

@@ -0,0 +1,52 @@
+package cn.reghao.tnb.message.app.ws.service;
+
+import cn.reghao.tnb.account.api.iface.AccountQuery;
+import cn.reghao.tnb.message.api.dto.MediaProgress;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.ws.WsConnection;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2024-12-02 11:36:31
+ */
+@Service
+public class ProgressService {
+    @DubboReference(check = false)
+    private AccountQuery accountQuery;
+
+    private final Map<String, MediaProgress> map = new HashMap<>();
+    private final WsConnection wsConnection;
+    private final RabbitProducer rabbitProducer;
+
+    public ProgressService(WsConnection wsConnection, RabbitProducer rabbitProducer) {
+        this.wsConnection = wsConnection;
+        this.rabbitProducer = rabbitProducer;
+    }
+
+    public void putMediaProgress(String sessionId, MediaProgress mediaProgress) {
+        map.put(sessionId, mediaProgress);
+        if (mediaProgress.getEnded()) {
+            persistProgress(sessionId, mediaProgress);
+        }
+    }
+
+    public void removeMediaProgress(String sessionId) {
+        MediaProgress mediaProgress = map.remove(sessionId);
+        if (mediaProgress != null) {
+            persistProgress(sessionId, mediaProgress);
+        }
+    }
+
+    public void persistProgress(String sessionId, MediaProgress mediaProgress) {
+        long userId = Long.parseLong(wsConnection.getUserId(sessionId));
+        if (mediaProgress != null) {
+            mediaProgress.setUserId(userId);
+            rabbitProducer.sendMediaProgress(mediaProgress);
+        }
+    }
+}