Forráskód Böngészése

update message-service MediaConnection

reghao 5 hónapja
szülő
commit
eac68ba763

+ 20 - 26
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaConnection.java

@@ -38,16 +38,14 @@ public class MediaConnection {
     private AccountQuery accountQuery;
 
     private final RabbitProducer rabbitProducer;
-    private RedisString redisString;
     private RedisHash<HashSet<Long>> redisHash;
     // userId -> session
     private final Map<Long, WebSocketSession> userSessions = new HashMap<>();
     // sessionId -> userId
     private final Map<String, Long> sessionUsers = new HashMap<>();
 
-    public MediaConnection(RabbitProducer rabbitProducer, RedisString redisString, RedisHash<HashSet<Long>> redisHash) {
+    public MediaConnection(RabbitProducer rabbitProducer, RedisHash<HashSet<Long>> redisHash) {
         this.rabbitProducer = rabbitProducer;
-        this.redisString = redisString;
         this.redisHash = redisHash;
     }
 
@@ -56,6 +54,7 @@ public class MediaConnection {
         String videoId = (String) webSocketSession.getAttributes().get("videoId");
         AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), userToken);
         if (authedAccount != null) {
+            String loginId = authedAccount.getLoginId();
             long userId = authedAccount.getUserId();
             String redisKey = RedisKeys.getVideoUsersKey(videoId);
             HashSet<Long> userIds = redisHash.get(redisKey, videoId);
@@ -66,16 +65,9 @@ public class MediaConnection {
                 userIds.add(userId);
             }
             redisHash.hset(redisKey, videoId, userIds);
+            sendViewEvent(videoId);
 
             addUserSession(userId, webSocketSession);
-            sendViewCount(videoId);
-
-            Map<String, Object> wsMessage = new HashMap<>();
-            String serverAddress = System.getenv("SERVER_ADDRESS");
-            wsMessage.put("broadcastBy", serverAddress);
-            wsMessage.put("type", "sendViewCount");
-            wsMessage.put("videoId", videoId);
-            broadcastCluster(JsonConverter.objectToJson(wsMessage));
             return true;
         }
         return false;
@@ -92,16 +84,21 @@ public class MediaConnection {
         Long userId = sessionUsers.get(sessionId);
         userSessions.remove(userId);
         sessionUsers.remove(userId);
+
+        String videoId = (String) webSocketSession.getAttributes().get("videoId");
+        String redisKey = RedisKeys.getVideoUsersKey(videoId);
+        HashSet<Long> userIds = redisHash.get(redisKey, videoId);
+        if (userIds != null) {
+            userIds.remove(userId);
+            redisHash.hset(redisKey, videoId, userIds);
+        }
+        sendViewEvent(videoId);
     }
 
     public Long getUserId(String sessionId) {
         return sessionUsers.get(sessionId);
     }
 
-    public WebSocketSession getUserSession(long userId) {
-        return userSessions.get(userId+"");
-    }
-
     public void sendPingPayload(WebSocketSession session) throws IOException {
         PingPayload pingPayload = new PingPayload(60, 120);
         EventResp eventResp = new EventResp();
@@ -118,18 +115,15 @@ public class MediaConnection {
         session.sendMessage(textMessage);
     }
 
-    public void sendTextMsg(PushMsg pushMsg) throws IOException {
-        String userId = pushMsg.getReceiverId();
-        EventMsg eventMsg = pushMsg.getEventMsg();
-
-        WebSocketSession webSocketSession = userSessions.get(userId);
-        if (webSocketSession == null) {
-            log.error("{} 不在线", userId);
-            return;
-        }
+    public void sendViewEvent(String videoId) {
+        sendViewCount(videoId);
 
-        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventMsg));
-        webSocketSession.sendMessage(textMessage);
+        Map<String, Object> wsMessage = new HashMap<>();
+        String serverAddress = System.getenv("SERVER_ADDRESS");
+        wsMessage.put("broadcastBy", serverAddress);
+        wsMessage.put("type", "sendViewCount");
+        wsMessage.put("videoId", videoId);
+        broadcastCluster(JsonConverter.objectToJson(wsMessage));
     }
 
     public void send(String receiverId, String content) throws IOException {