Browse Source

更新 search-service 中 es 相关的操作和对 NginxLog 的处理

reghao 4 months ago
parent
commit
34fb46bb82
16 changed files with 503 additions and 410 deletions
  1. 41 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/NginxLogController.java
  2. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/DocumentService.java
  3. 0 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/IndexService.java
  4. 2 4
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java
  5. 178 85
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java
  6. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocument.java
  7. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearch.java
  8. 128 165
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java
  9. 15 2
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/consumer/RabbitListeners.java
  10. 0 33
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/NginxLog.java
  11. 19 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/DateCount.java
  12. 0 13
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/Keys.java
  13. 4 3
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketConfig.java
  14. 7 6
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketInterceptor.java
  15. 13 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketPath.java
  16. 93 95
      search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/handler/LogHandler.java

+ 41 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/NginxLogController.java

@@ -0,0 +1,41 @@
+package cn.reghao.tnb.search.app.controller;
+
+import cn.reghao.tnb.common.web.WebResult;
+import cn.reghao.tnb.search.app.log.NginxLogService;
+import cn.reghao.tnb.search.app.log.model.vo.DateCount;
+import io.swagger.v3.oas.annotations.Operation;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2025-12-25 14:09:51
+ */
+@RestController
+@RequestMapping("/api/search1/nginx")
+public class NginxLogController {
+    private final NginxLogService nginxLogService;
+
+    public NginxLogController(NginxLogService nginxLogService) {
+        this.nginxLogService = nginxLogService;
+    }
+
+    @Operation(summary = "", description = "N")
+    @GetMapping(value = "/log", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String nginxLog() {
+        List<DateCount> list = nginxLogService.getNginxLogs();
+        return WebResult.success(list);
+    }
+
+    @Operation(summary = "", description = "N")
+    @GetMapping(value = "/log/chart", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getLogChart(@RequestParam(value = "dateStr") String dateStr) {
+        List<Object> chartData = nginxLogService.getChartData(dateStr);
+        return WebResult.success(chartData);
+    }
+}

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/DocumentService.java

@@ -1,6 +1,6 @@
 package cn.reghao.tnb.search.app.es;
 package cn.reghao.tnb.search.app.es;
 
 
-import cn.reghao.tnb.search.app.log.model.NginxLog;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.model.po.Wenshu;
 import cn.reghao.tnb.search.app.model.po.Wenshu;
 import cn.reghao.jutil.jdk.string.SnowFlake;
 import cn.reghao.jutil.jdk.string.SnowFlake;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;

+ 0 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/IndexService.java

@@ -99,7 +99,6 @@ public class IndexService {
 
 
         DeleteIndexResponse response = esClient.indices().delete(i -> i.index(indexName));
         DeleteIndexResponse response = esClient.indices().delete(i -> i.index(indexName));
         if (!response.acknowledged()) {
         if (!response.acknowledged()) {
-
         }
         }
     }
     }
 
 

+ 2 - 4
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java

@@ -1,6 +1,5 @@
 package cn.reghao.tnb.search.app.es;
 package cn.reghao.tnb.search.app.es;
 
 
-import cn.reghao.tnb.search.app.model.po.VideoText;
 import co.elastic.clients.elasticsearch._types.mapping.*;
 import co.elastic.clients.elasticsearch._types.mapping.*;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
@@ -50,7 +49,7 @@ public class MappingService {
         return propertyMap;
         return propertyMap;
     }
     }
 
 
-    public Map<String, Property> getPropertyMapByWenshu() {
+    public Map<String, Property> getPropertyMapWithWenshu() {
         Map<String, Property> propertyMap = new HashMap<>();
         Map<String, Property> propertyMap = new HashMap<>();
         propertyMap.put("id", keywordProp);
         propertyMap.put("id", keywordProp);
         propertyMap.put("originalUrl", keywordProp);
         propertyMap.put("originalUrl", keywordProp);
@@ -70,8 +69,7 @@ public class MappingService {
         return propertyMap;
         return propertyMap;
     }
     }
 
 
-    public Map<String, Property> getVideoTextPropertyMap() {
-        String className = VideoText.class.getSimpleName();
+    public Map<String, Property> getPropertyMapWithVideoText(Class<?> clazz) {
         Map<String, Property> propertyMap = new HashMap<>();
         Map<String, Property> propertyMap = new HashMap<>();
         propertyMap.put("id", keywordProp);
         propertyMap.put("id", keywordProp);
         propertyMap.put("title", textPropIk);
         propertyMap.put("title", textPropIk);

+ 178 - 85
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java

@@ -1,6 +1,7 @@
 package cn.reghao.tnb.search.app.es;
 package cn.reghao.tnb.search.app.es;
 
 
-import cn.reghao.tnb.search.app.log.model.NginxLog;
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.FieldValue;
 import co.elastic.clients.elasticsearch._types.FieldValue;
@@ -17,6 +18,7 @@ import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
 import co.elastic.clients.elasticsearch.core.search.TotalHits;
 import co.elastic.clients.elasticsearch.core.search.TotalHits;
 import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
 import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
 import co.elastic.clients.json.JsonData;
 import co.elastic.clients.json.JsonData;
+import co.elastic.clients.util.ObjectBuilder;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
@@ -24,6 +26,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 /**
 /**
@@ -39,15 +43,19 @@ public class SearchService {
         this.esClient = elasticService.getElasticsearchClient();
         this.esClient = elasticService.getElasticsearchClient();
     }
     }
 
 
-    public void searchOne(String indexName, String searchText) throws IOException {
+    public void search(String indexName, String fieldValue) throws IOException {
+        String fieldName = "name";
         SearchResponse<NginxLog> searchResponse = esClient.search(s -> s
         SearchResponse<NginxLog> searchResponse = esClient.search(s -> s
                 .index(indexName)
                 .index(indexName)
+                /*.sort(s1 -> s1.field(f -> f.field(fieldName).order(SortOrder.Asc)))
+                .scroll(s2 -> s2.offset(0))*/
+                .from(0)
+                .size(100)
                 // 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)
                 // 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)
                 .query(q -> q
                 .query(q -> q
                         // 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索)
                         // 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索)
-                        .match(t -> t
-                                .field("name")
-                                .query(searchText))), NginxLog.class);
+                        .match(t -> t.field(fieldName).query(fieldValue))), NginxLog.class);
+
         TotalHits total = searchResponse.hits().total();
         TotalHits total = searchResponse.hits().total();
         boolean isExactResult = total != null && total.relation() == TotalHitsRelation.Eq;
         boolean isExactResult = total != null && total.relation() == TotalHitsRelation.Eq;
         if (isExactResult) {
         if (isExactResult) {
@@ -62,31 +70,43 @@ public class SearchService {
         }
         }
     }
     }
 
 
-    public List<NginxLog> searchByPage(String indexName, int pn, String searchField, String searchText) throws IOException {
-        int ps = 100;
+    /**
+     * 分页查询
+     *
+     * @param
+     * @return
+     * @date 2025-12-25 15:54:34
+     */
+    public List<NginxLog> searchByPage(String indexName, int pageSize, int pageNumber,
+                                       String fieldName, String fieldValue) throws IOException {
         String sortField = "timeIso8601";
         String sortField = "timeIso8601";
         Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
         Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
         Query query11 = getOrQuery();
         Query query11 = getOrQuery();
         Query query1 = getAndQuery();
         Query query1 = getAndQuery();
-        Query query12 = getTermQuery();
+        // 等值查询
+        Query termQuery = getTermQuery(fieldName, fieldValue);
 
 
-        int start = (pn-1)*ps;
+        int start = (pageNumber-1)*pageSize;
         SearchRequest searchRequest = SearchRequest.of(s -> s
         SearchRequest searchRequest = SearchRequest.of(s -> s
                 .index(indexName)
                 .index(indexName)
-                .query(query1)
+                .query(termQuery)
                 .from(start)
                 .from(start)
-                .size(ps)
+                .size(pageSize)
                 // 按 id 字段降序排列
                 // 按 id 字段降序排列
                 .sort(f -> f.field(o -> o.field(sortField).order(SortOrder.Desc)))
                 .sort(f -> f.field(o -> o.field(sortField).order(SortOrder.Desc)))
         );
         );
 
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        /*List<Hit<NginxLog>> hits = searchResponse.hits().hits();
-        for (Hit<NginxLog> hit : hits) {
-            NginxLog product = hit.source();
-            log.info("search page result: {}", product);
-        }*/
-        return searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
+        HitsMetadata<NginxLog> hitsMetadata = searchResponse.hits();
+        //得到总数
+        TotalHits totalHits = hitsMetadata.total();
+        long total = totalHits.value();
+        Double maxScore = hitsMetadata.maxScore();
+        //拿到匹配的数据
+        List<Hit<NginxLog>> hits = hitsMetadata.hits();
+        //拿到_source中的数据
+        List<NginxLog> nginxLogs = hits.stream().map(Hit::source).collect(Collectors.toList());
+        return nginxLogs;
     }
     }
 
 
     public void searchAll(String indexName) throws IOException {
     public void searchAll(String indexName) throws IOException {
@@ -142,48 +162,10 @@ public class SearchService {
         System.out.println();
         System.out.println();
     }
     }
 
 
-    public void searchByField(String indexName, String fieldName, String fieldValue) throws IOException {
-        SearchRequest request = SearchRequest.of(s -> s
-                .index(indexName)
-                .query(query -> query.match(match -> match.field(fieldName).query(fieldValue)))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(request, NginxLog.class);
-        log.info("search result: {}", searchResponse);
-    }
-
-    public void search(String index) throws IOException {
-        String fieldName = "url";
-        String fieldValue = "/api";
-        // 构造查询条件
-        SearchRequest searchRequest = SearchRequest.of(s -> s.index(index)
-                /*.sort(s1 -> s1.field(f -> f.field(fieldName).order(SortOrder.Asc)))
-                .scroll(s2 -> s2.offset(0))*/
-                .from(0)
-                .size(500)
-                .query(q -> q.match(m -> m.field(fieldName).query(FieldValue.of(fieldValue))))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        HitsMetadata<NginxLog> hitsMetadata = searchResponse.hits();
-        //得到总数
-        TotalHits totalHits = hitsMetadata.total();
-        Double maxScore = hitsMetadata.maxScore();
-        //拿到匹配的数据
-        List<Hit<NginxLog>> hits = hitsMetadata.hits();
-        //拿到_source中的数据
-        List<NginxLog> nginxLogs = hits.stream().map(Hit::source).collect(Collectors.toList());
-
-        //最大分数
-        System.out.println(searchResponse.maxScore());
-        //分片数
-        System.out.println(searchResponse.shards());
-        //是否超时
-        System.out.println(searchResponse.timedOut());
-    }
-
     public void search() throws IOException {
     public void search() throws IOException {
-        Query query = getExistsQuery();
+        // 判断字段是否存在
+        String fieldName = "host";
+        Query query = getExistsQuery(fieldName);
 
 
         String index = "nginx_log";
         String index = "nginx_log";
         SearchRequest searchRequest = new SearchRequest.Builder()
         SearchRequest searchRequest = new SearchRequest.Builder()
@@ -192,10 +174,11 @@ public class SearchService {
                 .build();
                 .build();
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         TotalHits totalHits = searchResponse.hits().total();
         TotalHits totalHits = searchResponse.hits().total();
+        long total = totalHits.value();
         List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
         List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
     }
     }
 
 
-    public void count(String index) throws IOException {
+    public long count(String index) throws IOException {
         Query query1 = getMatchQuery();
         Query query1 = getMatchQuery();
         // remoteAddr 字段为空
         // remoteAddr 字段为空
         Query query2 = BoolQuery.of(b ->
         Query query2 = BoolQuery.of(b ->
@@ -214,7 +197,7 @@ public class SearchService {
         CountResponse countResponse = esClient.count(countRequest);
         CountResponse countResponse = esClient.count(countRequest);
         long total = countResponse.count();
         long total = countResponse.count();
         ShardStatistics shardStatistics = countResponse.shards();
         ShardStatistics shardStatistics = countResponse.shards();
-        System.out.println("total -> " + total);
+        return total;
     }
     }
 
 
     /**
     /**
@@ -224,15 +207,13 @@ public class SearchService {
      * @return
      * @return
      * @date 2025-03-12 13:45:17
      * @date 2025-03-12 13:45:17
      */
      */
-    public Query getTermQuery() {
+    public Query getTermQuery(String fieldName, String fieldValue) {
         String fieldName1 = "url";
         String fieldName1 = "url";
         String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
         String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
-
         String fieldName2 = "requestMethod";
         String fieldName2 = "requestMethod";
         String fieldValue2 = "POST";
         String fieldValue2 = "POST";
-
         Query query = TermQuery.of(t -> t
         Query query = TermQuery.of(t -> t
-                .field(fieldName1).value(FieldValue.of(fieldValue1))
+                .field(fieldName).value(FieldValue.of(fieldValue))
         )._toQuery();
         )._toQuery();
         return query;
         return query;
     }
     }
@@ -322,9 +303,9 @@ public class SearchService {
      * @return
      * @return
      * @date 2025-03-12 14:20:26
      * @date 2025-03-12 14:20:26
      */
      */
-    public Query getExistsQuery() {
+    public Query getExistsQuery(String fieldName) {
         // 判断字段是否存在
         // 判断字段是否存在
-        Query query = ExistsQuery.of(t -> t.field("host"))._toQuery();
+        Query query = ExistsQuery.of(t -> t.field(fieldName))._toQuery();
         return query;
         return query;
     }
     }
 
 
@@ -356,14 +337,16 @@ public class SearchService {
     }
     }
 
 
     public Query getQuery() {
     public Query getQuery() {
-        MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
+        Query query1 = MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
+
         // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
         // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
         //
         //
         //修改一个单词,如:box--->fox。
         //修改一个单词,如:box--->fox。
         //移除一个单词,如:black-->lack。
         //移除一个单词,如:black-->lack。
         //插入一个单词,如:sic-->sick。
         //插入一个单词,如:sic-->sick。
         //转换两个单词顺序,如:act-->cat。
         //转换两个单词顺序,如:act-->cat。
-        FuzzyQuery.of(f -> f.field("host").value("lonel"));
+        Query query2 = FuzzyQuery.of(f -> f.field("host").value("lonel"))._toQuery();
+
         // 通过指定字段的前缀进行查询
         // 通过指定字段的前缀进行查询
         Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
         Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
         return query;
         return query;
@@ -376,21 +359,37 @@ public class SearchService {
      * @return
      * @return
      * @date 2025-03-12 15:06:48
      * @date 2025-03-12 15:06:48
      */
      */
-    public void aggregate(String index) throws Exception {
+    public void aggregate(String index, String aggregateField) throws Exception {
         String aggField1 = "status";
         String aggField1 = "status";
         String aggField2 = "remoteAddr";
         String aggField2 = "remoteAddr";
         String aggField3 = "httpUserAgent.raw";
         String aggField3 = "httpUserAgent.raw";
         String aggField4 = "url.raw";
         String aggField4 = "url.raw";
         String aggField5 = "timeIso8601";
         String aggField5 = "timeIso8601";
 
 
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/base/Device/PageList";
+        String fieldName = "url.raw";
+        String fieldValue = "/datareceive/ReceiveData/SendContentResult";
 
 
         SearchRequest searchRequest = new SearchRequest.Builder()
         SearchRequest searchRequest = new SearchRequest.Builder()
                 .index(index)
                 .index(index)
-                .query(q -> q.bool(b -> b.must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))))
-                .size(10000)
-                .aggregations("first_agg", a->a.terms(t->t.field(aggField5).size(65535)))
+                .size(0)
+                .query(q -> q.bool(b -> b.must(m -> m.term(t -> t.field(fieldName).value(FieldValue.of(fieldValue))))))
+                //.aggregations("first_agg", a->a.terms(t->t.field(aggregateField).size(65535)))
+                /*.aggregations("agg1", a -> a.dateRange(t -> t.field(aggField5).format("yyyy-MM-dd HH:mm:ss")
+                        .ranges(r -> r.from(new Function<>() {
+                            @Override
+                            public ObjectBuilder<FieldDateMath> apply(FieldDateMath.Builder builder) {
+                                return builder.expr("2023-11-20 00:00:00");
+                            }
+                        }).to(new Function<>() {
+                            @Override
+                            public ObjectBuilder<FieldDateMath> apply(FieldDateMath.Builder builder) {
+                                return builder.expr("2023-11-20 23:59:59");
+                            }
+                        }))))*/
+                //
+                //.size(0)
+                // 按天进行聚合
+                .aggregations("agg1", a->a.dateHistogram(t->t.field(aggregateField).calendarInterval(CalendarInterval.Day).minDocCount(0)))
                 //.aggregations("agg1", a->a.histogram(t->t.field("httpUserAgent").interval(50.0)))
                 //.aggregations("agg1", a->a.histogram(t->t.field("httpUserAgent").interval(50.0)))
                 //.aggregations("agg1", a->a.sum(t->t.field("httpUserAgent")))
                 //.aggregations("agg1", a->a.sum(t->t.field("httpUserAgent")))
                 //.aggregations("second_agg", a->a.avg(t->t.field("status")))
                 //.aggregations("second_agg", a->a.avg(t->t.field("status")))
@@ -398,37 +397,131 @@ public class SearchService {
 
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
         TotalHits totalHits = searchResponse.hits().total();
         TotalHits totalHits = searchResponse.hits().total();
+        long total = totalHits.value();
         Map<String, Aggregate> resultMap = searchResponse.aggregations();
         Map<String, Aggregate> resultMap = searchResponse.aggregations();
 
 
-        List<Long> countList = new ArrayList<>();
+        Map<Long, Long> map = new TreeMap<>();
         resultMap.forEach((k, v) -> {
         resultMap.forEach((k, v) -> {
             Object value = v._get();
             Object value = v._get();
-            if (value instanceof StringTermsAggregate) {
-                StringTermsAggregate terms = (StringTermsAggregate) value;
+            if (value instanceof StringTermsAggregate terms) {
                 List<StringTermsBucket> list = terms.buckets().array();
                 List<StringTermsBucket> list = terms.buckets().array();
                 list.forEach(bucket -> {
                 list.forEach(bucket -> {
                     String groupKey = (String) bucket.key()._get();
                     String groupKey = (String) bucket.key()._get();
                     long count = bucket.docCount();
                     long count = bucket.docCount();
-                    countList.add(count);
                     //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
                     //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
-                    System.out.println(groupKey + " : " + count);
+                    System.out.printf("%s -> %s\n", groupKey, count);
                 });
                 });
                 System.out.println("bucket size = " + list.size());
                 System.out.println("bucket size = " + list.size());
-            } else if (value instanceof LongTermsAggregate) {
-                LongTermsAggregate terms = (LongTermsAggregate) value;
+            } else if (value instanceof LongTermsAggregate terms) {
                 List<LongTermsBucket> list = terms.buckets().array();
                 List<LongTermsBucket> list = terms.buckets().array();
                 list.forEach(bucket -> {
                 list.forEach(bucket -> {
                     String groupKey = bucket.key();
                     String groupKey = bucket.key();
-                    String groupKeyStr = bucket.keyAsString();
+                    Long secondTimestamp = Long.parseLong(groupKey)/1000;
+                    //String groupKeyStr = bucket.keyAsString();
+                    String dateTimeStr = DateTimeConverter.format(Long.parseLong(groupKey));
+                    long count = bucket.docCount();
+                    map.put(secondTimestamp, count);
+                    System.out.printf("%s -> %s\n", dateTimeStr, count);
+                });
+            } else if (value instanceof DateRangeAggregate aggregate) {
+                List<RangeBucket> list = aggregate.buckets().array();
+                list.forEach(rangeBucket -> {
+                    String key = rangeBucket.key();
+                    System.out.println(rangeBucket);
+                });
+            } else if (value instanceof DateHistogramAggregate aggregate) {
+                List<DateHistogramBucket> list = aggregate.buckets().array();
+                list.forEach(rangeBucket -> {
+                    long msTimestamp = Long.parseLong(rangeBucket.key());
+                    String dateStr = DateTimeConverter.localDateTime(msTimestamp).toLocalDate().toString();
+                    long count = rangeBucket.docCount();
+                    System.out.printf("%s -> %s\n", dateStr, count);
+                });
+            } else {
+                System.out.println(value);
+            }
+        });
+    }
+
+    /**
+     * 根据天进行聚合
+     *
+     * @param
+     * @return 日期 -> 每日访问量
+     * @date 2025-12-25 17:23:04
+     */
+    public Map<String, Long> aggregateByDay(String index, String dateField) throws Exception {
+        SearchRequest searchRequest = new SearchRequest.Builder()
+                .index(index)
+                .size(0)
+                // 按天进行聚合
+                .aggregations("agg1", a->a.dateHistogram(t->t.field(dateField).calendarInterval(CalendarInterval.Day).minDocCount(0)))
+                .build();
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        TotalHits totalHits = searchResponse.hits().total();
+        long total = totalHits.value();
+        Map<String, Aggregate> resultMap = searchResponse.aggregations();
+
+        Map<String, Long> map = new TreeMap<>();
+        resultMap.forEach((k, v) -> {
+            Object value = v._get();
+            if (value instanceof DateHistogramAggregate aggregate) {
+                List<DateHistogramBucket> list = aggregate.buckets().array();
+                list.forEach(rangeBucket -> {
+                    long msTimestamp = Long.parseLong(rangeBucket.key());
+                    String dateStr = DateTimeConverter.localDateTime(msTimestamp).toLocalDate().toString();
+                    long count = rangeBucket.docCount();
+                    map.put(dateStr, count);
+                });
+            } else {
+                System.out.println("unknown aggregate type: " + value.getClass().getName());
+            }
+        });
+
+        return map;
+    }
+
+    /**
+     * 在指定日期里对 url 进行聚合
+     *
+     * @param
+     * @return url -> 每日访问量
+     * @date 2025-12-25 17:31:56
+     */
+    public Map<String, Long> aggregateByUrl(String index, String urlField,
+                                            String dateField, String dateValue) throws Exception {
+        String start = String.format("%sT00:00:00Z", dateValue);
+        String end = String.format("%sT23:59:59Z", dateValue);
+        SearchRequest searchRequest = new SearchRequest.Builder()
+                .index(index)
+                .size(0)
+                .query(q -> q.range(r -> r.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end))))
+                .aggregations("first_agg", a->a.terms(t->t.field(urlField).size(65535)))
+                .build();
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        TotalHits totalHits = searchResponse.hits().total();
+        long total = totalHits.value();
+        Map<String, Aggregate> resultMap = searchResponse.aggregations();
+
+        Map<String, Long> map = new TreeMap<>();
+        resultMap.forEach((k, v) -> {
+            Object value = v._get();
+            if (value instanceof StringTermsAggregate terms) {
+                List<StringTermsBucket> list = terms.buckets().array();
+                list.forEach(bucket -> {
+                    String groupKey = (String) bucket.key()._get();
                     long count = bucket.docCount();
                     long count = bucket.docCount();
-                    countList.add(count);
-                    System.out.println(groupKeyStr + " : " + count);
+                    //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
+                    // System.out.printf("%s -> %s\n", groupKey, count);
+                    map.put(groupKey, count);
                 });
                 });
             } else {
             } else {
                 System.out.println(value);
                 System.out.println(value);
             }
             }
         });
         });
 
 
-        System.out.println("total = " + countList.stream().mapToLong(Long::longValue).sum());
+        return map;
     }
     }
 }
 }

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocument.java

@@ -1,8 +1,8 @@
 package cn.reghao.tnb.search.app.log;
 package cn.reghao.tnb.search.app.log;
 
 
 import cn.reghao.jutil.jdk.string.SnowFlake;
 import cn.reghao.jutil.jdk.string.SnowFlake;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.es.ElasticService;
 import cn.reghao.tnb.search.app.es.ElasticService;
-import cn.reghao.tnb.search.app.log.model.NginxLog;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.Result;
 import co.elastic.clients.elasticsearch._types.Result;

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearch.java

@@ -1,7 +1,7 @@
 package cn.reghao.tnb.search.app.log;
 package cn.reghao.tnb.search.app.log;
 
 
 import cn.reghao.tnb.search.app.es.ElasticService;
 import cn.reghao.tnb.search.app.es.ElasticService;
-import cn.reghao.tnb.search.app.log.model.NginxLog;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.FieldValue;
 import co.elastic.clients.elasticsearch._types.FieldValue;
 import co.elastic.clients.elasticsearch._types.ShardStatistics;
 import co.elastic.clients.elasticsearch._types.ShardStatistics;

+ 128 - 165
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java

@@ -1,19 +1,13 @@
 package cn.reghao.tnb.search.app.log;
 package cn.reghao.tnb.search.app.log;
 
 
-import cn.reghao.tnb.search.app.config.ElasticProperties;
-import cn.reghao.tnb.search.app.es.*;
-import cn.reghao.tnb.search.app.log.model.NginxLog;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.io.TextFile;
-import co.elastic.clients.elasticsearch._types.mapping.Property;
+import cn.reghao.tnb.search.app.log.model.vo.DateCount;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
-import java.text.ParseException;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.ZoneOffset;
+import java.io.IOException;
+import java.time.*;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.*;
 
 
@@ -24,108 +18,53 @@ import java.util.*;
 @Slf4j
 @Slf4j
 @Service
 @Service
 public class NginxLogService {
 public class NginxLogService {
-    public List getChartData() throws ParseException {
-        TextFile textFile = new TextFile();
-        String filePath = "nginx.log";
-        List<String> list = textFile.read(filePath);
-
-        List<NginxLog> nginxLogs = new ArrayList<>();
-        for (String line : list) {
-            if (!line.startsWith("{")) {
-                continue;
-            }
+    private String indexName = "nginx_log";
+    private final NginxLogDocument nginxLogDocument;
 
 
-            try {
-                NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
-                nginxLogs.add(nginxLog);
-            } catch (Exception e) {
-                // e.printStackTrace();
-            }
+    public NginxLogService(NginxLogDocument nginxLogDocument) {
+        this.nginxLogDocument = nginxLogDocument;
+    }
+
+    public void processNginxLog(NginxLog nginxLog) {
+        saveNginxLog(nginxLog);
+        //processNginxLog0(nginxLog);
+    }
+
+    private void saveNginxLog(NginxLog nginxLog) {
+        try {
+            nginxLogDocument.addDocument(indexName, nginxLog);
+        } catch (IOException e) {
+            e.printStackTrace();
         }
         }
+    }
 
 
-        Map<Long, Integer> map = new TreeMap<>();
-        for (NginxLog nginxLog : nginxLogs) {
+    String matchedMethod = "GET";
+    String matchedUrl = "/";
+    Map<String, Map<Long, Integer>> map = new HashMap<>();
+    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00").withZone(ZoneId.of("UTC"));
+    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+    private void processNginxLog0(NginxLog nginxLog) {
+        String todayStr = LocalDateTime.now().toLocalDate().format(formatter1);
+        String method = nginxLog.getRequestMethod();
+        String url = nginxLog.getUrl();
+        if (method.equals(matchedMethod) && url.startsWith(matchedUrl)) {
             String date = nginxLog.getTimeIso8601();
             String date = nginxLog.getTimeIso8601();
-            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
-                    .withZone(ZoneId.of("UTC"));
             LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
             LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
-            Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-            Long key = timestamp;
+            LocalDate localDate = localDateTime.toLocalDate();
+            String localDateStr = localDate.format(formatter1);
+            Map<Long, Integer> ngxLogMap = map.computeIfAbsent(localDateStr, k -> new TreeMap<>());
 
 
-            Integer count = map.get(key);
+            LocalTime localTime = localDateTime.toLocalTime();
+            Long timestampSecond = LocalDateTime.parse(date, formatter).toEpochSecond(ZoneOffset.of("+8"));
+            Integer count = ngxLogMap.get(timestampSecond);
             if (count == null) {
             if (count == null) {
-                map.put(key, 1);
+                ngxLogMap.put(timestampSecond, 1);
             } else {
             } else {
-                int count1 = map.get(key) + 1;
-                map.put(key, count1);
-            }
-        }
-
-        LocalDateTime localDateTime = LocalDateTime.now();
-        Long baseKey = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-
-        List<String> xList = new ArrayList<>();
-        List<Integer> yList = new ArrayList<>();
-        Set<Long> keys = new HashSet<>();
-        for (Long key : map.keySet()) {
-            if (key < baseKey) {
-                //xList.add(DateTimeConverter.format(key*1000).split(" ")[1]);
-                xList.add(DateTimeConverter.format(key*1000));
-                yList.add(map.get(key));
-                keys.add(key);
+                int count1 = ngxLogMap.get(timestampSecond) + 1;
+                ngxLogMap.put(timestampSecond, count1);
             }
             }
         }
         }
-
-        keys.forEach(map::remove);
-        keys.clear();
-        List results = new ArrayList();
-        results.add(xList.toArray());
-        results.add(yList.toArray());
-        return results;
-    }
-
-    public void logTest() {
-        TextFile textFile = new TextFile();
-        String filePath = "nginx.log";
-        List<String> list = textFile.read(filePath);
-        List<NginxLog> nginxLogs = new ArrayList<>();
-        for (String line : list) {
-            if (!line.startsWith("{")) {
-                System.out.println("not json data");
-                continue;
-            }
-
-            try {
-                NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
-                nginxLogs.add(nginxLog);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        nginxLogs.stream()
-                .filter(nginxLog -> nginxLog.getStatus() >= 400)
-                .forEach(nginxLog -> {
-                    System.out.printf("%s %s -> %s\n", nginxLog.getRequestMethod(), nginxLog.getStatus(), nginxLog.getUpstreamAddr());
-                });
-        System.out.println();
-    }
-
-    Map<Long, Integer> ngxLogMap = new TreeMap<>();
-    void handleNginxLog(NginxLog nginxLog) {
-        String date = nginxLog.getTimeIso8601();
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
-                .withZone(ZoneId.of("UTC"));
-        LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
-        Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-        Long key = timestamp;
-        Integer count = ngxLogMap.get(key);
-        if (count == null) {
-            ngxLogMap.put(key, 1);
-        } else {
-            int count1 = ngxLogMap.get(key) + 1;
-            ngxLogMap.put(key, count1);
-        }
     }
     }
 
 
     /**
     /**
@@ -139,82 +78,106 @@ public class NginxLogService {
         @Override
         @Override
         public void run() {
         public void run() {
             while (!Thread.interrupted()) {
             while (!Thread.interrupted()) {
+                Set<String> dateSet = map.keySet();
                 try {
                 try {
                     if (ngxLogMap.size() < 3) {
                     if (ngxLogMap.size() < 3) {
                         Thread.sleep(10_000);
                         Thread.sleep(10_000);
                         continue;
                         continue;
                     }
                     }
 
 
-                    LocalDateTime localDateTime = LocalDateTime.now();
-                    Long baseKey = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-
-                    List<String> xList = new ArrayList<>();
-                    List<Integer> yList = new ArrayList<>();
-                    Set<Long> keys = new HashSet<>();
-                    for (Long key : ngxLogMap.keySet()) {
-                        if (key < baseKey) {
-                            xList.add(DateTimeConverter.format(key*1000).split(" ")[1]);
-                            yList.add(ngxLogMap.get(key));
-                            keys.add(key);
+                    List<Object> chartData = getChartData();
+                    /*TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(chartData));
+                    pullSessionMap.values().forEach(webSocketSession -> {
+                        try {
+                            webSocketSession.sendMessage(textMessage);
+                        } catch (IOException e) {
+                            e.printStackTrace();
                         }
                         }
-                    }
-
-                    keys.forEach(ngxLogMap::remove);
-                    keys.clear();
-                    List results = new ArrayList();
-                    results.add(xList.toArray());
-                    results.add(yList.toArray());
-
-                    /*WebSocketSession webSocketSession = getPullSession("admin-service", "172.16.90.200");
-                    TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(results));
-                    webSocketSession.sendMessage(textMessage);*/
-                    Thread.sleep(10_000);
+                    });*/
+                    Thread.sleep(3_000);
                 } catch (Exception e) {
                 } catch (Exception e) {
                     e.printStackTrace();
                     e.printStackTrace();
                 }
                 }
             }
             }
         }
         }
+
+        /**
+         * x 轴显示时分秒
+         * y 轴显示访问次数
+         *
+         * @param
+         * @return
+         * @date 2025-12-24 17:48:10
+         */
+        private List<Object> getChartData() {
+            Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
+            List<String> xList = new ArrayList<>();
+            List<Integer> yList = new ArrayList<>();
+            Set<Long> keys = new HashSet<>();
+            for (Long timestampSecond : ngxLogMap.keySet()) {
+                if (timestampSecond < timestampSecondCurrent) {
+                    // x 轴显示时分秒
+                    xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
+                    // y 轴显示访问次数
+                    yList.add(ngxLogMap.get(timestampSecond));
+                    keys.add(timestampSecond);
+                }
+            }
+
+            keys.forEach(ngxLogMap::remove);
+            keys.clear();
+            List<Object> results = new ArrayList<>();
+            results.add(xList.toArray());
+            results.add(yList.toArray());
+            return results;
+        }
+    }
+
+    private List<Object> getChartData(Map<Long, Integer> timeMap) {
+        Long timestampSecondCurrent = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
+        List<String> xList = new ArrayList<>();
+        List<Integer> yList = new ArrayList<>();
+        Set<Long> keys = new HashSet<>();
+        for (Long timestampSecond : timeMap.keySet()) {
+            if (timestampSecond < timestampSecondCurrent) {
+                // x 轴显示时分秒
+                xList.add(DateTimeConverter.format(timestampSecond*1000).split(" ")[1]);
+                // y 轴显示访问次数
+                yList.add(timeMap.get(timestampSecond));
+                keys.add(timestampSecond);
+            }
+        }
+
+        keys.forEach(timeMap::remove);
+        keys.clear();
+        List<Object> results = new ArrayList<>();
+        results.add(xList.toArray());
+        results.add(yList.toArray());
+        return results;
+    }
+
+    public List<DateCount> getDateCountList() {
+        Map<String, Integer> result = new TreeMap<>();
+        List<DateCount> list = new LinkedList<>();
+        for (Map.Entry<String, Map<Long, Integer>> entry : map.entrySet()) {
+            String dateStr = entry.getKey();
+            Integer count = entry.getValue().size();
+            list.add(new DateCount(dateStr, count));
+            result.put(dateStr, count);
+        }
+        return list;
+    }
+
+    public List<Object> getChartData1(String dateStr) {
+        Map<Long, Integer> timeMap = map.get(dateStr);
+        return getChartData(timeMap);
+    }
+
+    public List<DateCount> getNginxLogs() {
+        return getDateCountList();
     }
     }
 
 
-    public static void main(String[] args) throws Exception {
-        ElasticProperties elasticProperties = new ElasticProperties();
-        ElasticService elasticService = new ElasticService(elasticProperties);
-        IndexService indexService = new IndexService(elasticService);
-        MappingService mappingService = new MappingService();
-
-        NginxLogDocument nginxLogDocument = new NginxLogDocument(elasticService);
-        NginxLogSearch nginxLogSearch = new NginxLogSearch(elasticService);
-        QueryService<NginxLog> queryService = new QueryService<>(elasticService);
-        String index = "nginx_log";
-
-        //indexService.getIndex(index);
-        //indexService.getMapping(index);
-
-        //documentService.deleteAllDocument(index);
-        Map<String, Property> propertyMap = mappingService.getPropertyMapWithNginxLog(NginxLog.class);
-        //indexService.deleteIndex(index);
-        //indexService.createIndex(index, propertyMap);
-
-        //searchService.search(index);
-//        search1(esClient, "app_log");
-//        index = "app_log";
-//        deleteAll(index);
-//        searchService.aggregate(index);
-
-        /*int pn = 1;
-        while (pn < 100) {
-            List<NginxLog> list = searchService.searchByPage(index, pn, "", "");
-            System.out.println();
-            pn++;
-        }*/
-
-        //searchService.searchAll(index);
-//        indexService.updateMapping(index);
-
-        //String queryString = "content";
-        //List<NginxLog> list = queryService.queryWithHighlight(index, queryString, pn, ps, NginxLog.class);
-        //searchService.searchAll(index);
-        //nginxLogSearch.aggregate();
-        //searchService.count(index);
+    public List<Object> getChartData(String dateStr) {
+        return getChartData1(dateStr);
     }
     }
 }
 }

+ 15 - 2
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/consumer/RabbitListeners.java

@@ -3,6 +3,7 @@ package cn.reghao.tnb.search.app.log.consumer;
 import cn.reghao.jutil.jdk.web.log.AppLog;;
 import cn.reghao.jutil.jdk.web.log.AppLog;;
 import cn.reghao.jutil.jdk.web.log.GatewayLog;
 import cn.reghao.jutil.jdk.web.log.GatewayLog;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.log.db.AccessLogMongo;
 import cn.reghao.tnb.search.app.log.db.AccessLogMongo;
 import cn.reghao.tnb.search.app.log.model.po.AccessLog;
 import cn.reghao.tnb.search.app.log.model.po.AccessLog;
 import cn.reghao.tnb.search.app.ws.handler.LogHandler;
 import cn.reghao.tnb.search.app.ws.handler.LogHandler;
@@ -29,12 +30,24 @@ public class RabbitListeners {
         this.accessLogMongo = accessLogMongo;
         this.accessLogMongo = accessLogMongo;
     }
     }
 
 
+    @RabbitListener(bindings =@QueueBinding(
+            value = @Queue(value = "tnb.log.nginx", durable = "true"),
+            key = "tnb.log.nginx",
+            exchange = @Exchange(value = "amq.direct"))
+    )
+    public void nginxLogConsumer(@Payload String msg) {
+        try {
+            NginxLog nginxLog = JsonConverter.jsonToObject(msg, NginxLog.class);
+        } catch (Exception e) {
+        }
+    }
+
     @RabbitListener(bindings =@QueueBinding(
     @RabbitListener(bindings =@QueueBinding(
             value = @Queue(value = "tnb.log.gateway", durable = "true"),
             value = @Queue(value = "tnb.log.gateway", durable = "true"),
             key = "tnb.log.gateway",
             key = "tnb.log.gateway",
             exchange = @Exchange(value = "amq.direct"))
             exchange = @Exchange(value = "amq.direct"))
     )
     )
-    public void accessLogConsumer(@Payload String msg) {
+    public void gatewayLogConsumer(@Payload String msg) {
         try {
         try {
             GatewayLog gatewayLog = JsonConverter.jsonToObject(msg, GatewayLog.class);
             GatewayLog gatewayLog = JsonConverter.jsonToObject(msg, GatewayLog.class);
             logHandler.pushGatewayLog(gatewayLog);
             logHandler.pushGatewayLog(gatewayLog);
@@ -51,7 +64,7 @@ public class RabbitListeners {
             key = "tnb.log.app",
             key = "tnb.log.app",
             exchange = @Exchange(value = "amq.direct"))
             exchange = @Exchange(value = "amq.direct"))
     )
     )
-    public void runtimeLogConsumer(@Payload String msg) {
+    public void appLogConsumer(@Payload String msg) {
         AppLog appLog = JsonConverter.jsonToObject(msg, AppLog.class);
         AppLog appLog = JsonConverter.jsonToObject(msg, AppLog.class);
         logHandler.pushAppLog(appLog);
         logHandler.pushAppLog(appLog);
         log.info("{}:{} -> {}", appLog.getApp(), appLog.getHost(), appLog.getMessage());
         log.info("{}:{} -> {}", appLog.getApp(), appLog.getHost(), appLog.getMessage());

+ 0 - 33
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/NginxLog.java

@@ -1,33 +0,0 @@
-package cn.reghao.tnb.search.app.log.model;
-
-import com.google.gson.annotations.SerializedName;
-import lombok.Getter;
-import lombok.Setter;
-
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2023-11-07 14:58:07
- */
-@Setter
-@Getter
-public class NginxLog implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String id;
-    @SerializedName("time_iso8601") private String timeIso8601;
-    @SerializedName("remote_addr") private String remoteAddr;
-    private String request;
-    private Integer status;
-    @SerializedName("request_method") private String requestMethod;
-    @SerializedName("body_bytes_sent") private Integer bodyBytesSent;
-    @SerializedName("request_time") private Double requestTime;
-    @SerializedName("upstream_response_time") private String upstreamResponseTime;
-    @SerializedName("upstream_addr") private String upstreamAddr;
-    private String host;
-    private String url;
-    @SerializedName("http_x_forwarded_for") private String httpXForwardedFor;
-    @SerializedName("http_referer") private String httpReferer;
-    @SerializedName("http_user_agent") private String httpUserAgent;
-}

+ 19 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/vo/DateCount.java

@@ -0,0 +1,19 @@
+package cn.reghao.tnb.search.app.log.model.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2025-12-25 11:28:13
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Setter
+@Getter
+public class DateCount {
+    private String date;
+    private int total;
+}

+ 0 - 13
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/Keys.java

@@ -1,13 +0,0 @@
-package cn.reghao.tnb.search.app.ws;
-
-/**
- * @author reghao
- * @date 2021-07-07 13:45:22
- */
-public class Keys {
-    public static final String USER_UUID = "user-uuid";
-    // SSH 连接操作
-    public static final String OPS_CONNECT = "connect";
-    // SSH 命令操作
-    public static final String OPS_COMMAND = "command";
-}

+ 4 - 3
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/WebSocketConfig.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketConfig.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.ws;
+package cn.reghao.tnb.search.app.ws.config;
 
 
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
@@ -25,10 +25,11 @@ public class WebSocketConfig implements WebSocketConfigurer {
 
 
     @Override
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
-        registry.addHandler(logHandler, "/logws/pull/access")
+        registry.addHandler(logHandler, WebSocketPath.wsPullLog)
                 .addInterceptors(webSocketInterceptor)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
                 .setAllowedOrigins("*");
-        registry.addHandler(logHandler, "/logws/pull/runtime")
+
+        registry.addHandler(logHandler, WebSocketPath.wsPushLog)
                 .addInterceptors(webSocketInterceptor)
                 .addInterceptors(webSocketInterceptor)
                 .setAllowedOrigins("*");
                 .setAllowedOrigins("*");
     }
     }

+ 7 - 6
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/WebSocketInterceptor.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketInterceptor.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.ws;
+package cn.reghao.tnb.search.app.ws.config;
 
 
 import org.springframework.http.server.ServerHttpRequest;
 import org.springframework.http.server.ServerHttpRequest;
 import org.springframework.http.server.ServerHttpResponse;
 import org.springframework.http.server.ServerHttpResponse;
@@ -19,14 +19,15 @@ public class WebSocketInterceptor implements HandshakeInterceptor {
     @Override
     @Override
     public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
     public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                    WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                                    WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+        // TODO 使用经过 HTTP 认证的用户信息
         if (request instanceof ServletServerHttpRequest) {
         if (request instanceof ServletServerHttpRequest) {
             ServletServerHttpRequest httpRequest = (ServletServerHttpRequest) request;
             ServletServerHttpRequest httpRequest = (ServletServerHttpRequest) request;
             String path = httpRequest.getURI().getPath();
             String path = httpRequest.getURI().getPath();
-            if (path.startsWith("/ws")) {
+            if (path.equals(WebSocketPath.wsAgent)) {
                 String query = httpRequest.getURI().getQuery();
                 String query = httpRequest.getURI().getQuery();
-                String token = query.replace("token=", "");
-                if (!token.isBlank()) {
-                    attributes.put(Keys.USER_UUID, token);
+                String machineId = query.replace("token=", "");
+                if (!machineId.isBlank()) {
+                    attributes.put("MACHINE_ID", machineId);
                     return true;
                     return true;
                 }
                 }
             } else {
             } else {
@@ -34,7 +35,7 @@ public class WebSocketInterceptor implements HandshakeInterceptor {
             }
             }
         }
         }
 
 
-        return true;
+        return false;
     }
     }
 
 
     @Override
     @Override

+ 13 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/config/WebSocketPath.java

@@ -0,0 +1,13 @@
+package cn.reghao.tnb.search.app.ws.config;
+
+/**
+ * @author reghao
+ * @date 2025-09-26 09:30:11
+ */
+public class WebSocketPath {
+    public static String wsAgent = "/bgws/agent";
+    public static String wsSsh = "/bgws/ssh";
+    public static String wsPullLog = "/logws/log/pull";
+    public static String wsPushLog = "/logws/log/push";
+    public static String wsFrontend = "/bgws/frontend";
+}

+ 93 - 95
search/search-service/src/main/java/cn/reghao/tnb/search/app/ws/handler/LogHandler.java

@@ -1,10 +1,14 @@
 package cn.reghao.tnb.search.app.ws.handler;
 package cn.reghao.tnb.search.app.ws.handler;
 
 
-import cn.reghao.jutil.jdk.web.log.AppLog;;
-import cn.reghao.jutil.jdk.web.log.GatewayLog;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.jutil.jdk.web.log.AppLog;
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.GatewayLog;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import cn.reghao.tnb.search.app.log.NginxLogService;
+import cn.reghao.tnb.search.app.ws.WsSender;
+import cn.reghao.tnb.search.app.ws.config.WebSocketPath;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.*;
 import org.springframework.web.socket.*;
@@ -12,7 +16,6 @@ import org.springframework.web.socket.*;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 
 /**
 /**
  * @author reghao
  * @author reghao
@@ -24,54 +27,72 @@ public class LogHandler implements WebSocketHandler {
     private final Map<String, String> appMap = new HashMap<>();
     private final Map<String, String> appMap = new HashMap<>();
     private final Map<String, WebSocketSession> pullRuntimeMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pullRuntimeMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pullAccessMap = new ConcurrentHashMap<>();
     private final Map<String, WebSocketSession> pullAccessMap = new ConcurrentHashMap<>();
-
-    public List<String> getApps() {
-        return new ArrayList<>(appMap.values());
+    private final Map<String, WebSocketSession> pushSessionMap = new ConcurrentHashMap<>();
+    private final Map<String, WebSocketSession> pullSessionMap = new ConcurrentHashMap<>();
+    private WsSender wsSender;
+    private final NginxLogService nginxLogService;
+
+    public LogHandler(WsSender wsSender, NginxLogService nginxLogService) {
+        this.wsSender = wsSender;
+        this.nginxLogService = nginxLogService;
     }
     }
 
 
-    public List<String> getApp(String app) {
-        return appMap.values().stream()
-                .filter(appKey -> appKey.startsWith(app))
-                .collect(Collectors.toList());
+    private WebSocketSession getPullSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullSessionMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullSessionMap.get(key);
+            }
+        }
+
+        return null;
     }
     }
 
 
-    @Override
-    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
-        String sessionId = webSocketSession.getId();
-        String query = webSocketSession.getUri().getQuery();
-        Map<String, String> map = parseParams(query);
-        String app = map.get("app");
-        String host = map.get("host");
-        if (app == null || host == null) {
-            if (webSocketSession.isOpen()) {
-                webSocketSession.close();
+    private WebSocketSession getPullRuntimeSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullRuntimeMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullRuntimeMap.get(key);
             }
             }
         }
         }
 
 
-        String appKey = String.format("%s@%s", app, host);
-        String path = webSocketSession.getUri().getPath();
-        if (path.equals("/logws/pull/runtime")) {
-            appMap.put(sessionId, appKey);
-            pullRuntimeMap.put(appKey, webSocketSession);
-        } else if (path.equals("/logws/pull/access")) {
-            pullAccessMap.put(appKey, webSocketSession);
+        return null;
+    }
+
+    private WebSocketSession getPullAccessSession(String app, String host) {
+        String suffix = String.format("%s@%s", app, host);
+        Set<String> keys = pullAccessMap.keySet();
+        for (String key : keys) {
+            if (key.endsWith(suffix)) {
+                return pullAccessMap.get(key);
+            }
         }
         }
 
 
-        log.info("WebSocket 建立连接");
+        return null;
     }
     }
 
 
-    private void removeSession(WebSocketSession webSocketSession) {
-        String sessionId = webSocketSession.getId();
-        String appKey = appMap.get(sessionId);
-        if (appKey == null) {
-            return;
+    private Map<String, String> parseParams(String query) {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<>();
+        for (String param : params) {
+            String[] arr = param.split("=");
+            map.put(arr[0], arr[1]);
         }
         }
+        return map;
+    }
 
 
-        String path = webSocketSession.getUri().getPath();
-        if (path.equals("/logws/pull/runtime")) {
-            pullRuntimeMap.remove(appKey);
-        } else if (path.equals("/logws/pull/access")) {
-            pullAccessMap.remove(appKey);
+    public void pushGatewayLog(GatewayLog gatewayLog) {
+        WebSocketSession pullSession = getPullAccessSession("tnb", "127.0.0.1");
+        if (pullSession != null) {
+            String jsonData = JsonConverter.objectToJson(gatewayLog);
+            WebSocketMessage<String> message1 = new TextMessage(jsonData);
+            try {
+                pullSession.sendMessage(message1);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
         }
     }
     }
 
 
@@ -90,31 +111,31 @@ public class LogHandler implements WebSocketHandler {
         }
         }
     }
     }
 
 
-    public void pushGatewayLog(GatewayLog gatewayLog) {
-        WebSocketSession pullSession = getPullAccessSession("tnb", "127.0.0.1");
-        if (pullSession != null) {
-            String jsonData = JsonConverter.objectToJson(gatewayLog);
-            WebSocketMessage<String> message1 = new TextMessage(jsonData);
-            try {
-                pullSession.sendMessage(message1);
-            } catch (Exception e) {
-                e.printStackTrace();
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        String sessionId = webSocketSession.getId();
+        String query = webSocketSession.getUri().getQuery();
+        Map<String, String> map = parseParams(query);
+        String app = map.get("app");
+        String host = map.get("host");
+        if (app == null || host == null) {
+            if (webSocketSession.isOpen()) {
+                webSocketSession.close();
             }
             }
         }
         }
-    }
 
 
-    private Map<String, String> parseParams(String query) {
-        String[] params = query.split("&");
-        Map<String, String> map = new HashMap<>();
-        for (String param : params) {
-            String[] arr = param.split("=");
-            map.put(arr[0], arr[1]);
+        String appKey = String.format("%s@%s", app, host);
+        String path = webSocketSession.getUri().getPath();
+        if (path.equals(WebSocketPath.wsPushLog)) {
+            appMap.put(sessionId, appKey);
+            pushSessionMap.put(appKey, webSocketSession);
+        } else if (path.equals(WebSocketPath.wsPullLog)) {
+            pullSessionMap.put(appKey, webSocketSession);
         }
         }
 
 
-        return map;
+        log.info("WebSocket 建立连接");
     }
     }
 
 
-    Map<Long, Integer> ngxLogMap = new TreeMap<>();
     @Override
     @Override
     public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
     public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
             throws IOException {
             throws IOException {
@@ -123,32 +144,18 @@ public class LogHandler implements WebSocketHandler {
             if (webSocketMessage instanceof TextMessage) {
             if (webSocketMessage instanceof TextMessage) {
                 String payload = (String) webSocketMessage.getPayload();
                 String payload = (String) webSocketMessage.getPayload();
             } else if (webSocketMessage instanceof BinaryMessage) {
             } else if (webSocketMessage instanceof BinaryMessage) {
+                //log.info("接收到 WebSocket 二进制消息");
                 BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
                 BinaryMessage binaryMessage = (BinaryMessage) webSocketMessage;
                 Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
                 Object object = JdkSerializer.deserialize(binaryMessage.getPayload().array());
                 if (object instanceof String) {
                 if (object instanceof String) {
-                    //log.info("{} -> {}", appKey, object);
+                    log.info("{} -> {}", appKey, object);
                 } else if (object instanceof AppLog) {
                 } else if (object instanceof AppLog) {
                     AppLog appLog = (AppLog) object;
                     AppLog appLog = (AppLog) object;
                     String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
                     String dateTimeStr = DateTimeConverter.format(appLog.getTimestamp());
-                    //appLog.setDateTimeStr(dateTimeStr);
-                    //pushAppLog(appLog);
-                    //runtimeLogService.addAppLog(appLog);
-                } /*else if (object instanceof NginxLog) {
+                } else if (object instanceof NginxLog) {
                     NginxLog nginxLog = (NginxLog) object;
                     NginxLog nginxLog = (NginxLog) object;
-                    String date = nginxLog.getTimeIso8601();
-                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
-                            .withZone(ZoneId.of("UTC"));
-                    LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
-                    Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
-                    Long key = timestamp;
-                    Integer count = ngxLogMap.get(key);
-                    if (count == null) {
-                        ngxLogMap.put(key, 1);
-                    } else {
-                        int count1 = ngxLogMap.get(key) + 1;
-                        ngxLogMap.put(key, count1);
-                    }
-                }*/
+                    nginxLogService.processNginxLog(nginxLog);
+                }
             } else if (webSocketMessage instanceof PingMessage) {
             } else if (webSocketMessage instanceof PingMessage) {
                 log.info("接收到 WebSocket PingMessage");
                 log.info("接收到 WebSocket PingMessage");
             } else if (webSocketMessage instanceof PongMessage) {
             } else if (webSocketMessage instanceof PongMessage) {
@@ -163,7 +170,7 @@ public class LogHandler implements WebSocketHandler {
 
 
     @Override
     @Override
     public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
     public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
-        log.error("WebSocket 数据传输错误");
+        log.error("WebSocket 数据传输错误: {}", throwable.getMessage());
         removeSession(webSocketSession);
         removeSession(webSocketSession);
     }
     }
 
 
@@ -173,28 +180,19 @@ public class LogHandler implements WebSocketHandler {
         removeSession(webSocketSession);
         removeSession(webSocketSession);
     }
     }
 
 
-    private WebSocketSession getPullRuntimeSession(String app, String host) {
-        String suffix = String.format("%s@%s", app, host);
-        Set<String> keys = pullRuntimeMap.keySet();
-        for (String key : keys) {
-            if (key.endsWith(suffix)) {
-                return pullRuntimeMap.get(key);
-            }
+    private void removeSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        String appKey = appMap.get(sessionId);
+        if (appKey == null) {
+            return;
         }
         }
 
 
-        return null;
-    }
-
-    private WebSocketSession getPullAccessSession(String app, String host) {
-        String suffix = String.format("%s@%s", app, host);
-        Set<String> keys = pullAccessMap.keySet();
-        for (String key : keys) {
-            if (key.endsWith(suffix)) {
-                return pullAccessMap.get(key);
-            }
+        String path = webSocketSession.getUri().getPath();
+        if (path.equals("/ws/log/push")) {
+            pushSessionMap.remove(appKey);
+        } else if (path.equals("/ws/log/pull")) {
+            pullSessionMap.remove(appKey);
         }
         }
-
-        return null;
     }
     }
 
 
     @Override
     @Override