Переглянути джерело

更新 message-service 中的 websocket 处理

reghao 7 місяців тому
батько
коміт
03d6cc826b
32 змінених файлів з 671 додано та 145 видалено
  1. 5 0
      message/message-service/pom.xml
  2. 47 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/config/SpringLifecycle.java
  3. 24 11
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java
  4. 3 4
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitProducer.java
  5. 1 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RabbitConfig.java
  6. 0 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RabbitProducers.java
  7. 0 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RpcListener.java
  8. 1 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/listener/RabbitListenerConfig.java
  9. 32 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/RedisConfig.java
  10. 15 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/RedisKeys.java
  11. 62 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisHash.java
  12. 52 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisOps.java
  13. 46 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisString.java
  14. 36 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisStringObject.java
  15. 0 13
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/Keys.java
  16. 16 31
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatConnection.java
  17. 13 15
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatHandler.java
  18. 9 11
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatService.java
  19. 1 2
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/ChatPayload.java
  20. 1 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/EventReq.java
  21. 1 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/EventResp.java
  22. 1 1
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/PingPayload.java
  23. 6 6
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketConfig.java
  24. 24 8
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketInterceptor.java
  25. 10 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketPath.java
  26. 201 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaConnection.java
  27. 22 22
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaHandler.java
  28. 6 7
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaService.java
  29. 22 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/msg/MediaEvent.java
  30. 9 0
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/msg/MediaEventType.java
  31. 0 8
      message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ProgressEvent.java
  32. 5 0
      message/message-service/src/main/resources/application-test.yml

+ 5 - 0
message/message-service/pom.xml

@@ -66,6 +66,11 @@
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>

+ 47 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/config/SpringLifecycle.java

@@ -4,7 +4,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
 import org.springframework.stereotype.Component;
+import org.springframework.web.context.WebApplicationContext;
+
+import java.net.*;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * @author reghao
@@ -13,8 +20,24 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class SpringLifecycle implements ApplicationRunner, DisposableBean {
+    private WebApplicationContext applicationContext;
+
+    public SpringLifecycle(WebApplicationContext applicationContext) {
+        this.applicationContext = applicationContext;
+    }
+
     @Override
     public void run(ApplicationArguments args) {
+        Set<String> ipv4AddressSet = getAddress();
+        // 通过 vm option -Dserver.address=192.168.0.10 指定
+        String serverAddress1 = System.getProperty("server.address");
+        // 通过环境变量 SERVER_ADDRESS=192.168.0.10 指定
+        String serverAddress = System.getenv("SERVER_ADDRESS");
+        if (!ipv4AddressSet.contains(serverAddress)) {
+            log.error("没有通过环境变量 SERVER_ADDRESS 指定地址, 或是指定的地址不是机器地址, 结束 MessageService 进程...");
+            System.exit(SpringApplication.exit(applicationContext));
+        }
+
         log.info("MessageService 启动...");
     }
 
@@ -22,4 +45,28 @@ public class SpringLifecycle implements ApplicationRunner, DisposableBean {
     public void destroy() {
         log.info("MessageService 停止...");
     }
+
+    public Set<String> getAddress() {
+        Set<String> ipv4AddressSet = new HashSet<>();
+        try {
+            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            // 遍历主机的网络接口
+            while (interfaces.hasMoreElements()) {
+                NetworkInterface iface = interfaces.nextElement();
+                String ifaceName = iface.getName();
+                Enumeration<InetAddress> inetAddrs = iface.getInetAddresses();
+                while (inetAddrs.hasMoreElements()) {
+                    InetAddress address = inetAddrs.nextElement();
+                    if (!address.isLoopbackAddress()) {
+                        if (address instanceof Inet4Address) {
+                            ipv4AddressSet.add(address.getHostAddress());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return ipv4AddressSet;
+    }
 }

+ 24 - 11
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/listener/RabbitListeners.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java

@@ -1,10 +1,10 @@
-package cn.reghao.tnb.message.app.rabbit.listener;
+package cn.reghao.tnb.message.app.rabbit;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.message.api.dto.msg.BaseMessage;
 import cn.reghao.tnb.message.app.service.MessageConsumer;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.media.MediaConnection;
+import com.google.gson.JsonObject;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.ExchangeTypes;
@@ -22,11 +22,11 @@ import java.io.IOException;
 @Slf4j
 @Component
 public class RabbitListeners {
-    private final WsConnection wsConnection;
+    private final MediaConnection mediaConnection;
     private final MessageConsumer messageConsumer;
 
-    public RabbitListeners(WsConnection wsConnection, MessageConsumer messageConsumer) {
-        this.wsConnection = wsConnection;
+    public RabbitListeners(MediaConnection mediaConnection, MessageConsumer messageConsumer) {
+        this.mediaConnection = mediaConnection;
         this.messageConsumer = messageConsumer;
     }
 
@@ -36,8 +36,22 @@ public class RabbitListeners {
             // 交换机的名字必须和生产者保持一致, fanout 是以广播模式(发布订阅模式)
             exchange = @Exchange(value = "amq.fanout", type = ExchangeTypes.FANOUT))
     )
-    public void chatMessageConsumer(@Payload String msg) {
-        ChatPayload chatPayload = JsonConverter.jsonToObject(msg, ChatPayload.class);
+    public void consumeWsMessage(@Payload String msg) {
+        String serverAddress = System.getenv("SERVER_ADDRESS");
+        log.info("WsMessage -> {}", msg);
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(msg).getAsJsonObject();
+        String broadcastBy = jsonObject.get("broadcastBy").getAsString();
+        if (broadcastBy.equals(serverAddress)) {
+            log.info("broadcast by myself, ignore message...");
+            return;
+        }
+
+        String type = jsonObject.get("type").getAsString();
+        if ("sendViewCount".equals(type)) {
+            String videoId = jsonObject.get("videoId").getAsString();
+            mediaConnection.sendViewCount(videoId);
+        }
+        /*ChatPayload chatPayload = JsonConverter.jsonToObject(msg, ChatPayload.class);
         long receiverId = chatPayload.getReceiverId();
         String content = chatPayload.getContent();
         if (wsConnection.getUserSession(receiverId) != null) {
@@ -49,7 +63,7 @@ public class RabbitListeners {
             }
         } else {
             log.info("{} 的 session 不在本节点上", receiverId);
-        }
+        }*/
     }
 
     @RabbitListener(bindings =@QueueBinding(
@@ -58,8 +72,7 @@ public class RabbitListeners {
             exchange = @Exchange(value = "amq.direct")),
             ackMode = "MANUAL"
     )
-    @RabbitHandler
-    public void consumeMessage(@Payload BaseMessage baseMessage, Channel channel, Message message) {
+    public void consumeNotifyMessage(@Payload BaseMessage baseMessage, Channel channel, Message message) {
         messageConsumer.sendMessage(baseMessage);
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         try {

+ 3 - 4
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitProducer.java

@@ -17,10 +17,9 @@ public class RabbitProducer {
         this.rabbitTemplate = rabbitTemplate;
     }
 
-    public void sendChatMessage(String msg) {
-        String exchange = exchange = "amq.fanout";
-        String routingKey = "tnb.message.chat.queue";
-        routingKey = "";
+    public void broadcastWsMessage(String msg) {
+        String exchange = "amq.fanout";
+        String routingKey = "";
         //如果交换机名称写错了。那么confirmCallback 回调失败的。
         //如果设置routingkey 是一个错误,那么returncallback 会进行回调调用
         rabbitTemplate.convertAndSend(exchange, routingKey, msg);

+ 1 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitConfig.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RabbitConfig.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.rabbit;
+package cn.reghao.tnb.message.app.rabbit.c;
 
 /**
  * @author reghao

+ 0 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RabbitProducers.java

@@ -1,7 +1,6 @@
 package cn.reghao.tnb.message.app.rabbit.c;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.message.app.rabbit.RabbitConfig;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;

+ 0 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/c/RpcListener.java

@@ -1,6 +1,5 @@
 package cn.reghao.tnb.message.app.rabbit.c;
 
-import cn.reghao.tnb.message.app.rabbit.RabbitConfig;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;

+ 1 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListenerConfig.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/listener/RabbitListenerConfig.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.rabbit;
+package cn.reghao.tnb.message.app.rabbit.listener;
 
 import cn.reghao.tnb.message.app.rabbit.listener.WsMessageListener;
 import org.springframework.amqp.core.MessageListener;

+ 32 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/RedisConfig.java

@@ -0,0 +1,32 @@
+package cn.reghao.tnb.message.app.redis;
+
+import org.springframework.cache.annotation.CachingConfigurerSupport;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * Redis 连接和序列化配置
+ *
+ * @author reghao
+ * @date 2021-11-15 14:40:57
+ */
+@Configuration
+public class RedisConfig extends CachingConfigurerSupport {
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(factory);
+
+        template.setKeySerializer(new StringRedisSerializer());
+        //template.setValueSerializer(RedisSerializer.string());
+        //template.setValueSerializer(RedisSerializer.java());
+
+        template.setHashKeySerializer(RedisSerializer.string());
+        template.setValueSerializer(RedisSerializer.java());
+        return template;
+    }
+}

+ 15 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/RedisKeys.java

@@ -0,0 +1,15 @@
+package cn.reghao.tnb.message.app.redis;
+
+/**
+ * @author reghao
+ * @date 2023-02-03 15:57:06
+ */
+public class RedisKeys {
+    public static String getMediaProgressKey(String videoId) {
+        return String.format("tnb:message:progress:%s", videoId);
+    }
+
+    public static String getVideoUsersKey(String videoId) {
+        return String.format("tnb:message:progress:%s", videoId);
+    }
+}

+ 62 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisHash.java

@@ -0,0 +1,62 @@
+package cn.reghao.tnb.message.app.redis.ds;
+
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author reghao
+ * @date 2021-03-07 17:44:23
+ */
+@Component
+public class RedisHash<T> {
+    private final HashOperations<String, String, T> hashOps;
+
+    public RedisHash(RedisTemplate<String, Object> redisTemplate) {
+        this.hashOps = redisTemplate.opsForHash();
+    }
+
+    public void set(String key, String hashKey, T hashValue) {
+        hashOps.put(key, hashKey, hashValue);
+    }
+
+    public T get(String key, String hashKey) {
+        return hashOps.get(key, hashKey);
+    }
+
+    public List<T> multiGet(String key, Set<String> hashKeys) {
+        return hashOps.multiGet(key, hashKeys);
+    }
+
+    public Map<String, T> entries(String key) {
+        return hashOps.entries(key);
+    }
+
+    public void delete(String key, String hashKey) {
+        hashOps.delete(key, hashKey);
+    }
+
+    public void hset(String key, String hashKey, T hashValue) {
+        hashOps.put(key, hashKey, hashValue);
+    }
+
+    public void hincrby(String key, String hashKey, int value) {
+        hashOps.increment(key, hashKey, value);
+    }
+
+    public Long hlen(String key) {
+        return hashOps.size(key);
+    }
+
+    public void hdel(String key, String hashKey) {
+        hashOps.delete(key, hashKey);
+    }
+
+    public Map<String, T> hgetall(String key) {
+        return hashOps.entries(key);
+    }
+}

+ 52 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisOps.java

@@ -0,0 +1,52 @@
+package cn.reghao.tnb.message.app.redis.ds;
+
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2021-03-07 17:44:23
+ */
+@Component
+public class RedisOps {
+    private final RedisOperations<String, String> redisOps;
+
+    public RedisOps(RedisTemplate<String, String> redisTemplate) {
+        this.redisOps = redisTemplate;
+    }
+
+    public Long eval(String lua, String[] keys, Object... values) {
+        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(lua, Long.class);
+        return redisOps.execute(redisScript, Arrays.asList(keys), values);
+    }
+
+    public boolean expire(String key, long timeout) {
+        Boolean ret = redisOps.expire(key, timeout, TimeUnit.SECONDS);
+        return ret != null && ret;
+    }
+
+    public long ttl(String key) {
+        Long ttl = redisOps.getExpire(key);
+        return ttl != null ? ttl : 0;
+    }
+
+    public boolean exists(String key) {
+        Boolean ret = redisOps.hasKey(key);
+        return ret != null && ret;
+    }
+
+    public Set<String> keys(String pattern) {
+        return redisOps.keys(pattern);
+    }
+
+    public void del(String... key) {
+        Long ret = redisOps.delete(List.of(key));
+    }
+}

+ 46 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisString.java

@@ -0,0 +1,46 @@
+package cn.reghao.tnb.message.app.redis.ds;
+
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ValueOperations;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2021-03-07 17:44:23
+ */
+@Component
+public class RedisString {
+    private final ValueOperations<String, String> stringOps;
+
+    public RedisString(RedisTemplate<String, String> redisTemplate) {
+        this.stringOps = redisTemplate.opsForValue();
+    }
+
+    public void set(String key, String value) {
+        stringOps.set(key, value);
+    }
+
+    public void setWithTimeout(String key, String value, long timeout) {
+        stringOps.set(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    public Boolean setIfAbsent(String key, String value, long timeout) {
+        return stringOps.setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    public String get(String key) {
+        return stringOps.get(key);
+    }
+
+    public long incr(String key) {
+        Long ret = stringOps.increment(key);
+        return ret != null ? ret : 0;
+    }
+
+    public long decr(String key) {
+        Long ret = stringOps.decrement(key);
+        return ret != null ? ret : 0;
+    }
+}

+ 36 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/ds/RedisStringObject.java

@@ -0,0 +1,36 @@
+package cn.reghao.tnb.message.app.redis.ds;
+
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ValueOperations;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2021-03-07 17:44:23
+ */
+@Component
+public class RedisStringObject {
+    private final ValueOperations<String, Object> stringOps;
+
+    public RedisStringObject(RedisTemplate<String, Object> redisTemplate) {
+        this.stringOps = redisTemplate.opsForValue();
+    }
+
+    public void set(String key, Object value) {
+        stringOps.set(key, value);
+    }
+
+    public void setWithTimeout(String key, Object value, long timeout) {
+        stringOps.set(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    public Boolean setIfAbsent(String key, Object value, long timeout) {
+        return stringOps.setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    public Object get(String key) {
+        return stringOps.get(key);
+    }
+}

+ 0 - 13
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/Keys.java

@@ -1,13 +0,0 @@
-package cn.reghao.tnb.message.app.ws;
-
-/**
- * @author reghao
- * @date 2021-07-07 13:45:22
- */
-public class Keys {
-    public static final String USER_UUID = "user-uuid";
-    // SSH 连接操作
-    public static final String OPS_CONNECT = "connect";
-    // SSH 命令操作
-    public static final String OPS_COMMAND = "command";
-}

+ 16 - 31
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WsConnection.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatConnection.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.ws;
+package cn.reghao.tnb.message.app.ws.chat;
 
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
@@ -10,10 +10,9 @@ import cn.reghao.tnb.message.api.dto.event.EventMsg;
 import cn.reghao.tnb.message.api.model.EventType;
 import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
 import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
-import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
-import cn.reghao.tnb.message.app.ws.msg.EventReq;
-import cn.reghao.tnb.message.app.ws.msg.EventResp;
-import cn.reghao.tnb.message.app.ws.msg.PingPayload;
+import cn.reghao.tnb.message.app.ws.chat.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.chat.msg.EventResp;
+import cn.reghao.tnb.message.app.ws.config.PingPayload;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Component;
@@ -28,38 +27,37 @@ import java.util.Map;
 
 /**
  * @author reghao
- * @date 2022-09-01 10:44:06
+ * @date 2025-10-11 20:45:24
  */
 @Slf4j
 @Component
-public class WsConnection {
+public class ChatConnection {
     @DubboReference(check = false, retries = 0, timeout = 60_000)
     private AccountQuery accountQuery;
 
     private final RabbitProducer rabbitProducer;
     // userId -> session
-    private final Map<String, WebSocketSession> userSessions = new HashMap<>();
+    private final Map<Long, WebSocketSession> userSessions = new HashMap<>();
     // sessionId -> userId
-    private final Map<String, String> sessionUsers = new HashMap<>();
+    private final Map<String, Long> sessionUsers = new HashMap<>();
 
-    public WsConnection(RabbitProducer rabbitProducer) {
+    public ChatConnection(RabbitProducer rabbitProducer) {
         this.rabbitProducer = rabbitProducer;
     }
 
     public boolean addSession(WebSocketSession webSocketSession) {
-        String query = webSocketSession.getUri().getQuery();
-        String jwtToken = query.replace("token=", "");
-        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), jwtToken);
+        String userToken = (String) webSocketSession.getAttributes().get("userToken");
+        //String videoId = (String) webSocketSession.getAttributes().get("videoId");
+        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), userToken);
         if (authedAccount != null) {
             long userId = authedAccount.getUserId();
-            addUserSession(userId+"", webSocketSession);
+            addUserSession(userId, webSocketSession);
             return true;
         }
-
         return false;
     }
 
-    public void addUserSession(String userId, WebSocketSession webSocketSession) {
+    public void addUserSession(long userId, WebSocketSession webSocketSession) {
         String sessionId = webSocketSession.getId();
         userSessions.put(userId, webSocketSession);
         sessionUsers.put(sessionId, userId);
@@ -67,12 +65,12 @@ public class WsConnection {
 
     public void removeUserSession(WebSocketSession webSocketSession) {
         String sessionId = webSocketSession.getId();
-        String userId = sessionUsers.get(sessionId);
+        Long userId = sessionUsers.get(sessionId);
         userSessions.remove(userId);
         sessionUsers.remove(userId);
     }
 
-    public String getUserId(String sessionId) {
+    public Long getUserId(String sessionId) {
         return sessionUsers.get(sessionId);
     }
 
@@ -142,15 +140,6 @@ public class WsConnection {
         }
     }
 
-    public void sendText(String receiverId, String message) throws IOException {
-        WebSocketSession session = userSessions.get(receiverId);
-        if (session != null) {
-            String json = JsonConverter.objectToJson(message);
-            TextMessage textMessage = new TextMessage(json);
-            session.sendMessage(textMessage);
-        }
-    }
-
     public void sendChatMessage(String receiverId, ChatPayload chatPayload) throws IOException {
         WebSocketSession session = userSessions.get(receiverId);
         if (session == null) {
@@ -188,8 +177,4 @@ public class WsConnection {
             }
         }
     }
-
-    public void clusterBroadcast(String payload) {
-        rabbitProducer.sendChatMessage(payload);
-    }
 }

+ 13 - 15
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ChatHandler.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatHandler.java

@@ -1,11 +1,9 @@
-package cn.reghao.tnb.message.app.ws.handler;
+package cn.reghao.tnb.message.app.ws.chat;
 
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.message.api.model.EventType;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import cn.reghao.tnb.message.app.ws.msg.EventReq;
-import cn.reghao.tnb.message.app.ws.service.ChatService;
+import cn.reghao.tnb.message.app.ws.chat.msg.EventReq;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
@@ -19,22 +17,22 @@ import java.io.IOException;
 @Slf4j
 @Component
 public class ChatHandler implements WebSocketHandler {
-    private final WsConnection wsConnection;
+    private final ChatConnection chatConnection;
     private final ChatService chatService;
 
-    public ChatHandler(WsConnection wsConnection, ChatService chatService) {
-        this.wsConnection = wsConnection;
+    public ChatHandler(ChatConnection chatConnection, ChatService chatService) {
+        this.chatConnection = chatConnection;
         this.chatService = chatService;
     }
 
     @Override
     public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        boolean ret = wsConnection.addSession(webSocketSession);
+        boolean ret = chatConnection.addSession(webSocketSession);
         if (ret) {
             log.info("WebSocket 建立连接");
-            wsConnection.sendPingPayload(webSocketSession);
-            String userId = wsConnection.getUserId(webSocketSession.getId());
-            chatService.onLoginEvent(Long.parseLong(userId), true);
+            chatConnection.sendPingPayload(webSocketSession);
+            Long userId = chatConnection.getUserId(webSocketSession.getId());
+            chatService.onLoginEvent(userId, true);
         } else {
             log.info("没有认证信息, 关闭 WebSocket 连接");
             webSocketSession.close(CloseStatus.NO_STATUS_CODE);
@@ -85,7 +83,7 @@ public class ChatHandler implements WebSocketHandler {
             String event = eventReq.getEvent();
             switch (EventType.valueOf(event)) {
                 case heartbeat:
-                    wsConnection.sendHeartbeatPong(webSocketSession);
+                    chatConnection.sendHeartbeatPong(webSocketSession);
                     break;
                 case event_contact_apply:
                     log.info("contact_apply event");
@@ -113,8 +111,8 @@ public class ChatHandler implements WebSocketHandler {
     }
 
     private void handleOnClose(WebSocketSession webSocketSession) {
-        String userId = wsConnection.getUserId(webSocketSession.getId());
-        chatService.onLoginEvent(Long.parseLong(userId), false);
-        wsConnection.removeUserSession(webSocketSession);
+        Long userId = chatConnection.getUserId(webSocketSession.getId());
+        chatService.onLoginEvent(userId, false);
+        chatConnection.removeUserSession(webSocketSession);
     }
 }

+ 9 - 11
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ChatService.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/ChatService.java

@@ -1,12 +1,11 @@
-package cn.reghao.tnb.message.app.ws.service;
+package cn.reghao.tnb.message.app.ws.chat;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.api.model.EventType;
 import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
 import cn.reghao.tnb.message.api.model.resp.EvtLoginResp;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import cn.reghao.tnb.message.app.ws.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.chat.msg.ChatPayload;
 import cn.reghao.tnb.user.api.iface.UserContactService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
@@ -29,10 +28,10 @@ public class ChatService {
     @DubboReference(check = false, retries = 0)
     private UserContactService userContactService;
 
-    private final WsConnection wsConnection;
+    private final ChatConnection chatConnection;
 
-    public ChatService(WsConnection wsConnection) {
-        this.wsConnection = wsConnection;
+    public ChatService(ChatConnection chatConnection) {
+        this.chatConnection = chatConnection;
     }
 
     public void onLoginEvent(long userId, boolean online) {
@@ -43,7 +42,7 @@ public class ChatService {
         userContactService.setOnline(userId, online);
         List<Long> friendIds = userContactService.getOnlineFriends(userId);
         for (Long friendId : friendIds) {
-            WebSocketSession session = wsConnection.getUserSession(friendId);
+            WebSocketSession session = chatConnection.getUserSession(friendId);
             if (session != null) {
                 try {
                     session.sendMessage(textMessage);
@@ -57,11 +56,10 @@ public class ChatService {
     public void onEventTalk(String payload) throws IOException {
         ChatPayload chatPayload = JsonConverter.jsonToObject(payload, ChatPayload.class);
         long receiverId = chatPayload.getReceiverId();
-        if (wsConnection.getUserSession(receiverId) == null) {
-            wsConnection.clusterBroadcast(payload);
+        if (chatConnection.getUserSession(receiverId) == null) {
+            // wsConnection.broadcastCluster(payload);
             return;
         }
-
-        wsConnection.sendChatMessage(receiverId+"", chatPayload);
+        chatConnection.sendChatMessage(receiverId+"", chatPayload);
     }
 }

+ 1 - 2
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ChatPayload.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/ChatPayload.java

@@ -1,6 +1,5 @@
-package cn.reghao.tnb.message.app.ws.msg;
+package cn.reghao.tnb.message.app.ws.chat.msg;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;

+ 1 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/EventReq.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/EventReq.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.ws.msg;
+package cn.reghao.tnb.message.app.ws.chat.msg;
 
 import lombok.Getter;
 import lombok.Setter;

+ 1 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/EventResp.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/chat/msg/EventResp.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.ws.msg;
+package cn.reghao.tnb.message.app.ws.chat.msg;
 
 import lombok.Getter;
 import lombok.Setter;

+ 1 - 1
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/PingPayload.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/PingPayload.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.ws.msg;
+package cn.reghao.tnb.message.app.ws.config;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;

+ 6 - 6
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WebSocketConfig.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketConfig.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.message.app.ws;
+package cn.reghao.tnb.message.app.ws.config;
 
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Configuration;
@@ -15,24 +15,24 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
 @Configuration
 public class WebSocketConfig implements WebSocketConfigurer {
     private final WebSocketInterceptor webSocketInterceptor;
-    private final WebSocketHandler videoProgressHandler;
+    private final WebSocketHandler progressHandler;
     private final WebSocketHandler chatHandler;
 
     public WebSocketConfig(WebSocketInterceptor webSocketInterceptor,
-                           @Qualifier("progressHandler") WebSocketHandler videoProgressHandler,
+                           @Qualifier("mediaHandler") WebSocketHandler progressHandler,
                            @Qualifier("chatHandler") WebSocketHandler chatHandler) {
         this.webSocketInterceptor = webSocketInterceptor;
-        this.videoProgressHandler = videoProgressHandler;
+        this.progressHandler = progressHandler;
         this.chatHandler = chatHandler;
     }
 
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
-        registry.addHandler(videoProgressHandler, "/ws/progress")
+        registry.addHandler(progressHandler, WebSocketPath.wsMedia)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
 
-        registry.addHandler(chatHandler, "/ws/chat")
+        registry.addHandler(chatHandler, WebSocketPath.wsChat)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
     }

+ 24 - 8
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/WebSocketInterceptor.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketInterceptor.java

@@ -1,5 +1,6 @@
-package cn.reghao.tnb.message.app.ws;
+package cn.reghao.tnb.message.app.ws.config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.server.ServerHttpRequest;
 import org.springframework.http.server.ServerHttpResponse;
 import org.springframework.http.server.ServletServerHttpRequest;
@@ -8,12 +9,14 @@ import org.springframework.stereotype.Component;
 import org.springframework.web.socket.WebSocketHandler;
 import org.springframework.web.socket.server.HandshakeInterceptor;
 
+import javax.servlet.http.HttpServletRequest;
 import java.util.Map;
 
 /**
  * @author reghao
  * @date 2022-03-11 16:42:44
  */
+@Slf4j
 @Component
 public class WebSocketInterceptor implements HandshakeInterceptor {
     @Override
@@ -22,15 +25,28 @@ public class WebSocketInterceptor implements HandshakeInterceptor {
         if (request instanceof ServletServerHttpRequest) {
             ServletServerHttpRequest httpRequest = (ServletServerHttpRequest) request;
             String path = httpRequest.getURI().getPath();
-            if (path.startsWith("/ws")) {
-                String query = httpRequest.getURI().getQuery();
-                String token = query.replace("token=", "");
-                if (!token.isBlank()) {
-                    attributes.put(Keys.USER_UUID, token);
+            if (path.startsWith(WebSocketPath.wsMedia)) {
+                HttpServletRequest httpServletRequest = httpRequest.getServletRequest();
+                String userToken = httpServletRequest.getParameter("userToken");
+                String videoId = httpServletRequest.getParameter("videoId");
+                if (userToken != null && videoId != null) {
+                    attributes.put("userToken", userToken);
+                    attributes.put("videoId", videoId);
                     return true;
                 }
-            } else {
-                return true;
+
+                log.error("connect to {} error", WebSocketPath.wsMedia);
+                return false;
+            } else if (path.startsWith(WebSocketPath.wsChat)) {
+                HttpServletRequest httpServletRequest = httpRequest.getServletRequest();
+                String userToken = httpServletRequest.getParameter("userToken");
+                if (userToken != null) {
+                    attributes.put("userToken", userToken);
+                    return true;
+                }
+
+                log.error("connect to {} error", WebSocketPath.wsChat);
+                return false;
             }
         }
 

+ 10 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/config/WebSocketPath.java

@@ -0,0 +1,10 @@
+package cn.reghao.tnb.message.app.ws.config;
+
+/**
+ * @author reghao
+ * @date 2025-09-26 09:30:11
+ */
+public class WebSocketPath {
+    public static String wsMedia = "/ws/media";
+    public static String wsChat = "/ws/chat";
+}

+ 201 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaConnection.java

@@ -0,0 +1,201 @@
+package cn.reghao.tnb.message.app.ws.media;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.account.api.constant.TokenType;
+import cn.reghao.tnb.account.api.dto.AuthedAccount;
+import cn.reghao.tnb.account.api.iface.AccountQuery;
+import cn.reghao.tnb.message.api.dto.PushMsg;
+import cn.reghao.tnb.message.api.dto.event.EventMsg;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.redis.RedisKeys;
+import cn.reghao.tnb.message.app.redis.ds.RedisHash;
+import cn.reghao.tnb.message.app.redis.ds.RedisString;
+import cn.reghao.tnb.message.app.ws.chat.msg.ChatPayload;
+import cn.reghao.tnb.message.app.ws.chat.msg.EventResp;
+import cn.reghao.tnb.message.app.ws.config.PingPayload;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.BinaryMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+
+/**
+ * @author reghao
+ * @date 2022-09-01 10:44:06
+ */
+@Slf4j
+@Component
+public class MediaConnection {
+    @DubboReference(check = false, retries = 0, timeout = 60_000)
+    private AccountQuery accountQuery;
+
+    private final RabbitProducer rabbitProducer;
+    private RedisString redisString;
+    private RedisHash<HashSet<Long>> redisHash;
+    // userId -> session
+    private final Map<Long, WebSocketSession> userSessions = new HashMap<>();
+    // sessionId -> userId
+    private final Map<String, Long> sessionUsers = new HashMap<>();
+
+    public MediaConnection(RabbitProducer rabbitProducer, RedisString redisString, RedisHash<HashSet<Long>> redisHash) {
+        this.rabbitProducer = rabbitProducer;
+        this.redisString = redisString;
+        this.redisHash = redisHash;
+    }
+
+    public boolean addSession(WebSocketSession webSocketSession) {
+        String userToken = (String) webSocketSession.getAttributes().get("userToken");
+        String videoId = (String) webSocketSession.getAttributes().get("videoId");
+        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), userToken);
+        if (authedAccount != null) {
+            long userId = authedAccount.getUserId();
+            String redisKey = RedisKeys.getVideoUsersKey(videoId);
+            HashSet<Long> userIds = redisHash.get(redisKey, videoId);
+            if (userIds == null) {
+                userIds = new HashSet<>();
+                userIds.add(userId);
+            } else {
+                userIds.add(userId);
+            }
+            redisHash.hset(redisKey, videoId, userIds);
+
+            addUserSession(userId, webSocketSession);
+            sendViewCount(videoId);
+
+            Map<String, Object> wsMessage = new HashMap<>();
+            String serverAddress = System.getenv("SERVER_ADDRESS");
+            wsMessage.put("broadcastBy", serverAddress);
+            wsMessage.put("type", "sendViewCount");
+            wsMessage.put("videoId", videoId);
+            broadcastCluster(JsonConverter.objectToJson(wsMessage));
+            return true;
+        }
+        return false;
+    }
+
+    public void addUserSession(long userId, WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        userSessions.put(userId, webSocketSession);
+        sessionUsers.put(sessionId, userId);
+    }
+
+    public void removeUserSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        Long userId = sessionUsers.get(sessionId);
+        userSessions.remove(userId);
+        sessionUsers.remove(userId);
+    }
+
+    public Long getUserId(String sessionId) {
+        return sessionUsers.get(sessionId);
+    }
+
+    public WebSocketSession getUserSession(long userId) {
+        return userSessions.get(userId+"");
+    }
+
+    public void sendPingPayload(WebSocketSession session) throws IOException {
+        PingPayload pingPayload = new PingPayload(60, 120);
+        EventResp eventResp = new EventResp();
+        eventResp.setEvent("connect");
+        eventResp.setPayload(pingPayload);
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventResp));
+        session.sendMessage(textMessage);
+    }
+
+    public void sendHeartbeatPong(WebSocketSession session) throws IOException {
+        EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+        session.sendMessage(textMessage);
+    }
+
+    public void sendTextMsg(PushMsg pushMsg) throws IOException {
+        String userId = pushMsg.getReceiverId();
+        EventMsg eventMsg = pushMsg.getEventMsg();
+
+        WebSocketSession webSocketSession = userSessions.get(userId);
+        if (webSocketSession == null) {
+            log.error("{} 不在线", userId);
+            return;
+        }
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventMsg));
+        webSocketSession.sendMessage(textMessage);
+    }
+
+    public void send(String receiverId, String content) throws IOException {
+        WebSocketSession webSocketSession = userSessions.get(receiverId);
+        if (webSocketSession == null) {
+            log.error("{} 不在线", receiverId);
+            return;
+        }
+
+        EventMessageResp<String> resp = new EventMessageResp<>(EventType.event_content, content);
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+        webSocketSession.sendMessage(textMessage);
+    }
+
+    public void send1(String receiverId, Object object) throws IOException {
+        WebSocketSession webSocketSession = userSessions.get(receiverId);
+        if (webSocketSession == null) {
+            log.error("{} 不在线", receiverId);
+            return;
+        }
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(object));
+        webSocketSession.sendMessage(textMessage);
+    }
+
+    public void send(String receiverId, Object message) throws IOException {
+        WebSocketSession session = userSessions.get(receiverId);
+        if (session != null) {
+            byte[] bytes = JdkSerializer.serialize(message);
+            BinaryMessage binaryMessage = new BinaryMessage(bytes);
+            session.sendMessage(binaryMessage);
+        }
+    }
+
+    public void sendViewCount(String videoId) {
+        String redisKey = RedisKeys.getVideoUsersKey(videoId);
+        Set<Long> userIds = redisHash.get(redisKey, videoId);
+        if (userIds == null) {
+            return;
+        }
+
+        int viewCount = userIds.size();
+        Map<String, Object> message = new HashMap<>();
+        message.put("viewCount", viewCount);
+        userIds.forEach(userId -> {
+            WebSocketSession session = userSessions.get(userId);
+            if (session != null) {
+                String json = JsonConverter.objectToJson(message);
+                TextMessage textMessage = new TextMessage(json);
+                try {
+                    session.sendMessage(textMessage);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /**
+     * 集群中广播
+     *
+     * @param
+     * @return
+     * @date 2025-10-11 20:10:127
+     */
+    public void broadcastCluster(String payload) {
+        rabbitProducer.broadcastWsMessage(payload);
+    }
+}

+ 22 - 22
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/handler/ProgressHandler.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaHandler.java

@@ -1,11 +1,9 @@
-package cn.reghao.tnb.message.app.ws.handler;
+package cn.reghao.tnb.message.app.ws.media;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.message.api.dto.MediaProgress;
-import cn.reghao.tnb.message.api.model.EventType;
-import cn.reghao.tnb.message.app.ws.WsConnection;
-import cn.reghao.tnb.message.app.ws.msg.EventReq;
-import cn.reghao.tnb.message.app.ws.service.ProgressService;
+import cn.reghao.tnb.message.app.ws.media.msg.MediaEvent;
+import cn.reghao.tnb.message.app.ws.media.msg.MediaEventType;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
@@ -20,18 +18,18 @@ import java.io.IOException;
  */
 @Slf4j
 @Component
-public class ProgressHandler implements WebSocketHandler {
-    private final WsConnection wsConnection;
-    private final ProgressService progressService;
+public class MediaHandler implements WebSocketHandler {
+    private final MediaConnection mediaConnection;
+    private final MediaService mediaService;
 
-    public ProgressHandler(WsConnection wsConnection, ProgressService progressService) {
-        this.wsConnection = wsConnection;
-        this.progressService = progressService;
+    public MediaHandler(MediaConnection mediaConnection, MediaService mediaService) {
+        this.mediaConnection = mediaConnection;
+        this.mediaService = mediaService;
     }
 
     @Override
     public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        boolean ret = wsConnection.addSession(webSocketSession);
+        boolean ret = mediaConnection.addSession(webSocketSession);
         if (ret) {
             log.info("WebSocket 建立连接");
         } else {
@@ -77,16 +75,18 @@ public class ProgressHandler implements WebSocketHandler {
         String sessionId = webSocketSession.getId();
         String payload = textMessage.getPayload();
         try {
-            EventReq eventReq = JsonConverter.jsonToObject(payload, EventReq.class);
-            String event = eventReq.getEvent();
-            switch (EventType.valueOf(event)) {
+            MediaEvent mediaEvent = JsonConverter.jsonToObject(payload, MediaEvent.class);
+            String type = mediaEvent.getType();
+            MediaEventType mediaEventType = MediaEventType.valueOf(type);
+            switch (mediaEventType) {
                 case heartbeat:
-                    wsConnection.sendHeartbeatPong(webSocketSession);
+                    mediaConnection.sendHeartbeatPong(webSocketSession);
                     break;
-                case media_progress:
-                    String jsonData = eventReq.getData();
-                    MediaProgress mediaProgress = JsonConverter.jsonToObject(jsonData, MediaProgress.class);
-                    progressService.putMediaProgress(sessionId, mediaProgress);
+                case progress:
+                    String jsonStr = mediaEvent.getData().toString();
+                    MediaProgress mediaProgress = JsonConverter.jsonToObject(jsonStr, MediaProgress.class);
+                    String videoId = mediaProgress.getMediaId();
+                    mediaService.putMediaProgress(sessionId, mediaProgress);
                     break;
                 default:
             }
@@ -96,7 +96,7 @@ public class ProgressHandler implements WebSocketHandler {
     }
 
     private void handleOnClose(WebSocketSession webSocketSession) {
-        progressService.removeMediaProgress(webSocketSession.getId());
-        wsConnection.removeUserSession(webSocketSession);
+        mediaService.removeMediaProgress(webSocketSession.getId());
+        mediaConnection.removeUserSession(webSocketSession);
     }
 }

+ 6 - 7
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/service/ProgressService.java → message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaService.java

@@ -1,9 +1,8 @@
-package cn.reghao.tnb.message.app.ws.service;
+package cn.reghao.tnb.message.app.ws.media;
 
 import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.api.dto.MediaProgress;
 import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
-import cn.reghao.tnb.message.app.ws.WsConnection;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.springframework.stereotype.Service;
 
@@ -15,16 +14,16 @@ import java.util.Map;
  * @date 2024-12-02 11:36:31
  */
 @Service
-public class ProgressService {
+public class MediaService {
     @DubboReference(check = false)
     private AccountQuery accountQuery;
 
     private final Map<String, MediaProgress> map = new HashMap<>();
-    private final WsConnection wsConnection;
+    private final MediaConnection mediaConnection;
     private final RabbitProducer rabbitProducer;
 
-    public ProgressService(WsConnection wsConnection, RabbitProducer rabbitProducer) {
-        this.wsConnection = wsConnection;
+    public MediaService(MediaConnection mediaConnection, RabbitProducer rabbitProducer) {
+        this.mediaConnection = mediaConnection;
         this.rabbitProducer = rabbitProducer;
     }
 
@@ -43,7 +42,7 @@ public class ProgressService {
     }
 
     public void persistProgress(String sessionId, MediaProgress mediaProgress) {
-        long userId = Long.parseLong(wsConnection.getUserId(sessionId));
+        Long userId = mediaConnection.getUserId(sessionId);
         if (mediaProgress != null) {
             mediaProgress.setUserId(userId);
             rabbitProducer.sendMediaProgress(mediaProgress);

+ 22 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/msg/MediaEvent.java

@@ -0,0 +1,22 @@
+package cn.reghao.tnb.message.app.ws.media.msg;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2024-12-02 10:16:51
+ */
+@Setter
+@Getter
+public class MediaEvent {
+    private String type;
+    private String direction;
+    private Object data;
+
+    /*public MediaEvent() {
+        this.type = "progress";
+        this.direction = "c2s";
+        this.data = "";
+    }*/
+}

+ 9 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/msg/MediaEventType.java

@@ -0,0 +1,9 @@
+package cn.reghao.tnb.message.app.ws.media.msg;
+
+/**
+ * @author reghao
+ * @date 2025-10-11 21:27:54
+ */
+public enum MediaEventType {
+    heartbeat, progress, view
+}

+ 0 - 8
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/msg/ProgressEvent.java

@@ -1,8 +0,0 @@
-package cn.reghao.tnb.message.app.ws.msg;
-
-/**
- * @author reghao
- * @date 2024-12-02 10:16:51
- */
-public class ProgressEvent {
-}

+ 5 - 0
message/message-service/src/main/resources/application-test.yml

@@ -6,6 +6,11 @@ spring:
   cloud:
     discovery:
       enabled: true
+  redis:
+    database: 0
+    host: 192.168.0.209
+    port: 6379
+    password: Test@123456
   rabbitmq:
     host: 192.168.0.209
     port: 5672