Browse Source

1.search-service 添加对 NginxLog 的处理
2.引入 kafka 来存储和消费采集的 NginxLog

reghao 5 tháng trước cách đây
mục cha
commit
a773e1451e
30 tập tin đã thay đổi với 1229 bổ sung457 xóa
  1. 12 0
      search/search-service/pom.xml
  2. 10 6
      search/search-service/src/main/java/cn/reghao/tnb/search/app/config/AppProperties.java
  3. 17 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/config/BeanConfig.java
  4. 5 5
      search/search-service/src/main/java/cn/reghao/tnb/search/app/config/SpringLifecycle.java
  5. 96 13
      search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/NginxLogController.java
  6. 6 6
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/ElasticService.java
  7. 172 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/EsQuery.java
  8. 4 4
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java
  9. 78 239
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java
  10. 7 3
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocument.java
  11. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearch.java
  12. 264 125
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java
  13. 136 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NgxLogService.java
  14. 23 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/GeoIpTool.java
  15. 0 23
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IpTool.java
  16. 5 5
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/Location.java
  17. 20 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaDemo.java
  18. 43 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaPub.java
  19. 79 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaSub.java
  20. 25 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/dto/DateTimeRange.java
  21. 19 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/ChartData.java
  22. 23 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/ChartMap.java
  23. 3 3
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/GroupCount.java
  24. 108 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/pc/LogConsumer.java
  25. 41 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/pc/LogProducer.java
  26. 3 3
      search/search-service/src/main/java/cn/reghao/tnb/search/app/lucene/LuceneIndex.java
  27. 3 3
      search/search-service/src/main/java/cn/reghao/tnb/search/app/lucene/LuceneSearch.java
  28. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketConfig.java
  29. 15 11
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/handler/LogHandler.java
  30. 10 6
      search/search-service/src/main/resources/application-dev.yml

+ 12 - 0
search/search-service/pom.xml

@@ -55,6 +55,12 @@
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.hibernate.validator</groupId>
+            <artifactId>hibernate-validator</artifactId>
+            <version>8.0.1.Final</version>
+        </dependency>
+
         <dependency>
             <groupId>com.mysql</groupId>
             <artifactId>mysql-connector-j</artifactId>
@@ -91,6 +97,12 @@
             <artifactId>spring-boot-starter-data-mongodb</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.9.1</version> <!-- Use the latest compatible version -->
+        </dependency>
+
         <!-- lucene -->
         <dependency>
             <groupId>org.apache.lucene</groupId>

+ 10 - 6
search/search-service/src/main/java/cn/reghao/tnb/search/app/config/ElasticProperties.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/config/AppProperties.java

@@ -12,12 +12,16 @@ import org.springframework.context.annotation.Configuration;
 @Getter
 @Setter
 @Configuration
-@ConfigurationProperties(prefix = "es")
-public class ElasticProperties {
-    private String host;
-    private int port;
-    private String username;
-    private String password;
+@ConfigurationProperties(prefix = "app")
+public class AppProperties {
+    private String esHost;
+    private int esPort;
+    private String esUsername;
+    private String esPassword;
     private String nativeLuceneDir;
     private String hibernateLuceneDir;
+    private String kafkaUri;
+    private String kafkaTopic;
+    private String geoipPath;
+    private String geojsonPath;
 }

+ 17 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/config/BeanConfig.java

@@ -0,0 +1,17 @@
+package cn.reghao.tnb.search.app.config;
+
+import cn.reghao.tnb.search.app.log.ip.GeoIpTool;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author reghao
+ * @date 2026-01-08 16:56:47
+ */
+@Configuration
+public class BeanConfig {
+    @Bean
+    public GeoIpTool ipTool(AppProperties appProperties) throws Exception {
+        return new GeoIpTool(appProperties.getGeoipPath());
+    }
+}

+ 5 - 5
search/search-service/src/main/java/cn/reghao/tnb/search/app/config/SpringLifecycle.java

@@ -15,17 +15,17 @@ import java.io.File;
 @Slf4j
 @Component
 public class SpringLifecycle implements ApplicationRunner, DisposableBean {
-    private final ElasticProperties elasticProperties;
+    private final AppProperties appProperties;
 
-    public SpringLifecycle(ElasticProperties elasticProperties) {
-        this.elasticProperties = elasticProperties;
+    public SpringLifecycle(AppProperties appProperties) {
+        this.appProperties = appProperties;
     }
 
     @Override
     public void run(ApplicationArguments args) {
-        String nativeLuceneDir = elasticProperties.getNativeLuceneDir();
+        String nativeLuceneDir = appProperties.getNativeLuceneDir();
         createDir(nativeLuceneDir);
-        String hibernateLuceneDir = elasticProperties.getHibernateLuceneDir();
+        String hibernateLuceneDir = appProperties.getHibernateLuceneDir();
         createDir(hibernateLuceneDir);
         log.info("SearchService 启动...");
     }

+ 96 - 13
search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/NginxLogController.java

@@ -1,14 +1,17 @@
 package cn.reghao.tnb.search.app.controller;
 
+import cn.reghao.tnb.common.db.SelectOption;
 import cn.reghao.tnb.common.web.WebResult;
 import cn.reghao.tnb.search.app.log.NginxLogService;
-import cn.reghao.tnb.search.app.log.model.vo.DateCount;
+import cn.reghao.tnb.search.app.log.model.dto.DateTimeRange;
+import cn.reghao.tnb.search.app.log.model.vo.ChartData;
+import cn.reghao.tnb.search.app.log.model.vo.ChartMap;
+import cn.reghao.tnb.search.app.log.model.vo.GroupCount;
+import cn.reghao.tnb.search.app.log.pc.LogConsumer;
 import io.swagger.v3.oas.annotations.Operation;
 import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
 
@@ -20,22 +23,102 @@ import java.util.List;
 @RequestMapping("/api/search1/nginx")
 public class NginxLogController {
     private final NginxLogService nginxLogService;
+    private final LogConsumer logConsumer;
 
-    public NginxLogController(NginxLogService nginxLogService) {
+    public NginxLogController(NginxLogService nginxLogService, LogConsumer logConsumer) {
         this.nginxLogService = nginxLogService;
+        this.logConsumer = logConsumer;
     }
 
     @Operation(summary = "", description = "N")
-    @GetMapping(value = "/log", produces = MediaType.APPLICATION_JSON_VALUE)
-    public String nginxLog() {
-        List<DateCount> list = nginxLogService.getNginxLogs();
+    @GetMapping(value = "/task", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String logTask() {
+        int total = logConsumer.getActiveTasks();
+        return WebResult.success(total);
+    }
+
+    @Operation(summary = "NginxLog 日期列表", description = "N")
+    @GetMapping(value = "/log/date", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String nginxLogDate() {
+        List<SelectOption> list = nginxLogService.getDateList();
         return WebResult.success(list);
     }
 
-    @Operation(summary = "", description = "N")
-    @GetMapping(value = "/log/chart", produces = MediaType.APPLICATION_JSON_VALUE)
-    public String getLogChart(@RequestParam(value = "dateStr") String dateStr) {
-        List<Object> chartData = nginxLogService.getChartData(dateStr);
+    @Operation(summary = "NginxLog 在指定日期的数量", description = "N")
+    @GetMapping(value = "/count", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String nginxLogCount(@RequestParam(value = "dateStr") String dateStr) {
+        long total = nginxLogService.countNginxLog(dateStr);
+        return WebResult.success(total);
+    }
+
+    @Operation(summary = "不超过 24 小时", description = "N")
+    @PostMapping(value = "/log2", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String nginxLog2(@RequestBody @Validated DateTimeRange dateTimeRange) throws Exception {
+        List<Object> chartData = nginxLogService.getNginxLog2(dateTimeRange);
+        return WebResult.success(chartData);
+    }
+
+    @Operation(summary = "指定日期的最近一周内 NginxLog 在每天的数量", description = "N")
+    @GetMapping(value = "/log/chart1", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart1(@RequestParam(value = "dateStr") String dateStr) throws Exception {
+        List<Object> chartData = nginxLogService.getChartData5(dateStr, "");
+        return WebResult.success(chartData);
+    }
+
+    @Operation(summary = "指定日期和 url 的最近一周内 NginxLog 在每天的数量", description = "N")
+    @GetMapping(value = "/log/chart5", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart5(@RequestParam(value = "dateStr") String dateStr,
+                               @RequestParam(value = "url") String url) throws Exception {
+        List<Object> chartData = nginxLogService.getChartData5(dateStr, url);
         return WebResult.success(chartData);
     }
+
+    @Operation(summary = "指定 url 在某天内每秒的数量", description = "N")
+    @GetMapping(value = "/log/chart2", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart2(@RequestParam(value = "dateStr") String dateStr,
+                               @RequestParam(value = "url") String url) throws Exception {
+        List<Object> chartData = nginxLogService.getChartData2(dateStr, url);
+        return WebResult.success(chartData);
+    }
+
+    @Operation(summary = "对指定字段在某天内进行聚合", description = "N")
+    @GetMapping(value = "/log/chart3", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart3(@RequestParam(value = "dateStr") String dateStr,
+                               @RequestParam(value = "aggregateField") String aggregateField) throws Exception {
+        String aggregateField0;
+        if ("url".equals(aggregateField)) {
+            aggregateField0 = "url.raw";
+        } else if ("statusCode".equals(aggregateField)) {
+            aggregateField0 = "status";
+        } else if ("ip".equals(aggregateField)) {
+            aggregateField0 = "remoteAddr";
+        } else if ("userAgent".equals(aggregateField)) {
+            aggregateField0 = "httpUserAgent.raw";
+        } else {
+            return WebResult.failWithMsg("unknown aggregate type: " + aggregateField);
+        }
+
+        List<GroupCount> chartData = nginxLogService.getChartData3(aggregateField0, dateStr);
+        return WebResult.success(chartData);
+    }
+
+    @Operation(summary = "获取 NginxLog 中耗时 TopN 的数据", description = "N")
+    @GetMapping(value = "/log/chart4", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart4(@RequestParam(value = "dateStr") String dateStr) throws Exception {
+        List<ChartData> list = nginxLogService.getTopN(dateStr);
+        return WebResult.success(list);
+    }
+
+    @Operation(summary = "获取 NginxLog 中根据访问 IP 得到的 echart 地图", description = "N")
+    @GetMapping(value = "/log/chart6", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart6(@RequestParam(value = "dateStr") String dateStr,
+                               @RequestParam(value = "aggregateField") String aggregateField) throws Exception {
+        if ("ip".equals(aggregateField)) {
+            String aggregateField0 = "remoteAddr";
+            ChartMap chartMap = nginxLogService.getChartMap(aggregateField0, dateStr);
+            return WebResult.success(chartMap);
+        } else {
+            return WebResult.failWithMsg("");
+        }
+    }
 }

+ 6 - 6
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/ElasticService.java

@@ -1,6 +1,6 @@
 package cn.reghao.tnb.search.app.es;
 
-import cn.reghao.tnb.search.app.config.ElasticProperties;
+import cn.reghao.tnb.search.app.config.AppProperties;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
@@ -25,11 +25,11 @@ import java.util.concurrent.TimeUnit;
 public class ElasticService {
     private final ElasticsearchClient esClient;
 
-    public ElasticService(ElasticProperties elasticProperties) {
-        String host = elasticProperties.getHost();
-        int port = elasticProperties.getPort();
-        String username = elasticProperties.getUsername();
-        String password = elasticProperties.getPassword();
+    public ElasticService(AppProperties appProperties) {
+        String host = appProperties.getEsHost();
+        int port = appProperties.getEsPort();
+        String username = appProperties.getEsUsername();
+        String password = appProperties.getEsPassword();
         this.esClient = getElasticsearchClient(host, port, username, password);
     }
 

+ 172 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/EsQuery.java

@@ -0,0 +1,172 @@
+package cn.reghao.tnb.search.app.es;
+
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.query_dsl.*;
+import co.elastic.clients.json.JsonData;
+
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2026-01-03 16:40:03
+ */
+public class EsQuery {
+
+    /**
+     * 等值查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 13:45:17
+     */
+    public static Query getTermQuery(String fieldName, String fieldValue) {
+        String fieldName1 = "url";
+        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
+        String fieldName2 = "requestMethod";
+        String fieldValue2 = "POST";
+        Query query = TermQuery.of(t -> t
+                .field(fieldName).value(FieldValue.of(fieldValue))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * MatchQuery 搜索时, 首先会解析查询字符串, 进行分词,然后查询
+     * TermQuery 搜索时, 会根据输入的查询内容进行搜索, 并不会解析查询内容,对它分词
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 15:04:22
+     */
+    public static Query getMatchQuery() {
+        String fieldName = "host";
+        String searchText = "api.iquizoo.com";
+
+        Query query = MatchQuery.of(m -> m.field(fieldName).query(searchText))._toQuery();
+        return query;
+    }
+
+    /**
+     * 多值查询, 相当于 SQL 语句中的 in 查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:12:38
+     */
+    public static Query getTermsQuery() {
+        String fieldName = "status";
+        List<FieldValue> fieldValueList = List.of(FieldValue.of("401"), FieldValue.of("500"));
+
+        Query query = TermsQuery.of(t ->
+                t.field(fieldName).terms(TermsQueryField.of(q -> q.value(fieldValueList)))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 或查询, 相当于 SQL 查询
+     * SELECT * FROM test1 where (uid = 1 or uid =2) and phone = 12345678919
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 13:46:26
+     */
+    public static Query getOrQuery() {
+        Query query = BoolQuery.of(b ->
+                b.should(m -> m.term(t -> t.field("status").value(FieldValue.of(401))))
+                        .should(m -> m.term(t -> t.field("status").value(FieldValue.of(403))))
+                        .must(m -> m.term(t -> t.field("host").value(FieldValue.of("api.iquizoo.com"))))
+        )._toQuery();
+        return query;
+    }
+
+    public static Query getAndQuery() {
+        String fieldName1 = "url.raw";
+        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult";
+
+        String fieldName2 = "requestMethod";
+        String fieldValue2 = "POST";
+
+        Query query = BoolQuery.of(b -> b
+                .must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))
+                .must(m -> m.term(t -> t.field(fieldName2).value(FieldValue.of(fieldValue2))))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 模糊查询, 相当于 SQL 语句中的 like 查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:11:45
+     */
+    public static Query getWildcardQuery() {
+        Query query = BoolQuery.of(b ->
+                b.must(m -> m.wildcard(t -> t.field("url").value("*result*")))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 存在查询, 相当于 SQL 语句中的 exist
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:20:26
+     */
+    public static Query getExistsQuery(String fieldName) {
+        // 判断字段是否存在
+        Query query = ExistsQuery.of(t -> t.field(fieldName))._toQuery();
+        return query;
+    }
+
+    /**
+     * 范围查询, 相当于 SQL 语句中的 > 和 <
+     * gt 是大于,lt 是小于,gte 是大于等于,lte 是小于等于
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:21:26
+     */
+    public static Query getRangeQuery(String fieldName) {
+        String fieldName1 = "status";
+        int start1 = 404;
+        int start2 = 600;
+
+        String dateValue = "2023-11-06";
+        String start = String.format("%sT00:00:00Z", dateValue);
+        String end = String.format("%sT23:59:59Z", dateValue);
+
+        Query query = RangeQuery.of(t -> t.field(fieldName).gte(JsonData.of(start)).lte(JsonData.of(end)))._toQuery();
+        return query;
+    }
+
+    /**
+     * 正则查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:22:18
+     */
+    public static Query getRegexpQuery() {
+        Query query = RegexpQuery.of(t -> t.field("host").value("api.*"))._toQuery();
+        return query;
+    }
+
+    public static Query getQuery() {
+        Query query1 = MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
+
+        // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
+        //
+        //修改一个单词,如:box--->fox。
+        //移除一个单词,如:black-->lack。
+        //插入一个单词,如:sic-->sick。
+        //转换两个单词顺序,如:act-->cat。
+        Query query2 = FuzzyQuery.of(f -> f.field("host").value("lonel"))._toQuery();
+
+        // 通过指定字段的前缀进行查询
+        Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
+        return query;
+    }
+}

+ 4 - 4
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java

@@ -17,10 +17,10 @@ public class MappingService {
     Property intProp = Property.of(builder -> builder.integer(IntegerNumberProperty.of(pro -> pro.index(true))));
     Property longProp = Property.of(builder -> builder.long_(LongNumberProperty.of(pro -> pro.index(true).store(true))));
     Property doubleProp = Property.of(builder -> builder.double_(DoubleNumberProperty.of(pro -> pro.index(true).store(true))));
-    Property dateProp = Property.of(builder -> builder.date(DateProperty.of(pro -> pro.index(true))));
-    Property keywordProp = Property.of(builder -> builder.keyword(KeywordProperty.of(pro -> pro.index(true))));
-    Property textProp = Property.of(builder -> builder.text(TextProperty.of(pro -> pro.index(true))));
-    Property textPropIk = Property.of(builder -> builder.text(TextProperty.of(pro -> pro.index(true).analyzer("ik_max_word"))));
+    Property dateProp = Property.of(builder -> builder.date(DateProperty.of(pro -> pro.index(true).store(true))));
+    Property keywordProp = Property.of(builder -> builder.keyword(KeywordProperty.of(pro -> pro.index(true).store(true))));
+    Property textProp = Property.of(builder -> builder.text(TextProperty.of(pro -> pro.index(true).store(true))));
+    Property textPropIk = Property.of(builder -> builder.text(TextProperty.of(pro -> pro.index(true).analyzer("ik_max_word").store(true))));
     Property textKeywordProp = new Property(new TextProperty.Builder().index(true).fields("raw", keywordProp).store(true).build());
 
     public Map<String, Property> getPropertyMapWithNginxLog(Class<?> clazz) {

+ 78 - 239
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java

@@ -18,16 +18,11 @@ import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
 import co.elastic.clients.elasticsearch.core.search.TotalHits;
 import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
 import co.elastic.clients.json.JsonData;
-import co.elastic.clients.util.ObjectBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Function;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -39,12 +34,37 @@ import java.util.stream.Collectors;
 public class SearchService {
     private final ElasticsearchClient esClient;
 
-    public SearchService(ElasticService elasticService) throws Exception {
+    public SearchService(ElasticService elasticService)  {
         this.esClient = elasticService.getElasticsearchClient();
     }
 
-    public void search(String indexName, String fieldValue) throws IOException {
-        String fieldName = "name";
+    public long count(String index, Query query) {
+        Query combinedQuery = Query.of(q -> q.matchAll(m -> m));
+        Query query1 = EsQuery.getMatchQuery();
+        // remoteAddr 字段为空
+        Query query2 = BoolQuery.of(b ->
+                b.mustNot(m -> m.exists(t -> t.field("remoteAddr"))
+                ))._toQuery();
+        // remoteAddr 字段不为空
+        Query query3 = BoolQuery.of(b ->
+                b.must(m -> m.exists(t -> t.field("remoteAddr"))
+                ))._toQuery();
+        String fieldName  = "requestTimestamp";
+        Query rangeQuery = EsQuery.getRangeQuery(fieldName);
+
+        CountRequest countRequest = CountRequest.of(s -> s.index(index).query(query));
+        long total = -1;
+        try {
+            CountResponse countResponse = esClient.count(countRequest);
+            total = countResponse.count();
+            ShardStatistics shardStatistics = countResponse.shards();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return total;
+    }
+
+    public void searchByText(String indexName, String fieldName, String fieldValue) throws IOException {
         SearchResponse<NginxLog> searchResponse = esClient.search(s -> s
                 .index(indexName)
                 /*.sort(s1 -> s1.field(f -> f.field(fieldName).order(SortOrder.Asc)))
@@ -78,22 +98,28 @@ public class SearchService {
      * @date 2025-12-25 15:54:34
      */
     public List<NginxLog> searchByPage(String indexName, int pageSize, int pageNumber,
-                                       String fieldName, String fieldValue) throws IOException {
-        String sortField = "timeIso8601";
-        Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
-        Query query11 = getOrQuery();
-        Query query1 = getAndQuery();
+                                       Query query, String sortField, SortOrder sortOrder) throws IOException {
+        //String sortField = "requestTimestamp";
+        //Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
+        Query query11 = EsQuery.getOrQuery();
+        Query query1 = EsQuery.getAndQuery();
         // 等值查询
-        Query termQuery = getTermQuery(fieldName, fieldValue);
+        //Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
+        //Query rangeQuery = EsQuery.getRangeQuery(fieldName);
+        String dateField = "timeIso8601";
+        String dateValue = "2023-11-08";
+        String start0 = String.format("%sT00:00:00", dateValue);
+        String end0 = String.format("%sT23:59:59", dateValue);
 
         int start = (pageNumber-1)*pageSize;
         SearchRequest searchRequest = SearchRequest.of(s -> s
                 .index(indexName)
-                .query(termQuery)
+                .query(q -> q.range(r -> r.field(dateField).gte(JsonData.of(start0)).lte(JsonData.of(end0))))
+                .query(query)
                 .from(start)
                 .size(pageSize)
                 // 按 id 字段降序排列
-                .sort(f -> f.field(o -> o.field(sortField).order(SortOrder.Desc)))
+                .sort(f -> f.field(o -> o.field(sortField).order(sortOrder)))
         );
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
@@ -162,196 +188,6 @@ public class SearchService {
         System.out.println();
     }
 
-    public void search() throws IOException {
-        // 判断字段是否存在
-        String fieldName = "host";
-        Query query = getExistsQuery(fieldName);
-
-        String index = "nginx_log";
-        SearchRequest searchRequest = new SearchRequest.Builder()
-                .index(index)
-                .query(query)
-                .build();
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        TotalHits totalHits = searchResponse.hits().total();
-        long total = totalHits.value();
-        List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
-    }
-
-    public long count(String index) throws IOException {
-        Query query1 = getMatchQuery();
-        // remoteAddr 字段为空
-        Query query2 = BoolQuery.of(b ->
-                b.mustNot(m -> m.exists(t -> t.field("remoteAddr"))
-                ))._toQuery();
-        // remoteAddr 字段不为空
-        Query query3 = BoolQuery.of(b ->
-                b.must(m -> m.exists(t -> t.field("remoteAddr"))
-                ))._toQuery();
-
-        CountRequest countRequest = CountRequest.of(s -> s
-                        .index(index)
-                //.query(query3)
-        );
-
-        CountResponse countResponse = esClient.count(countRequest);
-        long total = countResponse.count();
-        ShardStatistics shardStatistics = countResponse.shards();
-        return total;
-    }
-
-    /**
-     * 等值查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 13:45:17
-     */
-    public Query getTermQuery(String fieldName, String fieldValue) {
-        String fieldName1 = "url";
-        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
-        String fieldName2 = "requestMethod";
-        String fieldValue2 = "POST";
-        Query query = TermQuery.of(t -> t
-                .field(fieldName).value(FieldValue.of(fieldValue))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * MatchQuery 搜索时, 首先会解析查询字符串, 进行分词,然后查询
-     * TermQuery 搜索时, 会根据输入的查询内容进行搜索, 并不会解析查询内容,对它分词
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 15:04:22
-     */
-    public Query getMatchQuery() {
-        String fieldName = "host";
-        String searchText = "api.iquizoo.com";
-
-        Query query = MatchQuery.of(m -> m.field(fieldName).query(searchText))._toQuery();
-        return query;
-    }
-
-    /**
-     * 多值查询, 相当于 SQL 语句中的 in 查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:12:38
-     */
-    public Query getTermsQuery() {
-        String fieldName = "status";
-        List<FieldValue> fieldValueList = List.of(FieldValue.of("401"), FieldValue.of("500"));
-
-        Query query = TermsQuery.of(t ->
-                t.field(fieldName).terms(TermsQueryField.of(q -> q.value(fieldValueList)))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 或查询, 相当于 SQL 查询
-     * SELECT * FROM test1 where (uid = 1 or uid =2) and phone = 12345678919
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 13:46:26
-     */
-    public Query getOrQuery() {
-        Query query = BoolQuery.of(b ->
-                b.should(m -> m.term(t -> t.field("status").value(FieldValue.of(401))))
-                        .should(m -> m.term(t -> t.field("status").value(FieldValue.of(403))))
-                        .must(m -> m.term(t -> t.field("host").value(FieldValue.of("api.iquizoo.com"))))
-        )._toQuery();
-        return query;
-    }
-
-    public Query getAndQuery() {
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult";
-
-        String fieldName2 = "requestMethod";
-        String fieldValue2 = "POST";
-
-        Query query = BoolQuery.of(b -> b
-                .must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))
-                .must(m -> m.term(t -> t.field(fieldName2).value(FieldValue.of(fieldValue2))))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 模糊查询, 相当于 SQL 语句中的 like 查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:11:45
-     */
-    public Query getWildcardQuery() {
-        Query query = BoolQuery.of(b ->
-                b.must(m -> m.wildcard(t -> t.field("url").value("*result*")))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 存在查询, 相当于 SQL 语句中的 exist
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:20:26
-     */
-    public Query getExistsQuery(String fieldName) {
-        // 判断字段是否存在
-        Query query = ExistsQuery.of(t -> t.field(fieldName))._toQuery();
-        return query;
-    }
-
-    /**
-     * 范围查询, 相当于 SQL 语句中的 > 和 <
-     * gt 是大于,lt 是小于,gte 是大于等于,lte 是小于等于
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:21:26
-     */
-    public Query getRangeQuery() {
-        int status1 = 404;
-        int status2 = 600;
-        Query query = RangeQuery.of(t -> t.field("status").gte(JsonData.of(status1)).lte(JsonData.of(status2)))._toQuery();
-        return query;
-    }
-
-    /**
-     * 正则查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:22:18
-     */
-    public Query getRegexpQuery() {
-        Query query = RegexpQuery.of(t -> t.field("host").value("api.*"))._toQuery();
-        return query;
-    }
-
-    public Query getQuery() {
-        Query query1 = MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
-
-        // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
-        //
-        //修改一个单词,如:box--->fox。
-        //移除一个单词,如:black-->lack。
-        //插入一个单词,如:sic-->sick。
-        //转换两个单词顺序,如:act-->cat。
-        Query query2 = FuzzyQuery.of(f -> f.field("host").value("lonel"))._toQuery();
-
-        // 通过指定字段的前缀进行查询
-        Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
-        return query;
-    }
-
     /**
      * 聚合查询, 相当于 SQL 的 group by
      *
@@ -408,7 +244,7 @@ public class SearchService {
                 list.forEach(bucket -> {
                     String groupKey = (String) bucket.key()._get();
                     long count = bucket.docCount();
-                    //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
+                    //System.out.println(groupKey + " : " + GeoIpTool.getLocation(groupKey) + " -> " + count);
                     System.out.printf("%s -> %s\n", groupKey, count);
                 });
                 System.out.println("bucket size = " + list.size());
@@ -421,7 +257,6 @@ public class SearchService {
                     String dateTimeStr = DateTimeConverter.format(Long.parseLong(groupKey));
                     long count = bucket.docCount();
                     map.put(secondTimestamp, count);
-                    System.out.printf("%s -> %s\n", dateTimeStr, count);
                 });
             } else if (value instanceof DateRangeAggregate aggregate) {
                 List<RangeBucket> list = aggregate.buckets().array();
@@ -435,7 +270,6 @@ public class SearchService {
                     long msTimestamp = Long.parseLong(rangeBucket.key());
                     String dateStr = DateTimeConverter.localDateTime(msTimestamp).toLocalDate().toString();
                     long count = rangeBucket.docCount();
-                    System.out.printf("%s -> %s\n", dateStr, count);
                 });
             } else {
                 System.out.println(value);
@@ -450,61 +284,48 @@ public class SearchService {
      * @return 日期 -> 每日访问量
      * @date 2025-12-25 17:23:04
      */
-    public Map<String, Long> aggregateByDay(String index, String dateField) throws Exception {
+    public Map<String, Long> aggregateByDay(String index, String aggregateField, Query query) throws Exception {
         SearchRequest searchRequest = new SearchRequest.Builder()
                 .index(index)
                 .size(0)
+                .query(query)
                 // 按天进行聚合
-                .aggregations("agg1", a->a.dateHistogram(t->t.field(dateField).calendarInterval(CalendarInterval.Day).minDocCount(0)))
+                .aggregations("agg1", a->a.dateHistogram(t->t.field(aggregateField).calendarInterval(CalendarInterval.Day).minDocCount(0)))
                 .build();
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         TotalHits totalHits = searchResponse.hits().total();
         long total = totalHits.value();
         Map<String, Aggregate> resultMap = searchResponse.aggregations();
-
-        Map<String, Long> map = new TreeMap<>();
-        resultMap.forEach((k, v) -> {
-            Object value = v._get();
-            if (value instanceof DateHistogramAggregate aggregate) {
-                List<DateHistogramBucket> list = aggregate.buckets().array();
-                list.forEach(rangeBucket -> {
-                    long msTimestamp = Long.parseLong(rangeBucket.key());
-                    String dateStr = DateTimeConverter.localDateTime(msTimestamp).toLocalDate().toString();
-                    long count = rangeBucket.docCount();
-                    map.put(dateStr, count);
-                });
-            } else {
-                System.out.println("unknown aggregate type: " + value.getClass().getName());
-            }
-        });
-
+        Map<String, Long> map = getAggregateMap(aggregateField, resultMap);
         return map;
     }
 
     /**
-     * 在指定日期里对 url 进行聚合
+     * 在指定日期时间范围内对某个字段进行聚合
      *
      * @param
      * @return url -> 每日访问量
      * @date 2025-12-25 17:31:56
      */
-    public Map<String, Long> aggregateByUrl(String index, String urlField,
-                                            String dateField, String dateValue) throws Exception {
-        String start = String.format("%sT00:00:00Z", dateValue);
-        String end = String.format("%sT23:59:59Z", dateValue);
+    public Map<String, Long> aggregateByQuery(String index, String aggregateField, Query query) throws Exception {
+        int aggregateSize = 65535;
         SearchRequest searchRequest = new SearchRequest.Builder()
                 .index(index)
                 .size(0)
-                .query(q -> q.range(r -> r.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end))))
-                .aggregations("first_agg", a->a.terms(t->t.field(urlField).size(65535)))
+                .query(query)
+                .aggregations("first_agg", a->a.terms(t->t.field(aggregateField).size(aggregateSize)))
                 .build();
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         TotalHits totalHits = searchResponse.hits().total();
         long total = totalHits.value();
         Map<String, Aggregate> resultMap = searchResponse.aggregations();
+        Map<String, Long> map = getAggregateMap(aggregateField, resultMap);
+        return map;
+    }
 
+    private Map<String, Long> getAggregateMap(String aggregateField, Map<String, Aggregate> resultMap) {
         Map<String, Long> map = new TreeMap<>();
         resultMap.forEach((k, v) -> {
             Object value = v._get();
@@ -513,12 +334,30 @@ public class SearchService {
                 list.forEach(bucket -> {
                     String groupKey = (String) bucket.key()._get();
                     long count = bucket.docCount();
-                    //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
-                    // System.out.printf("%s -> %s\n", groupKey, count);
                     map.put(groupKey, count);
                 });
+            } else if (value instanceof LongTermsAggregate terms) {
+                List<LongTermsBucket> list = terms.buckets().array();
+                list.forEach(bucket -> {
+                    long count = bucket.docCount();
+                    String groupKey = bucket.key();
+                    if ("timeIso8601".equals(aggregateField)) {
+                        String dateTimeStr = DateTimeConverter.format(Long.parseLong(groupKey));
+                        map.put(dateTimeStr, count);
+                    } else {
+                        map.put(groupKey, count);
+                    }
+                });
+            } else if (value instanceof DateHistogramAggregate aggregate) {
+                List<DateHistogramBucket> list = aggregate.buckets().array();
+                list.forEach(rangeBucket -> {
+                    long msTimestamp = Long.parseLong(rangeBucket.key());
+                    String dateStr = DateTimeConverter.localDateTime(msTimestamp).toLocalDate().toString();
+                    long count = rangeBucket.docCount();
+                    map.put(dateStr, count);
+                });
             } else {
-                System.out.println(value);
+                System.out.println("unknown aggregate type: " + value.getClass().getName());
             }
         });
 

+ 7 - 3
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocument.java

@@ -33,15 +33,19 @@ public class NginxLogDocument {
 
     public void addDocument(String indexName, NginxLog product) throws IOException {
         IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(product.getId()).document(product));
-        log.info("add one document result: {}", indexResponse.result().jsonValue());
+        String result = indexResponse.result().jsonValue();
+        if (!"created".equals(result)) {
+            log.error("add one document result: {}", indexResponse.result().jsonValue());
+        }
     }
 
     public void batchAddDocument(String indexName, List<NginxLog> nginxLogs) throws IOException {
         List<BulkOperation> bulkOperations = new ArrayList<>();
         nginxLogs.forEach(p -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(p.getId()).document(p)))));
         BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
-        //bulkResponse.items().forEach(b -> log.info("bulk response result = {}", b.result()));
-        //log.error("bulk response.error() = {}", bulkResponse.errors());
+        if (bulkResponse.errors()) {
+            bulkResponse.items().forEach(b -> log.error("bulk response result = {}", b.result()));
+        }
     }
 
     public void saveAll(String index, List<NginxLog> nginxLogs) throws IOException {

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearch.java

@@ -371,7 +371,7 @@ public class NginxLogSearch {
                     String groupKey = (String) bucket.key()._get();
                     long count = bucket.docCount();
                     countList.add(count);
-                    //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
+                    //System.out.println(groupKey + " : " + GeoIpTool.getLocation(groupKey) + " -> " + count);
                     System.out.println(groupKey + " : " + count);
                 });
                 System.out.println("bucket size = " + list.size());

+ 264 - 125
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java

@@ -1,15 +1,34 @@
 package cn.reghao.tnb.search.app.log;
 
-import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
-import cn.reghao.tnb.search.app.log.model.vo.DateCount;
+import cn.reghao.jutil.jdk.io.TextFile;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import cn.reghao.tnb.common.db.SelectOption;
+import cn.reghao.tnb.search.app.config.AppProperties;
+import cn.reghao.tnb.search.app.es.EsQuery;
+import cn.reghao.tnb.search.app.es.SearchService;
+import cn.reghao.tnb.search.app.log.ip.GeoIpTool;
+import cn.reghao.tnb.search.app.log.ip.Location;
+import cn.reghao.tnb.search.app.log.model.dto.DateTimeRange;
+import cn.reghao.tnb.search.app.log.model.vo.ChartData;
+import cn.reghao.tnb.search.app.log.model.vo.ChartMap;
+import cn.reghao.tnb.search.app.log.model.vo.GroupCount;
+import co.elastic.clients.elasticsearch._types.SortOrder;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
+import co.elastic.clients.json.JsonData;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
-import java.time.*;
-import java.time.format.DateTimeFormatter;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * @author reghao
@@ -18,16 +37,34 @@ import java.util.*;
 @Slf4j
 @Service
 public class NginxLogService {
-    private String indexName = "nginx_log";
+    private final String indexName = "nginx_log";
     private final NginxLogDocument nginxLogDocument;
+    private final SearchService searchService;
+    private final GeoIpTool geoIpTool;
+    private final String geoJson;
 
-    public NginxLogService(NginxLogDocument nginxLogDocument) {
+    public NginxLogService(NginxLogDocument nginxLogDocument, SearchService searchService,
+                           GeoIpTool geoIpTool, AppProperties appProperties) {
         this.nginxLogDocument = nginxLogDocument;
+        this.searchService = searchService;
+        this.geoIpTool = geoIpTool;
+        this.geoJson = new TextFile().readFile(appProperties.getGeojsonPath());
     }
 
     public void processNginxLog(NginxLog nginxLog) {
         saveNginxLog(nginxLog);
-        //processNginxLog0(nginxLog);
+    }
+
+    public void saveNginxLogs(List<NginxLog> list) {
+        if (list.isEmpty()) {
+            return;
+        }
+
+        try {
+            nginxLogDocument.batchAddDocument(indexName, list);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 
     private void saveNginxLog(NginxLog nginxLog) {
@@ -38,146 +75,248 @@ public class NginxLogService {
         }
     }
 
-    String matchedMethod = "GET";
-    String matchedUrl = "/";
-    Map<String, Map<Long, Integer>> map = new HashMap<>();
-    Map<Long, Integer> ngxLogMap = new TreeMap<>();
-    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00").withZone(ZoneId.of("UTC"));
-    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
-    private void processNginxLog0(NginxLog nginxLog) {
-        String todayStr = LocalDateTime.now().toLocalDate().format(formatter1);
-        String method = nginxLog.getRequestMethod();
-        String url = nginxLog.getUrl();
-        if (method.equals(matchedMethod) && url.startsWith(matchedUrl)) {
-            String date = nginxLog.getTimeIso8601();
-            LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
-            LocalDate localDate = localDateTime.toLocalDate();
-            String localDateStr = localDate.format(formatter1);
-            Map<Long, Integer> ngxLogMap = map.computeIfAbsent(localDateStr, k -> new TreeMap<>());
-
-            LocalTime localTime = localDateTime.toLocalTime();
-            Long timestampSecond = LocalDateTime.parse(date, formatter).toEpochSecond(ZoneOffset.of("+8"));
-            Integer count = ngxLogMap.get(timestampSecond);
-            if (count == null) {
-                ngxLogMap.put(timestampSecond, 1);
-            } else {
-                int count1 = ngxLogMap.get(timestampSecond) + 1;
-                ngxLogMap.put(timestampSecond, count1);
-            }
+    public List<SelectOption> getDateList() {
+        try {
+            String aggregateField = "timeIso8601";
+            Query query = Query.of(q -> q.matchAll(m -> m));
+            Map<String, Long> groupMap = searchService.aggregateByDay(indexName, aggregateField, query);
+
+            Set<String> dateSet = groupMap.keySet();
+            return dateSet.stream()
+                    .sorted(Comparator.reverseOrder())
+                    .map(dateStr -> new SelectOption(dateStr, dateStr))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            e.printStackTrace();
         }
+        return Collections.emptyList();
     }
 
-    /**
-     * NginxLog 在前端 echarts 中的可视化
-     *
-     * @param
-     * @return
-     * @date 2023-12-01 17:41:07
-     */
-    class PushTask implements Runnable {
-        @Override
-        public void run() {
-            while (!Thread.interrupted()) {
-                Set<String> dateSet = map.keySet();
-                try {
-                    if (ngxLogMap.size() < 3) {
-                        Thread.sleep(10_000);
-                        continue;
-                    }
-
-                    List<Object> chartData = getChartData();
-                    /*TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(chartData));
-                    pullSessionMap.values().forEach(webSocketSession -> {
-                        try {
-                            webSocketSession.sendMessage(textMessage);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                    });*/
-                    Thread.sleep(3_000);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
+    public List<Object> getNginxLog2(DateTimeRange dateTimeRange) throws Exception {
+        String start = dateTimeRange.getStartDateTime();
+        LocalDateTime start1 = DateTimeConverter.localDateTime2(start);
+        String end = dateTimeRange.getEndDateTime();
+        LocalDateTime end1 = DateTimeConverter.localDateTime2(end);
+
+        Duration duration = Duration.between(start1, end1);
+        long days = duration.toHours();
+        if (days < 0) {
+            return Collections.emptyList();
+        } else if (days > 24) {
+            return Collections.emptyList();
         }
+        return null;
+    }
 
-        /**
-         * x 轴显示时分秒
-         * y 轴显示访问次数
-         *
-         * @param
-         * @return
-         * @date 2025-12-24 17:48:10
-         */
-        private List<Object> getChartData() {
-            Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
-            List<String> xList = new ArrayList<>();
-            List<Integer> yList = new ArrayList<>();
-            Set<Long> keys = new HashSet<>();
-            for (Long timestampSecond : ngxLogMap.keySet()) {
-                if (timestampSecond < timestampSecondCurrent) {
-                    // x 轴显示时分秒
-                    xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
-                    // y 轴显示访问次数
-                    yList.add(ngxLogMap.get(timestampSecond));
-                    keys.add(timestampSecond);
-                }
-            }
+    public List<ChartData> getTopN(String dateStr) throws IOException {
+        int pageSize = 100;
+        int pageNumber = 1;
 
-            keys.forEach(ngxLogMap::remove);
-            keys.clear();
-            List<Object> results = new ArrayList<>();
-            results.add(xList.toArray());
-            results.add(yList.toArray());
-            return results;
-        }
+        String startDate = dateStr;
+        String endDate = dateStr;
+        String start1 = String.format("%sT00:00:00", startDate);
+        String end1 = String.format("%sT23:59:59", endDate);
+        Query dateQuery = RangeQuery.of(q -> q.field("timeIso8601")
+                .gte(JsonData.of(start1)).lte(JsonData.of(end1)))._toQuery();
+        Query termQuery = EsQuery.getTermQuery("status", "200");
+        Query combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery)));
+
+        String sortField = "requestTime";
+        SortOrder sortOrder = SortOrder.Desc;
+        List<NginxLog> list = searchService.searchByPage(indexName, pageSize, pageNumber, combinedQuery, sortField, sortOrder);
+        List<ChartData> chartDataList = new ArrayList<>();
+        list.forEach(nginxLog -> {
+            String method = nginxLog.getRequestMethod();
+            String url = nginxLog.getUrl();
+            String name = String.format("%s %s", method, url);
+            double requestTime = nginxLog.getRequestTime();
+            chartDataList.add(new ChartData(name, requestTime));
+        });
+        return chartDataList;
     }
 
-    private List<Object> getChartData(Map<Long, Integer> timeMap) {
-        Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
-        List<String> xList = new ArrayList<>();
-        List<Integer> yList = new ArrayList<>();
-        Set<Long> keys = new HashSet<>();
-        for (Long timestampSecond : timeMap.keySet()) {
-            if (timestampSecond < timestampSecondCurrent) {
-                // x 轴显示时分秒
-                xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
-                // y 轴显示访问次数
-                yList.add(timeMap.get(timestampSecond));
-                keys.add(timestampSecond);
-            }
+    public long countNginxLog(String dateStr) {
+        String startDate = dateStr;
+        String endDate = dateStr;
+        String start1 = String.format("%sT00:00:00", startDate);
+        String end1 = String.format("%sT23:59:59", endDate);
+        RangeQuery dateQuery = RangeQuery.of(q -> q.field("timeIso8601").gte(JsonData.of(start1)).lte(JsonData.of(end1)));
+        return searchService.count(indexName, dateQuery._toQuery());
+    }
+
+    public List<Object> getChartData2(String dateStr, String fieldValue) throws Exception {
+        String aggregateField = "timeIso8601";
+        String dateField = "timeIso8601";
+        String fieldName = "url.raw";
+
+        String start = String.format("%sT00:00:00", dateStr);
+        String end = String.format("%sT23:59:59", dateStr);
+        /*String start1 = startDateTime.replace(" ", "T");
+        String end1 = endDateTime.replace(" ", "T");*/
+        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
+        Query combinedQuery;
+        if (fieldValue == null || fieldValue.isBlank()) {
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery())));
+        } else {
+            Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery()).filter(termQuery)));
         }
+        Map<String, Long> urlGroupMap = searchService.aggregateByQuery(indexName, aggregateField, combinedQuery);
+
+        List<String> xList = new ArrayList<>();
+        List<Long> yList = new ArrayList<>();
+        urlGroupMap.forEach((key, value) -> {
+            // x 轴显示时分秒
+            xList.add(key);
+            // y 轴显示访问次数
+            yList.add(value);
+        });
 
-        keys.forEach(timeMap::remove);
-        keys.clear();
         List<Object> results = new ArrayList<>();
         results.add(xList.toArray());
         results.add(yList.toArray());
         return results;
     }
 
-    public List<DateCount> getDateCountList() {
-        Map<String, Integer> result = new TreeMap<>();
-        List<DateCount> list = new LinkedList<>();
-        for (Map.Entry<String, Map<Long, Integer>> entry : map.entrySet()) {
-            String dateStr = entry.getKey();
-            Integer count = entry.getValue().size();
-            list.add(new DateCount(dateStr, count));
-            result.put(dateStr, count);
-        }
+    public List<GroupCount> getChartData3(String aggregateField, String dateStr) throws Exception {
+        String dateField = "timeIso8601";
+        DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
+        String startDateTime = dateTimeRange.getStartDateTime();
+        String endDateTime = dateTimeRange.getEndDateTime();
+        String start = startDateTime.replace(" ", "T");
+        String end = endDateTime.replace(" ", "T");
+        Query query = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)))._toQuery();
+        Map<String, Long> groupMap = searchService.aggregateByQuery(indexName, aggregateField, query);
+
+        List<GroupCount> list = new ArrayList<>();
+        groupMap.forEach((key, value) -> {
+            list.add(new GroupCount(key, value));
+        });
+        list.sort(new Comparator<>() {
+            @Override
+            public int compare(GroupCount groupCount1, GroupCount groupCount2) {
+                // 降序
+                long result = groupCount2.getTotal() - groupCount1.getTotal();
+                return (int) result;
+            }
+        });
         return list;
     }
 
-    public List<Object> getChartData1(String dateStr) {
-        Map<Long, Integer> timeMap = map.get(dateStr);
-        return getChartData(timeMap);
+    public ChartMap getChartMap(String aggregateField, String dateStr) throws Exception {
+        int deep = 1;
+        String dateField = "timeIso8601";
+        DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
+        String startDateTime = dateTimeRange.getStartDateTime();
+        String endDateTime = dateTimeRange.getEndDateTime();
+        String start = startDateTime.replace(" ", "T");
+        String end = endDateTime.replace(" ", "T");
+        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
+        Query query = dateQuery._toQuery();
+        Map<String, Long> groupMap1 = searchService.aggregateByQuery(indexName, aggregateField, query);
+
+        Map<String, Long> map0 = new HashMap<>();
+        Map<String, Map<String, Set<String>>> map = new HashMap<>();
+        groupMap1.forEach((key, value) -> {
+            String accessIp = key;
+            Location location = geoIpTool.getLocation(accessIp);
+            String country0 = location.getCountry();
+            String[] array = country0.split("–");
+            int len = array.length;
+            String country = array[0];
+            if (!"中国".equals(country) || len < 2) {
+                return;
+            }
+
+            String province = array[1];
+            Long count = map0.get(province);
+            if (count == null) {
+                count = 1L;
+            } else {
+                count += 1L;
+            }
+            map0.put(province, count);
+
+            Map<String, Set<String>> map1 = map.computeIfAbsent(province, k -> new HashMap<>());
+            if (len == 3) {
+                String city = array[2];
+                Set<String> set = map1.computeIfAbsent(city, k -> new HashSet<>());
+            } else if (len == 4) {
+                String city = array[2];
+                Set<String> set = map1.computeIfAbsent(city, k -> new HashSet<>());
+                String county = array[3];
+                set.add(county);
+            }
+        });
+
+        Map<Integer, String> geoMap = new HashMap<>();
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(geoJson).getAsJsonObject();
+        for (JsonElement jsonElement : jsonObject.get("features").getAsJsonArray()) {
+            JsonObject propertiesObject = jsonElement.getAsJsonObject().get("properties").getAsJsonObject();
+            int adcode = propertiesObject.get("adcode").getAsInt();
+            String name = propertiesObject.get("name").getAsString();
+            if (deep == 3) {
+                geoMap.put(adcode, name);
+            } else if (deep == 2) {
+                String adcodeStr = (""+adcode).substring(0, 4);
+                geoMap.put(Integer.parseInt(adcodeStr), name);
+            } else if (deep == 1) {
+                String adcodeStr = (""+adcode).substring(0, 2);
+                geoMap.put(Integer.parseInt(adcodeStr), name);
+            }
+        }
+
+        List<String> areaNames = new ArrayList<>(geoMap.values());
+        List<ChartData> chartDataList = new ArrayList<>();
+        map0.forEach((key, value) -> {
+            for (String areaName : areaNames) {
+                if (areaName.startsWith(key)) {
+                    chartDataList.add(new ChartData(areaName, value));
+                }
+            }
+        });
+        return new ChartMap(geoJson, chartDataList);
     }
 
-    public List<DateCount> getNginxLogs() {
-        return getDateCountList();
+    private DateTimeRange getDateTimeRange(String dateStr) {
+        LocalDate localDate = LocalDate.parse(dateStr);
+        String startDate = localDate.minusDays(7).toString();
+        String endDate = localDate.toString();
+
+        String start1 = String.format("%sT00:00:00", startDate);
+        String end1 = String.format("%sT23:59:59", endDate);
+        return new DateTimeRange(start1, end1);
     }
 
-    public List<Object> getChartData(String dateStr) {
-        return getChartData1(dateStr);
+    public List<Object> getChartData5(String dateStr, String fieldValue) throws Exception {
+        String aggregateField = "timeIso8601";
+        String fieldName = "url.raw";
+
+        Query query = Query.of(q -> q.matchAll(m -> m));
+        DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
+        String start1 = dateTimeRange.getStartDateTime();
+        String end1 = dateTimeRange.getEndDateTime();
+        RangeQuery dateQuery = RangeQuery.of(q -> q.field("timeIso8601").gte(JsonData.of(start1)).lte(JsonData.of(end1)));
+        Query combinedQuery;
+        if (fieldValue != null && !fieldValue.isBlank()) {
+            Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery._toQuery())));
+        } else {
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery())));
+        }
+        Map<String, Long> groupMap = searchService.aggregateByDay(indexName, aggregateField, combinedQuery);
+
+        List<String> xList = new ArrayList<>();
+        List<Long> yList = new ArrayList<>();
+        groupMap.forEach((key, value) -> {
+            // x 轴显示时分秒
+            xList.add(key);
+            // y 轴显示访问次数
+            yList.add(value);
+        });
+        List<Object> results = new ArrayList<>();
+        results.add(xList.toArray());
+        results.add(yList.toArray());
+        return results;
     }
 }

+ 136 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NgxLogService.java

@@ -0,0 +1,136 @@
+package cn.reghao.tnb.search.app.log;
+
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+/**
+ * @author reghao
+ * @date 2023-11-08 10:08:02
+ */
+@Slf4j
+@Deprecated
+public class NgxLogService {
+    String matchedMethod = "GET";
+    String matchedUrl = "/";
+    Map<String, Map<Long, Integer>> map = new HashMap<>();
+    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00").withZone(ZoneId.of("UTC"));
+    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+
+    public void processNginxLog0(NginxLog nginxLog) {
+        String todayStr = LocalDateTime.now().toLocalDate().format(formatter1);
+        String method = nginxLog.getRequestMethod();
+        String url = nginxLog.getUrl();
+        if (method.equals(matchedMethod) && url.startsWith(matchedUrl)) {
+            String date = nginxLog.getTimeIso8601();
+            LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
+            LocalDate localDate = localDateTime.toLocalDate();
+            String localDateStr = localDate.format(formatter1);
+            Map<Long, Integer> ngxLogMap = map.computeIfAbsent(localDateStr, k -> new TreeMap<>());
+
+            LocalTime localTime = localDateTime.toLocalTime();
+            Long timestampSecond = LocalDateTime.parse(date, formatter).toEpochSecond(ZoneOffset.of("+8"));
+            Integer count = ngxLogMap.get(timestampSecond);
+            if (count == null) {
+                ngxLogMap.put(timestampSecond, 1);
+            } else {
+                int count1 = ngxLogMap.get(timestampSecond) + 1;
+                ngxLogMap.put(timestampSecond, count1);
+            }
+        }
+    }
+
+    /**
+     * NginxLog 在前端 echarts 中的可视化
+     *
+     * @param
+     * @return
+     * @date 2023-12-01 17:41:07
+     */
+    class PushTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.interrupted()) {
+                Set<String> dateSet = map.keySet();
+                try {
+                    if (ngxLogMap.size() < 3) {
+                        Thread.sleep(10_000);
+                        continue;
+                    }
+
+                    List<Object> chartData = getChartData();
+                    /*TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(chartData));
+                    pullSessionMap.values().forEach(webSocketSession -> {
+                        try {
+                            webSocketSession.sendMessage(textMessage);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    });*/
+                    Thread.sleep(3_000);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        /**
+         * x 轴显示时分秒
+         * y 轴显示访问次数
+         *
+         * @param
+         * @return
+         * @date 2025-12-24 17:48:10
+         */
+        private List<Object> getChartData() {
+            Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
+            List<String> xList = new ArrayList<>();
+            List<Integer> yList = new ArrayList<>();
+            Set<Long> keys = new HashSet<>();
+            for (Long timestampSecond : ngxLogMap.keySet()) {
+                if (timestampSecond < timestampSecondCurrent) {
+                    // x 轴显示时分秒
+                    xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
+                    // y 轴显示访问次数
+                    yList.add(ngxLogMap.get(timestampSecond));
+                    keys.add(timestampSecond);
+                }
+            }
+
+            keys.forEach(ngxLogMap::remove);
+            keys.clear();
+            List<Object> results = new ArrayList<>();
+            results.add(xList.toArray());
+            results.add(yList.toArray());
+            return results;
+        }
+    }
+
+    private List<Object> getChartData(Map<Long, Integer> timeMap) {
+        Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
+        List<String> xList = new ArrayList<>();
+        List<Integer> yList = new ArrayList<>();
+        Set<Long> keys = new HashSet<>();
+        for (Long timestampSecond : timeMap.keySet()) {
+            if (timestampSecond < timestampSecondCurrent) {
+                // x 轴显示时分秒
+                xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
+                // y 轴显示访问次数
+                yList.add(timeMap.get(timestampSecond));
+                keys.add(timestampSecond);
+            }
+        }
+
+        keys.forEach(timeMap::remove);
+        keys.clear();
+        List<Object> results = new ArrayList<>();
+        results.add(xList.toArray());
+        results.add(yList.toArray());
+        return results;
+    }
+}

+ 23 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/GeoIpTool.java

@@ -0,0 +1,23 @@
+package cn.reghao.tnb.search.app.log.ip;
+
+/**
+ * @author reghao
+ * @date 2025-03-12 17:16:31
+ */
+public class GeoIpTool {
+    private final String geoipPath;
+    private IPLocation ipLocation;
+
+    public GeoIpTool(String geoipPath) throws Exception {
+        this.geoipPath =geoipPath;
+        this.ipLocation = new IPLocation(geoipPath);
+    }
+
+    public void reload() throws Exception {
+        this.ipLocation = new IPLocation(geoipPath);
+    }
+
+    public Location getLocation(String ip) {
+        return ipLocation.fetchIPLocation(ip);
+    }
+}

+ 0 - 23
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IpTool.java

@@ -1,23 +0,0 @@
-package cn.reghao.tnb.search.app.log.ip;
-
-/**
- * @author reghao
- * @date 2025-03-12 17:16:31
- */
-public class IpTool {
-    static String filePath = "/home/reghao/Downloads/qqwry.dat";
-    static IPLocation ipLocation;
-    static {
-        try {
-            ipLocation = new IPLocation(filePath);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-
-    public static String getLocation(String ip) {
-        Location loc = ipLocation.fetchIPLocation(ip);
-        return loc.toString();
-    }
-}

+ 5 - 5
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/Location.java

@@ -1,19 +1,19 @@
 package cn.reghao.tnb.search.app.log.ip;
+
+import lombok.Getter;
+
 /**
  * @Description:位置
  * @author:difeng
  * @date:2016年12月13日
  */
+@Getter
 public class Location {
-	
     public String country;
-    
     public String area;
-    
+
 	@Override
 	public String toString() {
 		return "Location [country=" + country + ", area=" + area + "]";
 	}
-    
 }
-

+ 20 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaDemo.java

@@ -0,0 +1,20 @@
+package cn.reghao.tnb.search.app.log.kafka;
+
+/**
+ * @author reghao
+ * @date 2022-04-02 11:20:21
+ */
+public class KafkaDemo {
+    public static void main(String[] args) throws InterruptedException {
+        /*KafkaSub kafkaSub = new KafkaSub();
+        kafkaSub.consume();
+
+        KafkaPub kafkaPub = new KafkaPub();
+        int i = 1;
+        while (!Thread.interrupted()) {
+            kafkaPub.produce(i);
+            Thread.sleep(1_000);
+            i++;
+        }*/
+    }
+}

+ 43 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaPub.java

@@ -0,0 +1,43 @@
+package cn.reghao.tnb.search.app.log.kafka;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import cn.reghao.tnb.search.app.config.AppProperties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+/**
+ * @author reghao
+ * @date 2022-04-02 11:20:37
+ */
+@Component
+public class KafkaPub {
+    private final String topic;
+    private final Producer<String, String> producer;
+
+    public KafkaPub(AppProperties appProperties) {
+        this.topic = appProperties.getKafkaTopic();
+
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", appProperties.getKafkaUri());
+        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        this.producer = new KafkaProducer<>(properties);
+    }
+
+    public void produce(int i) {
+        String topic = "CustomerCountry";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "index", ""+i);
+        producer.send(record);
+    }
+
+    public void produce(NginxLog nginxLog) {
+        String json = JsonConverter.objectToJson(nginxLog);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "nginx-log", json);
+        producer.send(record);
+    }
+}

+ 79 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/kafka/KafkaSub.java

@@ -0,0 +1,79 @@
+package cn.reghao.tnb.search.app.log.kafka;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import cn.reghao.tnb.search.app.config.AppProperties;
+import cn.reghao.tnb.search.app.log.NginxLogService;
+import jakarta.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author reghao
+ * @date 2022-04-02 11:20:44
+ */
+@Slf4j
+@Component
+public class KafkaSub {
+    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("kafka-consumer");
+    private final String topic;
+    private final KafkaConsumer<String,String> consumer;
+    private final NginxLogService nginxLogService;
+
+    public KafkaSub(AppProperties appProperties, NginxLogService nginxLogService) {
+        this.topic = appProperties.getKafkaTopic();
+        this.nginxLogService = nginxLogService;
+
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", appProperties.getKafkaUri());
+        // 消费者群组
+        properties.put("group.id", "test");
+        properties.put("enable.auto.commit", "true");
+        properties.put("auto.commit.interval.ms", "1000");
+        properties.put("session.timeout.ms", "30000");
+        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        this.consumer = new KafkaConsumer<>(properties);
+    }
+
+    //@PostConstruct
+    public void consume() {
+        consumer.subscribe(Collections.singletonList(topic));
+        threadPool.submit(new ConsumeTask());
+    }
+
+    class ConsumeTask implements Runnable {
+        public void run() {
+            try {
+                while (!Thread.interrupted()) {
+                    List<NginxLog> list = new ArrayList<>();
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
+                    for (ConsumerRecord<String, String> record : records) {
+                        String key = record.key();
+                        String value = record.value();
+                        NginxLog nginxLog = JsonConverter.jsonToObject(value, NginxLog.class);
+                        list.add(nginxLog);
+                    }
+                    nginxLogService.saveNginxLogs(list);
+                    consumer.commitAsync();
+                }
+            } catch (CommitFailedException e) {
+                log.error("commit failed", e);
+            } finally {
+                consumer.close();
+            }
+        }
+    }
+}

+ 25 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/dto/DateTimeRange.java

@@ -0,0 +1,25 @@
+package cn.reghao.tnb.search.app.log.model.dto;
+
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.Size;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2026-01-06 14:16:18
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Setter
+@Getter
+public class DateTimeRange {
+    @NotBlank
+    @Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
+    private String startDateTime;
+    @NotBlank
+    @Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
+    private String endDateTime;
+}

+ 19 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/ChartData.java

@@ -0,0 +1,19 @@
+package cn.reghao.tnb.search.app.log.model.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2026-01-07 14:43:45
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Setter
+@Getter
+public class ChartData {
+    private String name;
+    private Object value;
+}

+ 23 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/ChartMap.java

@@ -0,0 +1,23 @@
+package cn.reghao.tnb.search.app.log.model.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2025-08-19 15:17:32
+ */
+@AllArgsConstructor
+@Getter
+public class ChartMap {
+    private String geoJson;
+    private List<ChartData> list;
+
+    public ChartMap() {
+        this.geoJson = "{}";
+        this.list = Collections.emptyList();
+    }
+}

+ 3 - 3
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/DateCount.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/GroupCount.java

@@ -13,7 +13,7 @@ import lombok.Setter;
 @AllArgsConstructor
 @Setter
 @Getter
-public class DateCount {
-    private String date;
-    private int total;
+public class GroupCount {
+    private String name;
+    private long total;
 }

+ 108 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/pc/LogConsumer.java

@@ -0,0 +1,108 @@
+package cn.reghao.tnb.search.app.log.pc;
+
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import cn.reghao.tnb.search.app.log.NginxLogService;
+import jakarta.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.Executor;
+
+/**
+ * @author reghao
+ * @date 2025-12-31 14:25:59
+ */
+@Slf4j
+@Component
+public class LogConsumer {
+    private final Executor taskExecutor;
+    private final Object monitor;
+    private final LogProducer logProducer;
+    private final NginxLogService nginxLogService;
+
+    public LogConsumer(@Qualifier("taskExecutor") Executor taskExecutor, LogProducer logProducer,
+                       NginxLogService nginxLogService) {
+        this.taskExecutor = taskExecutor;
+        this.monitor = logProducer.getMonitor();
+        this.logProducer = logProducer;
+        this.nginxLogService = nginxLogService;
+    }
+
+    @PostConstruct
+    public void run() {
+        taskExecutor.execute(new ConsumerThread());
+        log.info("NginxLog producer-consumer 模型启动...");
+    }
+
+    public int getActiveTasks() {
+        int queueSize = logProducer.size();
+        ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+        int capacity = executor.getQueueCapacity();
+        int size = executor.getQueueSize();
+        int activeCount = executor.getActiveCount();
+        log.info("{} -> {} {} {}", queueSize, capacity, size, activeCount);
+        return activeCount;
+    }
+
+    class ConsumerThread implements Runnable {
+        @Override
+        public void run() {
+            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor;
+            int capacity = executor.getQueueCapacity();
+            while (!Thread.interrupted()) {
+                try {
+                    if (executor.getQueueSize() < capacity) {
+                        dispatch();
+                    } else {
+                        log.info("当前有 {} 个活跃线程, 休眠 1s 等待线程池空闲...", executor.getActiveCount());
+                        Thread.sleep(1_000);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void dispatch() throws Exception {
+        synchronized (monitor) {
+            Object object = logProducer.get();
+            if (object != null) {
+                if (object instanceof NginxLog) {
+                    NginxLog nginxLog = (NginxLog) object;
+                    String timeStr = nginxLog.getTimeIso8601().replace("+08:00", "");
+                    nginxLog.setTimeIso8601(timeStr);
+
+                    LogTask logTask = new LogTask(nginxLogService, nginxLog);
+                    taskExecutor.execute(logTask);
+                } else {
+                    log.error("Object 类型未知");
+                }
+            } else {
+                //log.info("调用 monitor.wait() 等待 DataProducer 中有数据可用");
+                monitor.wait();
+            }
+        }
+    }
+
+    static class LogTask implements Runnable {
+        private final NginxLogService nginxLogService;
+        private final NginxLog nginxLog;
+
+        public LogTask(NginxLogService nginxLogService, NginxLog nginxLog) {
+            this.nginxLogService = nginxLogService;
+            this.nginxLog = nginxLog;
+        }
+
+        @Override
+        public void run() {
+            try {
+                nginxLogService.processNginxLog(nginxLog);
+            } catch (Exception e) {
+                log.error("LogTask failed with message: {}", e.getMessage());
+            }
+        }
+    }
+}

+ 41 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/pc/LogProducer.java

@@ -0,0 +1,41 @@
+package cn.reghao.tnb.search.app.log.pc;
+
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author reghao
+ * @date 2025-12-31 14:25:53
+ */
+@Component
+public class LogProducer {
+    private final Object monitor;
+    private final LinkedBlockingQueue<Object> dataQueue;
+
+    public LogProducer() {
+        this.monitor = new Object();
+        this.dataQueue = new LinkedBlockingQueue<>(10_000);
+    }
+
+    public Object getMonitor() {
+        return monitor;
+    }
+
+    public void put(Object object) throws InterruptedException {
+        dataQueue.put(object);
+        synchronized (monitor) {
+            // 通知 consumer 线程有数据可用
+            monitor.notify();
+        }
+    }
+
+    public Object get() throws InterruptedException {
+        return dataQueue.poll();
+        //return dataQueue.take();
+    }
+
+    public int size() {
+        return dataQueue.size();
+    }
+}

+ 3 - 3
search/search-service/src/main/java/cn/reghao/tnb/search/app/lucene/LuceneIndex.java

@@ -1,7 +1,7 @@
 package cn.reghao.tnb.search.app.lucene;
 
 import cn.reghao.tnb.search.api.dto.IndexCount;
-import cn.reghao.tnb.search.app.config.ElasticProperties;
+import cn.reghao.tnb.search.app.config.AppProperties;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.*;
@@ -26,8 +26,8 @@ public class LuceneIndex {
     private final String nativeLuceneDir;
     private final Analyzer luceneAnalyzer;
 
-    public LuceneIndex(ElasticProperties elasticProperties) {
-        this.nativeLuceneDir = elasticProperties.getNativeLuceneDir();
+    public LuceneIndex(AppProperties appProperties) {
+        this.nativeLuceneDir = appProperties.getNativeLuceneDir();
         this.luceneAnalyzer = new IKAnalyzer();
     }
 

+ 3 - 3
search/search-service/src/main/java/cn/reghao/tnb/search/app/lucene/LuceneSearch.java

@@ -1,7 +1,7 @@
 package cn.reghao.tnb.search.app.lucene;
 
 import cn.reghao.jutil.jdk.web.db.PageList;
-import cn.reghao.tnb.search.app.config.ElasticProperties;
+import cn.reghao.tnb.search.app.config.AppProperties;
 import cn.reghao.tnb.search.app.model.vo.ElasticQuery;
 import cn.reghao.tnb.search.app.model.vo.LuceneQuery;
 import cn.reghao.tnb.search.app.model.vo.VideoQuery;
@@ -40,8 +40,8 @@ public class LuceneSearch {
     private SimpleHTMLFormatter formatter;
     private Map<String, IndexReader> indexReaderMap = new HashMap<>();
 
-    public LuceneSearch(ElasticProperties elasticProperties) {
-        this.nativeLuceneDir = elasticProperties.getNativeLuceneDir();
+    public LuceneSearch(AppProperties appProperties) {
+        this.nativeLuceneDir = appProperties.getNativeLuceneDir();
         this.luceneAnalyzer = new IKAnalyzer();
         this.formatter = new SimpleHTMLFormatter("<span style='color:red;'>", "</span>");
     }

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketConfig.java

@@ -29,7 +29,7 @@ public class WebSocketConfig implements WebSocketConfigurer {
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
 
-        registry.addHandler(logHandler, WebSocketPath.wsPushLog)
+        registry.addHandler(logHandler, WebSocketPath.wsAgent)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
     }

+ 15 - 11
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/handler/LogHandler.java

@@ -6,9 +6,9 @@ import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.web.log.GatewayLog;
 import cn.reghao.jutil.jdk.web.log.NginxLog;
-import cn.reghao.tnb.search.app.log.NginxLogService;
-import cn.reghao.tnb.search.app.ws.WsSender;
+import cn.reghao.tnb.search.app.log.kafka.KafkaPub;
 import cn.reghao.tnb.search.app.ws.config.WebSocketPath;
+import cn.reghao.tnb.search.app.log.pc.LogProducer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
@@ -29,12 +29,12 @@ public class LogHandler implements WebSocketHandler {
     private final Map<String, WebSocketSession> pullAccessMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pushSessionMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pullSessionMap = new ConcurrentHashMap<>();
-    private WsSender wsSender;
-    private final NginxLogService nginxLogService;
+    private final LogProducer logProducer;
+    private KafkaPub kafkaPub;
 
-    public LogHandler(WsSender wsSender, NginxLogService nginxLogService) {
-        this.wsSender = wsSender;
-        this.nginxLogService = nginxLogService;
+    public LogHandler(LogProducer logProducer, KafkaPub kafkaPub) {
+        this.logProducer = logProducer;
+        this.kafkaPub = kafkaPub;
     }
 
     private WebSocketSession getPullSession(String app, String host) {
@@ -126,7 +126,7 @@ public class LogHandler implements WebSocketHandler {
 
         String appKey = String.format("%s@%s", app, host);
         String path = webSocketSession.getUri().getPath();
-        if (path.equals(WebSocketPath.wsPushLog)) {
+        if (path.equals(WebSocketPath.wsAgent)) {
             appMap.put(sessionId, appKey);
             pushSessionMap.put(appKey, webSocketSession);
         } else if (path.equals(WebSocketPath.wsPullLog)) {
@@ -148,16 +148,20 @@ public class LogHandler implements WebSocketHandler {
                 BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
                 Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
                 if (object instanceof String) {
-                    log.info("{} -> {}", appKey, object);
+                    //log.info("{} -> {}", appKey, object);
+                    log.info("WebSocket heartbeat");
+                    webSocketSession.sendMessage(new TextMessage(""));
                 } else if (object instanceof AppLog) {
                     AppLog appLog = (AppLog) object;
                     String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
                 } else if (object instanceof NginxLog) {
                     NginxLog nginxLog = (NginxLog) object;
-                    nginxLogService.processNginxLog(nginxLog);
+                    //logProducer.put(nginxLog);
+                    kafkaPub.produce(nginxLog);
                 }
             } else if (webSocketMessage instanceof PingMessage) {
                 log.info("接收到 WebSocket PingMessage");
+                webSocketSession.sendMessage(new PongMessage());
             } else if (webSocketMessage instanceof PongMessage) {
                 log.info("接收到 WebSocket PongMessage");
             } else {
@@ -176,7 +180,7 @@ public class LogHandler implements WebSocketHandler {
 
     @Override
     public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
-        log.info("WebSocket 断开连接");
+        log.info("WebSocket 断开连接: {} -> {}", closeStatus.getCode(), closeStatus.getReason());
         removeSession(webSocketSession);
     }
 

+ 10 - 6
search/search-service/src/main/resources/application-dev.yml

@@ -75,10 +75,14 @@ eureka:
     fetch-registry: true
     service-url:
       defaultZone: http://127.0.0.1:6060/eureka/
-es:
-  host: 127.0.0.1
-  port: 9200
-  username: elastic
-  password: VLTtN03SSJ4lsyyg56kf
+app:
+  es-host: 192.168.0.81
+  es-port: 9200
+  es-username: elastic
+  es-password: VLTtN03SSJ4lsyyg56kf
   native-lucene-dir: /opt/data/search_data/native_lucene
-  hibernate-lucene-dir: /opt/data/search_data/hibernate_lucene
+  hibernate-lucene-dir: /opt/data/search_data/hibernate_lucene
+  kafka-uri: 127.0.0.1:9092
+  kafka-topic: NginxLog
+  geoip-path: /home/reghao/Downloads/qqwry.dat
+  geojson-path: /home/reghao/Downloads/china1.json