|
|
@@ -1,6 +1,5 @@
|
|
|
package cn.reghao.tnb.content.app.vod.service.rcmd.task;
|
|
|
|
|
|
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
|
|
|
import cn.reghao.tnb.content.app.util.redis.ds.RedisList;
|
|
|
import cn.reghao.tnb.content.app.util.redis.ds.RedisSet;
|
|
|
import cn.reghao.tnb.content.app.vod.service.ContentPermission;
|
|
|
@@ -9,12 +8,13 @@ import cn.reghao.tnb.content.app.vod.service.rcmd.UserInterestBased;
|
|
|
import cn.reghao.tnb.user.api.iface.UserService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
+import org.slf4j.MDC;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Future;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.Executor;
|
|
|
|
|
|
/**
|
|
|
* @author reghao
|
|
|
@@ -26,40 +26,41 @@ public class RcmdConsumer {
|
|
|
@DubboReference(check = false)
|
|
|
private UserService userService;
|
|
|
|
|
|
- private final int threads = 10;
|
|
|
- private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("data-consumer", threads);
|
|
|
+ private final Executor taskExecutor;
|
|
|
private final Object monitor;
|
|
|
private final RcmdProducer rcmdProducer;
|
|
|
private final VideoPostQuery videoPostQuery;
|
|
|
- private final UserInterestBased userInterestBased;
|
|
|
private final RedisSet redisSet;
|
|
|
private final RedisList redisList;
|
|
|
private final ContentPermission contentPermission;
|
|
|
+ private final UserInterestBased userInterestBased;
|
|
|
|
|
|
- public RcmdConsumer(VideoPostQuery videoPostQuery, UserInterestBased userInterestBased, RedisSet redisSet,
|
|
|
- RedisList redisList, ContentPermission contentPermission, RcmdProducer rcmdProducer) {
|
|
|
+ public RcmdConsumer(@Qualifier("taskExecutor") Executor taskExecutor, VideoPostQuery videoPostQuery, RedisSet redisSet, RedisList redisList,
|
|
|
+ ContentPermission contentPermission, RcmdProducer rcmdProducer, UserInterestBased userInterestBased) {
|
|
|
+ this.taskExecutor = taskExecutor;
|
|
|
this.videoPostQuery = videoPostQuery;
|
|
|
- this.userInterestBased = userInterestBased;
|
|
|
this.redisSet = redisSet;
|
|
|
this.redisList = redisList;
|
|
|
this.contentPermission = contentPermission;
|
|
|
this.monitor = rcmdProducer.getMonitor();
|
|
|
this.rcmdProducer = rcmdProducer;
|
|
|
+ this.userInterestBased = userInterestBased;
|
|
|
}
|
|
|
|
|
|
@PostConstruct
|
|
|
public void run() {
|
|
|
- threadPool.submit(new ConsumerThread());
|
|
|
+ taskExecutor.execute(new ConsumerThread());
|
|
|
log.info("rcmd producer-consumer 模型启动...");
|
|
|
}
|
|
|
|
|
|
class ConsumerThread implements Runnable {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPool;
|
|
|
+ ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
|
|
|
+ int capacity = executor.getQueueCapacity();
|
|
|
while (!Thread.interrupted()) {
|
|
|
try {
|
|
|
- if (tpe.getActiveCount() < threads) {
|
|
|
+ if (executor.getQueueSize() < capacity) {
|
|
|
dispatch();
|
|
|
} else {
|
|
|
//log.info("当前有 {} 个活跃线程, 休眠 10s 等待线程池空闲...", tpe.getActiveCount());
|
|
|
@@ -78,10 +79,12 @@ public class RcmdConsumer {
|
|
|
if (object != null) {
|
|
|
if (object instanceof RcmdData) {
|
|
|
RcmdData rcmdData = (RcmdData) object;
|
|
|
+ String requestId = rcmdData.getRequestId();
|
|
|
+ MDC.put("request_id", requestId);
|
|
|
+
|
|
|
int mode = userService.getRecommendMode(rcmdData.getUserId());
|
|
|
- RcmdTask rcmdTask = new RcmdTask(rcmdData, videoPostQuery, userInterestBased,
|
|
|
- redisSet, redisList, contentPermission, mode);
|
|
|
- Future<?> future = threadPool.submit(rcmdTask);
|
|
|
+ RcmdTask rcmdTask = new RcmdTask(rcmdData, videoPostQuery, redisSet, redisList, contentPermission, mode);
|
|
|
+ taskExecutor.execute(rcmdTask);
|
|
|
} else {
|
|
|
log.error("Object 类型未知");
|
|
|
}
|