소스 검색

RecommendService 引入生产者消费者模型产生用户的推荐数据

reghao 1 년 전
부모
커밋
32849840af

+ 12 - 5
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/impl/RecommendServiceImpl.java

@@ -13,9 +13,10 @@ import cn.reghao.tnb.content.app.vod.model.query.VideoQuery;
 import cn.reghao.tnb.content.app.vod.service.ContentPermission;
 import cn.reghao.tnb.content.app.vod.service.RecommendService;
 import cn.reghao.tnb.content.app.vod.service.VideoPostQuery;
+import cn.reghao.tnb.content.app.vod.service.rcmd.RcmdData;
+import cn.reghao.tnb.content.app.vod.service.rcmd.RcmdProducer;
 import cn.reghao.tnb.content.app.vod.service.rcmd.RedisKeys;
 import cn.reghao.tnb.content.app.vod.service.rcmd.UserInterestBased;
-import cn.reghao.tnb.content.app.vod.service.rcmd.task.RcmdTask;
 import cn.reghao.tnb.content.app.util.redis.RedisKey;
 import cn.reghao.tnb.content.app.util.redis.ds.RedisHash;
 import cn.reghao.tnb.content.app.util.redis.ds.RedisSet;
@@ -27,7 +28,6 @@ import org.springframework.stereotype.Service;
 import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
@@ -49,10 +49,12 @@ public class RecommendServiceImpl implements RecommendService {
     private final VideoPostQuery videoPostQuery;
     private final UserInterestBased userInterestBased;
     private final RedisSet redisSet;
+    private final RcmdProducer rcmdProducer;
 
     public RecommendServiceImpl(RedisSortedSet sortedSet, RedisHash<VideoCard> redisHash,
                                 ContentPermission contentPermission, VideoPostMapper videoPostMapper,
-                                VideoPostQuery videoPostQuery, UserInterestBased userInterestBased, RedisSet redisSet) {
+                                VideoPostQuery videoPostQuery, UserInterestBased userInterestBased, RedisSet redisSet,
+                                RcmdProducer rcmdProducer) {
         this.sortedSet = sortedSet;
         this.redisHash = redisHash;
         this.contentPermission = contentPermission;
@@ -60,6 +62,7 @@ public class RecommendServiceImpl implements RecommendService {
         this.videoPostQuery = videoPostQuery;
         this.userInterestBased = userInterestBased;
         this.redisSet = redisSet;
+        this.rcmdProducer = rcmdProducer;
     }
 
     public List<VideoCard> getRecommendVideos(long userId, String nextId) {
@@ -77,8 +80,12 @@ public class RecommendServiceImpl implements RecommendService {
     }
 
     public void putRecommendVideos(long loginUser, int size) {
-        RcmdTask rcmdTask = new RcmdTask(loginUser, videoPostQuery, userInterestBased, redisSet, contentPermission, size);
-        Future<?> future = threadPool.submit(rcmdTask);
+        RcmdData rcmdData = new RcmdData(loginUser, pageSize*3);
+        try {
+            rcmdProducer.put(rcmdData);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
 
     public List<VideoCard> getRecommendVideos1(long userId, String nextId) {

+ 84 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/rcmd/RcmdConsumer.java

@@ -0,0 +1,84 @@
+package cn.reghao.tnb.content.app.vod.service.rcmd;
+
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import cn.reghao.tnb.content.app.util.redis.ds.RedisSet;
+import cn.reghao.tnb.content.app.vod.service.ContentPermission;
+import cn.reghao.tnb.content.app.vod.service.VideoPostQuery;
+import cn.reghao.tnb.content.app.vod.service.rcmd.task.RcmdTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @author reghao
+ * @date 2024-12-22 21:19:57
+ */
+@Slf4j
+@Component
+public class RcmdConsumer {
+    private final int threads = 10;
+    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("data-consumer", threads);
+    private final Object monitor;
+    private final RcmdProducer rcmdProducer;
+    private final VideoPostQuery videoPostQuery;
+    private final UserInterestBased userInterestBased;
+    private final RedisSet redisSet;
+    private final ContentPermission contentPermission;
+
+    public RcmdConsumer(VideoPostQuery videoPostQuery, UserInterestBased userInterestBased, RedisSet redisSet,
+                        ContentPermission contentPermission, RcmdProducer rcmdProducer) {
+        this.videoPostQuery = videoPostQuery;
+        this.userInterestBased = userInterestBased;
+        this.redisSet = redisSet;
+        this.contentPermission = contentPermission;
+        this.monitor = rcmdProducer.getMonitor();
+        this.rcmdProducer = rcmdProducer;
+    }
+
+    @PostConstruct
+    public void run() {
+        threadPool.submit(new ConsumerThread());
+        log.info("rcmd producer-consumer 模型启动...");
+    }
+
+    class ConsumerThread implements Runnable {
+        @Override
+        public void run() {
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPool;
+            while (!Thread.interrupted()) {
+                try {
+                    if (tpe.getActiveCount() < threads) {
+                        dispatch();
+                    } else {
+                        //log.info("当前有 {} 个活跃线程, 休眠 10s 等待线程池空闲...", tpe.getActiveCount());
+                        Thread.sleep(10_000);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void dispatch() throws Exception {
+        synchronized (monitor) {
+            Object object = rcmdProducer.get();
+            if (object != null) {
+                if (object instanceof RcmdData) {
+                    RcmdData rcmdData = (RcmdData) object;
+                    RcmdTask rcmdTask = new RcmdTask(rcmdData, videoPostQuery, userInterestBased, redisSet, contentPermission);
+                    Future<?> future = threadPool.submit(rcmdTask);
+                } else {
+                    log.error("Object 类型未知");
+                }
+            } else {
+                log.info("调用 monitor.wait() 等待 DataProducer 中有数据可用");
+                monitor.wait();
+            }
+        }
+    }
+}

+ 15 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/rcmd/RcmdData.java

@@ -0,0 +1,15 @@
+package cn.reghao.tnb.content.app.vod.service.rcmd;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * @author reghao
+ * @date 2024-12-22 21:22:49
+ */
+@AllArgsConstructor
+@Getter
+public class RcmdData {
+    private long userId;
+    private int size;
+}

+ 41 - 0
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/rcmd/RcmdProducer.java

@@ -0,0 +1,41 @@
+package cn.reghao.tnb.content.app.vod.service.rcmd;
+
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author reghao
+ * @date 2024-12-22 21:19:48
+ */
+@Component
+public class RcmdProducer {
+    private final Object monitor;
+    private final LinkedBlockingQueue<Object> dataQueue;
+
+    public RcmdProducer() {
+        this.monitor = new Object();
+        this.dataQueue = new LinkedBlockingQueue<>(10_000);
+    }
+
+    public Object getMonitor() {
+        return monitor;
+    }
+
+    public void put(Object object) throws InterruptedException {
+        dataQueue.put(object);
+        synchronized (monitor) {
+            // 通知 consumer 线程有数据可用
+            monitor.notify();
+        }
+    }
+
+    public Object get() throws InterruptedException {
+        return dataQueue.poll();
+        //return dataQueue.take();
+    }
+
+    public int size() {
+        return dataQueue.size();
+    }
+}

+ 8 - 7
content/content-service/src/main/java/cn/reghao/tnb/content/app/vod/service/rcmd/task/RcmdTask.java

@@ -4,6 +4,7 @@ import cn.reghao.tnb.content.api.dto.VideoCard;
 import cn.reghao.tnb.content.app.util.redis.ds.RedisSet;
 import cn.reghao.tnb.content.app.vod.service.ContentPermission;
 import cn.reghao.tnb.content.app.vod.service.VideoPostQuery;
+import cn.reghao.tnb.content.app.vod.service.rcmd.RcmdData;
 import cn.reghao.tnb.content.app.vod.service.rcmd.RedisKeys;
 import cn.reghao.tnb.content.app.vod.service.rcmd.UserInterestBased;
 import lombok.extern.slf4j.Slf4j;
@@ -18,26 +19,26 @@ import java.util.Set;
  */
 @Slf4j
 public class RcmdTask implements Runnable {
-    private final long loginUser;
+    private final RcmdData rcmdData;
     private final RedisSet redisSet;
     private final UserInterestBased userInterestBased;
     private final VideoPostQuery videoPostQuery;
     private final ContentPermission contentPermission;
-    private final int size;
 
-    public RcmdTask(long loginUser, VideoPostQuery videoPostQuery,
+    public RcmdTask(RcmdData rcmdData, VideoPostQuery videoPostQuery,
                     UserInterestBased userInterestBased, RedisSet redisSet,
-                    ContentPermission contentPermission, int size) {
-        this.loginUser = loginUser;
+                    ContentPermission contentPermission) {
+        this.rcmdData = rcmdData;
         this.videoPostQuery = videoPostQuery;
         this.userInterestBased = userInterestBased;
         this.redisSet = redisSet;
         this.contentPermission = contentPermission;
-        this.size = size;
     }
 
     public void run() {
-        List<Integer> userScopes = contentPermission.getUserScopes();
+        long loginUser = rcmdData.getUserId();
+        int size = rcmdData.getSize();
+        List<Integer> userScopes = contentPermission.getUserScopes(loginUser);
         long start = System.currentTimeMillis();
         try {
             List<String> tagIds = videoPostQuery.getRandomTags(1000);