Przeglądaj źródła

更新 message-service 模块的 ws/media 包

reghao 5 miesięcy temu
rodzic
commit
5d4621932b

+ 14 - 2
message/message-service/src/main/java/cn/reghao/tnb/message/app/config/SpringLifecycle.java

@@ -20,7 +20,8 @@ import java.util.Set;
 @Slf4j
 @Component
 public class SpringLifecycle implements ApplicationRunner, DisposableBean {
-    private WebApplicationContext applicationContext;
+    private final WebApplicationContext applicationContext;
+    private String nodeId = "";
 
     public SpringLifecycle(WebApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
@@ -38,6 +39,13 @@ public class SpringLifecycle implements ApplicationRunner, DisposableBean {
             System.exit(SpringApplication.exit(applicationContext));
         }
 
+        int serverPort = 0;
+        String serverPortStr = applicationContext.getEnvironment().getProperty("server.port");
+        if (serverPortStr != null) {
+            serverPort = Integer.parseInt(serverPortStr);
+        }
+
+        this.nodeId = String.format("%s_%s", serverAddress.replace(".", "_"), serverPort);
         log.info("MessageService 启动...");
     }
 
@@ -46,7 +54,7 @@ public class SpringLifecycle implements ApplicationRunner, DisposableBean {
         log.info("MessageService 停止...");
     }
 
-    public Set<String> getAddress() {
+    private Set<String> getAddress() {
         Set<String> ipv4AddressSet = new HashSet<>();
         try {
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
@@ -69,4 +77,8 @@ public class SpringLifecycle implements ApplicationRunner, DisposableBean {
         }
         return ipv4AddressSet;
     }
+
+    public String getNodeId() {
+        return this.nodeId;
+    }
 }

+ 11 - 7
message/message-service/src/main/java/cn/reghao/tnb/message/app/rabbit/RabbitListeners.java

@@ -2,8 +2,9 @@ package cn.reghao.tnb.message.app.rabbit;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.tnb.message.api.dto.msg.BaseMessage;
+import cn.reghao.tnb.message.app.config.SpringLifecycle;
 import cn.reghao.tnb.message.app.service.MessageConsumer;
-import cn.reghao.tnb.message.app.ws.media.MediaConnection;
+import cn.reghao.tnb.message.app.ws.media.MediaService;
 import com.google.gson.JsonObject;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
@@ -22,12 +23,15 @@ import java.io.IOException;
 @Slf4j
 @Component
 public class RabbitListeners {
-    private final MediaConnection mediaConnection;
+    private final MediaService mediaService;
     private final MessageConsumer messageConsumer;
+    private final SpringLifecycle springLifecycle;
 
-    public RabbitListeners(MediaConnection mediaConnection, MessageConsumer messageConsumer) {
-        this.mediaConnection = mediaConnection;
+    public RabbitListeners(MediaService mediaService, MessageConsumer messageConsumer,
+                           SpringLifecycle springLifecycle) {
+        this.mediaService = mediaService;
         this.messageConsumer = messageConsumer;
+        this.springLifecycle = springLifecycle;
     }
 
     @RabbitListener(bindings =@QueueBinding(
@@ -37,11 +41,11 @@ public class RabbitListeners {
             exchange = @Exchange(value = "amq.fanout", type = ExchangeTypes.FANOUT))
     )
     public void consumeWsMessage(@Payload String msg) {
-        String serverAddress = System.getenv("SERVER_ADDRESS");
+        String nodeId = springLifecycle.getNodeId();
         log.info("WsMessage -> {}", msg);
         JsonObject jsonObject = JsonConverter.jsonToJsonElement(msg).getAsJsonObject();
         String broadcastBy = jsonObject.get("broadcastBy").getAsString();
-        if (broadcastBy.equals(serverAddress)) {
+        if (broadcastBy.equals(nodeId)) {
             log.info("broadcast by myself, ignore message...");
             return;
         }
@@ -49,7 +53,7 @@ public class RabbitListeners {
         String type = jsonObject.get("type").getAsString();
         if ("sendViewCount".equals(type)) {
             String videoId = jsonObject.get("videoId").getAsString();
-            mediaConnection.sendViewCount(videoId);
+            mediaService.sendViewCount(videoId);
         }
         /*ChatPayload chatPayload = JsonConverter.jsonToObject(msg, ChatPayload.class);
         long receiverId = chatPayload.getReceiverId();

+ 6 - 2
message/message-service/src/main/java/cn/reghao/tnb/message/app/redis/RedisKeys.java

@@ -5,7 +5,11 @@ package cn.reghao.tnb.message.app.redis;
  * @date 2023-02-03 15:57:06
  */
 public class RedisKeys {
-    public static String getVideoUsersKey() {
-        return String.format("tnb:message:video_progress");
+    public static String getVideoUsersKey(String videoId) {
+        return String.format("tnb:message:video_progress:%s", videoId);
+    }
+
+    public static String getVideoUsersKey(String videoId, String nodeId) {
+        return String.format("tnb:message:video_progress:%s:%s", videoId, nodeId);
     }
 }

+ 0 - 195
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaConnection.java

@@ -1,195 +0,0 @@
-package cn.reghao.tnb.message.app.ws.media;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.tnb.account.api.constant.TokenType;
-import cn.reghao.tnb.account.api.dto.AuthedAccount;
-import cn.reghao.tnb.account.api.iface.AccountQuery;
-import cn.reghao.tnb.message.api.dto.PushMsg;
-import cn.reghao.tnb.message.api.dto.event.EventMsg;
-import cn.reghao.tnb.message.api.model.EventType;
-import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
-import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
-import cn.reghao.tnb.message.app.redis.RedisKeys;
-import cn.reghao.tnb.message.app.redis.ds.RedisHash;
-import cn.reghao.tnb.message.app.redis.ds.RedisString;
-import cn.reghao.tnb.message.app.ws.chat.msg.ChatPayload;
-import cn.reghao.tnb.message.app.ws.chat.msg.EventResp;
-import cn.reghao.tnb.message.app.ws.config.PingPayload;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.stereotype.Component;
-import org.springframework.web.socket.BinaryMessage;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-
-/**
- * @author reghao
- * @date 2022-09-01 10:44:06
- */
-@Slf4j
-@Component
-public class MediaConnection {
-    @DubboReference(check = false, retries = 0, timeout = 60_000)
-    private AccountQuery accountQuery;
-
-    private final RabbitProducer rabbitProducer;
-    private RedisHash<HashSet<Long>> redisHash;
-    // userId -> session
-    private final Map<Long, WebSocketSession> userSessions = new HashMap<>();
-    // sessionId -> userId
-    private final Map<String, Long> sessionUsers = new HashMap<>();
-
-    public MediaConnection(RabbitProducer rabbitProducer, RedisHash<HashSet<Long>> redisHash) {
-        this.rabbitProducer = rabbitProducer;
-        this.redisHash = redisHash;
-    }
-
-    public boolean addSession(WebSocketSession webSocketSession) {
-        String userToken = (String) webSocketSession.getAttributes().get("userToken");
-        String videoId = (String) webSocketSession.getAttributes().get("videoId");
-        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), userToken);
-        if (authedAccount != null) {
-            String loginId = authedAccount.getLoginId();
-            long userId = authedAccount.getUserId();
-            String redisKey = RedisKeys.getVideoUsersKey();
-            HashSet<Long> userIds = redisHash.get(redisKey, videoId);
-            if (userIds == null) {
-                userIds = new HashSet<>();
-                userIds.add(userId);
-            } else {
-                userIds.add(userId);
-            }
-            redisHash.hset(redisKey, videoId, userIds);
-            sendViewEvent(videoId);
-
-            addUserSession(userId, webSocketSession);
-            return true;
-        }
-        return false;
-    }
-
-    public void addUserSession(long userId, WebSocketSession webSocketSession) {
-        String sessionId = webSocketSession.getId();
-        userSessions.put(userId, webSocketSession);
-        sessionUsers.put(sessionId, userId);
-    }
-
-    public void removeUserSession(WebSocketSession webSocketSession) {
-        String sessionId = webSocketSession.getId();
-        Long userId = sessionUsers.get(sessionId);
-        userSessions.remove(userId);
-        sessionUsers.remove(userId);
-
-        String videoId = (String) webSocketSession.getAttributes().get("videoId");
-        String redisKey = RedisKeys.getVideoUsersKey();
-        HashSet<Long> userIds = redisHash.get(redisKey, videoId);
-        if (userIds != null) {
-            userIds.remove(userId);
-            redisHash.hset(redisKey, videoId, userIds);
-        }
-        sendViewEvent(videoId);
-    }
-
-    public Long getUserId(String sessionId) {
-        return sessionUsers.get(sessionId);
-    }
-
-    public void sendPingPayload(WebSocketSession session) throws IOException {
-        PingPayload pingPayload = new PingPayload(60, 120);
-        EventResp eventResp = new EventResp();
-        eventResp.setEvent("connect");
-        eventResp.setPayload(pingPayload);
-
-        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventResp));
-        session.sendMessage(textMessage);
-    }
-
-    public void sendHeartbeatPong(WebSocketSession session) throws IOException {
-        EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
-        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
-        session.sendMessage(textMessage);
-    }
-
-    public void sendViewEvent(String videoId) {
-        sendViewCount(videoId);
-
-        Map<String, Object> wsMessage = new HashMap<>();
-        String serverAddress = System.getenv("SERVER_ADDRESS");
-        wsMessage.put("broadcastBy", serverAddress);
-        wsMessage.put("type", "sendViewCount");
-        wsMessage.put("videoId", videoId);
-        broadcastCluster(JsonConverter.objectToJson(wsMessage));
-    }
-
-    public void send(String receiverId, String content) throws IOException {
-        WebSocketSession webSocketSession = userSessions.get(receiverId);
-        if (webSocketSession == null) {
-            log.error("{} 不在线", receiverId);
-            return;
-        }
-
-        EventMessageResp<String> resp = new EventMessageResp<>(EventType.event_content, content);
-        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
-        webSocketSession.sendMessage(textMessage);
-    }
-
-    public void send1(String receiverId, Object object) throws IOException {
-        WebSocketSession webSocketSession = userSessions.get(receiverId);
-        if (webSocketSession == null) {
-            log.error("{} 不在线", receiverId);
-            return;
-        }
-
-        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(object));
-        webSocketSession.sendMessage(textMessage);
-    }
-
-    public void send(String receiverId, Object message) throws IOException {
-        WebSocketSession session = userSessions.get(receiverId);
-        if (session != null) {
-            byte[] bytes = JdkSerializer.serialize(message);
-            BinaryMessage binaryMessage = new BinaryMessage(bytes);
-            session.sendMessage(binaryMessage);
-        }
-    }
-
-    public void sendViewCount(String videoId) {
-        String redisKey = RedisKeys.getVideoUsersKey();
-        Set<Long> userIds = redisHash.get(redisKey, videoId);
-        if (userIds == null) {
-            return;
-        }
-
-        int viewCount = userIds.size();
-        Map<String, Object> message = new HashMap<>();
-        message.put("viewCount", viewCount);
-        userIds.forEach(userId -> {
-            WebSocketSession session = userSessions.get(userId);
-            if (session != null) {
-                String json = JsonConverter.objectToJson(message);
-                TextMessage textMessage = new TextMessage(json);
-                try {
-                    session.sendMessage(textMessage);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-    }
-
-    /**
-     * 集群中广播
-     *
-     * @param
-     * @return
-     * @date 2025-10-11 20:10:127
-     */
-    public void broadcastCluster(String payload) {
-        rabbitProducer.broadcastWsMessage(payload);
-    }
-}

+ 5 - 13
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaHandler.java

@@ -19,17 +19,15 @@ import java.io.IOException;
 @Slf4j
 @Component
 public class MediaHandler implements WebSocketHandler {
-    private final MediaConnection mediaConnection;
     private final MediaService mediaService;
 
-    public MediaHandler(MediaConnection mediaConnection, MediaService mediaService) {
-        this.mediaConnection = mediaConnection;
+    public MediaHandler(MediaService mediaService) {
         this.mediaService = mediaService;
     }
 
     @Override
     public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        boolean ret = mediaConnection.addSession(webSocketSession);
+        boolean ret = mediaService.addSession(webSocketSession);
         if (ret) {
             log.info("WebSocket 建立连接");
         } else {
@@ -57,13 +55,13 @@ public class MediaHandler implements WebSocketHandler {
     @Override
     public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
         log.error("WebSocket 数据传输错误: {}", throwable.getMessage());
-        handleOnClose(webSocketSession);
+        mediaService.removeSession(webSocketSession);
     }
 
     @Override
     public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
         log.info("WebSocket 断开连接: {} -> {}", closeStatus.getCode(), closeStatus.getReason());
-        handleOnClose(webSocketSession);
+        mediaService.removeSession(webSocketSession);
     }
 
     @Override
@@ -80,12 +78,11 @@ public class MediaHandler implements WebSocketHandler {
             MediaEventType mediaEventType = MediaEventType.valueOf(type);
             switch (mediaEventType) {
                 case heartbeat:
-                    mediaConnection.sendHeartbeatPong(webSocketSession);
+                    mediaService.sendHeartbeatPong(webSocketSession);
                     break;
                 case progress:
                     String jsonStr = mediaEvent.getData().toString();
                     MediaProgress mediaProgress = JsonConverter.jsonToObject(jsonStr, MediaProgress.class);
-                    String videoId = mediaProgress.getMediaId();
                     mediaService.putMediaProgress(sessionId, mediaProgress);
                     break;
                 default:
@@ -94,9 +91,4 @@ public class MediaHandler implements WebSocketHandler {
             e.printStackTrace();
         }
     }
-
-    private void handleOnClose(WebSocketSession webSocketSession) {
-        mediaService.removeMediaProgress(webSocketSession.getId());
-        mediaConnection.removeUserSession(webSocketSession);
-    }
 }

+ 176 - 13
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/media/MediaService.java

@@ -1,48 +1,211 @@
 package cn.reghao.tnb.message.app.ws.media;
 
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.account.api.constant.TokenType;
+import cn.reghao.tnb.account.api.dto.AuthedAccount;
 import cn.reghao.tnb.account.api.iface.AccountQuery;
 import cn.reghao.tnb.message.api.dto.MediaProgress;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.app.config.SpringLifecycle;
 import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.redis.RedisKeys;
+import cn.reghao.tnb.message.app.redis.ds.RedisHash;
+import cn.reghao.tnb.message.app.redis.ds.RedisOps;
+import cn.reghao.tnb.message.app.ws.chat.msg.EventResp;
+import cn.reghao.tnb.message.app.ws.config.PingPayload;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
-import org.springframework.stereotype.Service;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.BinaryMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
 
+import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * @author reghao
  * @date 2024-12-02 11:36:31
  */
-@Service
+@Slf4j
+@Component
 public class MediaService {
-    @DubboReference(check = false)
+    @DubboReference(check = false, retries = 0, timeout = 60_000)
     private AccountQuery accountQuery;
 
-    private final Map<String, MediaProgress> map = new HashMap<>();
-    private final MediaConnection mediaConnection;
     private final RabbitProducer rabbitProducer;
+    private final RedisHash<Long> redisHash;
+    private final RedisOps redisOps;
+    // userId -> session
+    private final Map<Long, WebSocketSession> userSessions = new HashMap<>();
+    // sessionId -> userId
+    private final Map<String, Long> sessionUsers = new HashMap<>();
+    // sessionId -> mediaProgress
+    private final Map<String, MediaProgress> map = new HashMap<>();
+    private final SpringLifecycle springLifecycle;
 
-    public MediaService(MediaConnection mediaConnection, RabbitProducer rabbitProducer) {
-        this.mediaConnection = mediaConnection;
+    public MediaService(RabbitProducer rabbitProducer, RedisHash<Long> redisHash,
+                        RedisOps redisOps, SpringLifecycle springLifecycle) {
         this.rabbitProducer = rabbitProducer;
+        this.redisHash = redisHash;
+        this.redisOps = redisOps;
+        this.springLifecycle = springLifecycle;
     }
 
-    public void putMediaProgress(String sessionId, MediaProgress mediaProgress) {
-        map.put(sessionId, mediaProgress);
-        if (mediaProgress.getEnded()) {
-            persistProgress(sessionId, mediaProgress);
+    public boolean addSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        String userToken = (String) webSocketSession.getAttributes().get("userToken");
+        String videoId = (String) webSocketSession.getAttributes().get("videoId");
+        AuthedAccount authedAccount = accountQuery.getAuthedAccount(TokenType.token.getValue(), userToken);
+        if (authedAccount != null) {
+            String loginId = authedAccount.getLoginId();
+            long userId = authedAccount.getUserId();
+
+            String nodeId = springLifecycle.getNodeId();
+            String redisKey = RedisKeys.getVideoUsersKey(videoId, nodeId);
+            redisHash.hset(redisKey, sessionId, userId);
+
+            addUserSession(userId, webSocketSession);
+            check(videoId, nodeId);
+            sendViewEvent(videoId);
+            return true;
         }
+        return false;
     }
 
-    public void removeMediaProgress(String sessionId) {
+    private void addUserSession(long userId, WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        userSessions.put(userId, webSocketSession);
+        sessionUsers.put(sessionId, userId);
+    }
+
+    private void check(String videoId, String nodeId) {
+        String keyPattern = RedisKeys.getVideoUsersKey(videoId) + ":*";
+        Set<String> redisKeys = redisOps.keys(keyPattern);
+        for (String redisKey : redisKeys) {
+            if (redisKey.endsWith(nodeId)) {
+                Map<String, Long> redisMap = redisHash.hgetall(redisKey);
+                Set<String> redisSessionSet = redisMap.keySet();
+                Set<String> localSessionSet = sessionUsers.keySet();
+                boolean ret = redisSessionSet.removeAll(localSessionSet);
+                if (ret) {
+                    for (String hashKey : redisSessionSet) {
+                        redisHash.delete(redisKey, hashKey);
+                    }
+                }
+            }
+        }
+    }
+
+    public void removeSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
         MediaProgress mediaProgress = map.remove(sessionId);
         if (mediaProgress != null) {
             persistProgress(sessionId, mediaProgress);
         }
+
+        Long userId = sessionUsers.get(sessionId);
+        userSessions.remove(userId);
+        sessionUsers.remove(sessionId);
+
+        String videoId = (String) webSocketSession.getAttributes().get("videoId");
+        String nodeId = springLifecycle.getNodeId();
+        String redisKey = RedisKeys.getVideoUsersKey(videoId, nodeId);
+        redisHash.delete(redisKey, sessionId);
+        sendViewEvent(videoId);
+    }
+
+    public Long getUserId(String sessionId) {
+        return sessionUsers.get(sessionId);
+    }
+
+    public void sendPingPayload(WebSocketSession session) throws IOException {
+        PingPayload pingPayload = new PingPayload(60, 120);
+        EventResp eventResp = new EventResp();
+        eventResp.setEvent("connect");
+        eventResp.setPayload(pingPayload);
+
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(eventResp));
+        session.sendMessage(textMessage);
+    }
+
+    public void sendHeartbeatPong(WebSocketSession session) throws IOException {
+        EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+        session.sendMessage(textMessage);
+    }
+
+    public void sendViewEvent(String videoId) {
+        sendViewCount(videoId);
+
+        Map<String, Object> wsMessage = new HashMap<>();
+        wsMessage.put("broadcastBy", springLifecycle.getNodeId());
+        wsMessage.put("type", "sendViewCount");
+        wsMessage.put("videoId", videoId);
+        broadcastCluster(JsonConverter.objectToJson(wsMessage));
+    }
+
+    public void send(String receiverId, Object message) throws IOException {
+        WebSocketSession session = userSessions.get(receiverId);
+        if (session != null) {
+            byte[] bytes = JdkSerializer.serialize(message);
+            BinaryMessage binaryMessage = new BinaryMessage(bytes);
+            session.sendMessage(binaryMessage);
+        }
+    }
+
+    public void sendViewCount(String videoId) {
+        String keyPattern = RedisKeys.getVideoUsersKey(videoId) + ":*";
+        Set<String> redisKeys = redisOps.keys(keyPattern);
+        Set<Long> userSet = new HashSet<>();
+        for (String redisKey : redisKeys) {
+            Map<String, Long> redisMap = redisHash.hgetall(redisKey);
+            userSet.addAll(new HashSet<>(redisMap.values()));
+        }
+
+        int viewCount = userSet.size();
+        Map<String, Object> message = new HashMap<>();
+        message.put("viewCount", viewCount);
+        userSet.forEach(userId -> {
+            WebSocketSession session = userSessions.get(userId);
+            if (session != null) {
+                String json = JsonConverter.objectToJson(message);
+                TextMessage textMessage = new TextMessage(json);
+                try {
+                    session.sendMessage(textMessage);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /**
+     * 集群中广播
+     *
+     * @param
+     * @return
+     * @date 2025-10-11 20:10:127
+     */
+    public void broadcastCluster(String payload) {
+        rabbitProducer.broadcastWsMessage(payload);
+    }
+
+    public void putMediaProgress(String sessionId, MediaProgress mediaProgress) {
+        map.put(sessionId, mediaProgress);
+        if (mediaProgress.getEnded()) {
+            persistProgress(sessionId, mediaProgress);
+        }
     }
 
     public void persistProgress(String sessionId, MediaProgress mediaProgress) {
-        Long userId = mediaConnection.getUserId(sessionId);
+        Long userId = getUserId(sessionId);
         if (mediaProgress != null) {
             mediaProgress.setUserId(userId);
             rabbitProducer.sendMediaProgress(mediaProgress);