|
|
@@ -1,108 +0,0 @@
|
|
|
-package cn.reghao.tnb.search.app.log.pc;
|
|
|
-
|
|
|
-import cn.reghao.jutil.jdk.web.log.NginxLog;
|
|
|
-import cn.reghao.tnb.search.app.log.NginxLogService;
|
|
|
-import jakarta.annotation.PostConstruct;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.util.concurrent.Executor;
|
|
|
-
|
|
|
-/**
|
|
|
- * @author reghao
|
|
|
- * @date 2025-12-31 14:25:59
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-@Component
|
|
|
-public class LogConsumer {
|
|
|
- private final Executor taskExecutor;
|
|
|
- private final Object monitor;
|
|
|
- private final LogProducer logProducer;
|
|
|
- private final NginxLogService nginxLogService;
|
|
|
-
|
|
|
- public LogConsumer(@Qualifier("taskExecutor") Executor taskExecutor, LogProducer logProducer,
|
|
|
- NginxLogService nginxLogService) {
|
|
|
- this.taskExecutor = taskExecutor;
|
|
|
- this.monitor = logProducer.getMonitor();
|
|
|
- this.logProducer = logProducer;
|
|
|
- this.nginxLogService = nginxLogService;
|
|
|
- }
|
|
|
-
|
|
|
- @PostConstruct
|
|
|
- public void run() {
|
|
|
- taskExecutor.execute(new ConsumerThread());
|
|
|
- log.info("NginxLog producer-consumer 模型启动...");
|
|
|
- }
|
|
|
-
|
|
|
- public int getActiveTasks() {
|
|
|
- int queueSize = logProducer.size();
|
|
|
- ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
|
|
|
- int capacity = executor.getQueueCapacity();
|
|
|
- int size = executor.getQueueSize();
|
|
|
- int activeCount = executor.getActiveCount();
|
|
|
- log.info("{} -> {} {} {}", queueSize, capacity, size, activeCount);
|
|
|
- return activeCount;
|
|
|
- }
|
|
|
-
|
|
|
- class ConsumerThread implements Runnable {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
|
|
|
- int capacity = executor.getQueueCapacity();
|
|
|
- while (!Thread.interrupted()) {
|
|
|
- try {
|
|
|
- if (executor.getQueueSize() < capacity) {
|
|
|
- dispatch();
|
|
|
- } else {
|
|
|
- log.info("当前有 {} 个活跃线程, 休眠 1s 等待线程池空闲...", executor.getActiveCount());
|
|
|
- Thread.sleep(1_000);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void dispatch() throws Exception {
|
|
|
- synchronized (monitor) {
|
|
|
- Object object = logProducer.get();
|
|
|
- if (object != null) {
|
|
|
- if (object instanceof NginxLog) {
|
|
|
- NginxLog nginxLog = (NginxLog) object;
|
|
|
- String timeStr = nginxLog.getTimeIso8601().replace("+08:00", "");
|
|
|
- nginxLog.setTimeIso8601(timeStr);
|
|
|
-
|
|
|
- LogTask logTask = new LogTask(nginxLogService, nginxLog);
|
|
|
- taskExecutor.execute(logTask);
|
|
|
- } else {
|
|
|
- log.error("Object 类型未知");
|
|
|
- }
|
|
|
- } else {
|
|
|
- //log.info("调用 monitor.wait() 等待 DataProducer 中有数据可用");
|
|
|
- monitor.wait();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static class LogTask implements Runnable {
|
|
|
- private final NginxLogService nginxLogService;
|
|
|
- private final NginxLog nginxLog;
|
|
|
-
|
|
|
- public LogTask(NginxLogService nginxLogService, NginxLog nginxLog) {
|
|
|
- this.nginxLogService = nginxLogService;
|
|
|
- this.nginxLog = nginxLog;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- nginxLogService.processNginxLog(nginxLog);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("LogTask failed with message: {}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|