Forráskód Böngészése

日志消费端放到 search-service 中, 因为要将日志存到 es 中, 不想在 bnt 中添加 es 依赖

reghao 7 hónapja
szülő
commit
3854d85906
17 módosított fájl, 680 hozzáadás és 17 törlés
  1. 11 0
      search/search-service/pom.xml
  2. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/SearchController.java
  3. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/DocumentService.java
  4. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/EsSearch.java
  5. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java
  6. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java
  7. 114 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocumentService.java
  8. 394 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearchService.java
  9. 99 6
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java
  10. 40 0
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/consumer/RabbitListeners.java
  11. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IPLocation.java
  12. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IpTool.java
  13. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/Location.java
  14. 1 1
      search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/NginxLog.java
  15. 6 0
      search/search-service/src/main/resources/application-dev.yml
  16. 6 0
      search/search-service/src/main/resources/application-test.yml
  17. 1 2
      search/search-service/src/test/java/NginxLogTest.java

+ 11 - 0
search/search-service/pom.xml

@@ -18,6 +18,12 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>cn.reghao.bnt</groupId>
+            <artifactId>log</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+
         <dependency>
             <groupId>cn.reghao.tnb.search</groupId>
             <artifactId>search-api</artifactId>
@@ -54,6 +60,11 @@
             <artifactId>spring-boot-starter-data-jpa</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+
         <!-- lucene -->
         <dependency>
             <groupId>org.hibernate.search</groupId>

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/controller/SearchController.java

@@ -1,6 +1,6 @@
 package cn.reghao.tnb.search.app.controller;
 
-import cn.reghao.tnb.search.app.EsSearch;
+import cn.reghao.tnb.search.app.es.EsSearch;
 import cn.reghao.tnb.search.app.model.po.Wenshu;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageRequest;

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/DocumentService.java

@@ -1,6 +1,6 @@
 package cn.reghao.tnb.search.app.es;
 
-import cn.reghao.tnb.search.app.model.po.NginxLog;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import cn.reghao.tnb.search.app.model.po.Wenshu;
 import cn.reghao.jutil.tool.id.SnowFlake;

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/EsSearch.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/es/EsSearch.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app;
+package cn.reghao.tnb.search.app.es;
 
 import cn.reghao.tnb.search.app.es.ElasticService;
 import cn.reghao.tnb.search.app.es.QueryService;

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/MappingService.java

@@ -23,7 +23,7 @@ public class MappingService {
     Property textPropIk = Property.of(builder -> builder.text(TextProperty.of(pro -> pro.index(true).analyzer("ik_max_word"))));
     Property textKeywordProp = new Property(new TextProperty.Builder().index(true).fields("raw", keywordProp).store(true).build());
 
-    public Map<String, Property> getPropertyMap(Class<?> clazz) {
+    public Map<String, Property> getPropertyMapWithNginxLog(Class<?> clazz) {
         Field[] fields = clazz.getDeclaredFields();
         for (Field field : fields) {
             String name = field.getName();

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java

@@ -1,6 +1,6 @@
 package cn.reghao.tnb.search.app.es;
 
-import cn.reghao.tnb.search.app.model.po.NginxLog;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.FieldValue;

+ 114 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogDocumentService.java

@@ -0,0 +1,114 @@
+package cn.reghao.tnb.search.app.log;
+
+import cn.reghao.jutil.tool.id.SnowFlake;
+import cn.reghao.tnb.search.app.es.ElasticService;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.Result;
+import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
+import co.elastic.clients.elasticsearch.core.*;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2025-03-12 10:45:58
+ */
+@Slf4j
+@Service
+public class NginxLogDocumentService {
+    private final ElasticsearchClient esClient;
+    private final SnowFlake idGenerator;
+
+    public NginxLogDocumentService(ElasticService elasticService) {
+        this.esClient = elasticService.getElasticsearchClient();
+        this.idGenerator = new SnowFlake(1, 1);
+    }
+
+    public void addDocument(String indexName, NginxLog product) throws IOException {
+        IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(product.getId()).document(product));
+        log.info("add one document result: {}", indexResponse.result().jsonValue());
+    }
+
+    public void batchAddDocument(String indexName, List<NginxLog> nginxLogs) throws IOException {
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        nginxLogs.forEach(p -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(p.getId()).document(p)))));
+        BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
+        //bulkResponse.items().forEach(b -> log.info("bulk response result = {}", b.result()));
+        //log.error("bulk response.error() = {}", bulkResponse.errors());
+    }
+
+    public void saveAll(String index, List<NginxLog> nginxLogs) throws IOException {
+        for (NginxLog nginxLog : nginxLogs) {
+            IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id("" + idGenerator.nextId()));
+            Result result = response.result();
+            System.out.println(result.jsonValue());
+        }
+    }
+
+    public void updateDocument(String indexName, NginxLog product) throws IOException {
+        UpdateResponse<NginxLog> updateResponse = esClient.update(s -> s.index(indexName).id(product.getId()).doc(product), NginxLog.class);
+        log.info("update doc result: {}", updateResponse.result());
+    }
+
+    public void update(String index, NginxLog nginxLog) throws IOException {
+        String id = nginxLog.getId();
+        IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id(id));
+        Result result = response.result();
+        System.out.println(result.jsonValue());
+    }
+
+    public void deleteDocument(String indexName, String id) {
+        try {
+            DeleteResponse deleteResponse = esClient.delete(s -> s.index(indexName).id(id));
+            log.info("del doc result: {}", deleteResponse.result());
+        } catch (IOException e) {
+            log.error("del doc failed, error: ", e);
+        }
+    }
+
+    /**
+     * 删除索引下的所有文档
+     *
+     * @param
+     * @return
+     * @date 2025-03-13 16:26:20
+     */
+    public void deleteAllDocument(String indexName) {
+        try {
+            DeleteByQueryRequest deleteByQueryRequest = DeleteByQueryRequest.of(s -> s.index(indexName)
+                    .query(m -> m.matchAll(new MatchAllQuery.Builder().build()))
+            );
+            DeleteByQueryResponse deleteByQueryResponse = esClient.deleteByQuery(deleteByQueryRequest);
+            log.info("del doc result: {}", deleteByQueryResponse.total());
+        } catch (IOException e) {
+            log.error("del doc failed, error: ", e);
+        }
+    }
+
+    public void batchDeleteDocument(String indexName, List<String> ids) {
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        ids.forEach(a -> bulkOperations.add(BulkOperation.of(b -> b.delete(c -> c.id(a)))));
+        try {
+            BulkResponse bulkResponse = esClient.bulk(a -> a.index(indexName).operations(bulkOperations));
+            bulkResponse.items().forEach(a -> log.info("batch del result: {}", a.result()));
+            log.error("batch del bulk resp errors: {}", bulkResponse.errors());
+        } catch (IOException e) {
+            log.error("batch del doc failed, error: ", e);
+        }
+    }
+
+    public void delete(ElasticsearchAsyncClient client, String index) throws IOException {
+        int id = 1;
+        client.delete(i -> i.index(index).id("" + id)).whenComplete((success, failure)->{
+            System.out.println(success.index());
+            System.out.println(success.version());
+        });
+    }
+}

+ 394 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogSearchService.java

@@ -0,0 +1,394 @@
+package cn.reghao.tnb.search.app.log;
+
+import cn.reghao.tnb.search.app.es.ElasticService;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.ShardStatistics;
+import co.elastic.clients.elasticsearch._types.SortOrder;
+import co.elastic.clients.elasticsearch._types.aggregations.*;
+import co.elastic.clients.elasticsearch._types.query_dsl.*;
+import co.elastic.clients.elasticsearch.core.CountRequest;
+import co.elastic.clients.elasticsearch.core.CountResponse;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
+import co.elastic.clients.elasticsearch.core.search.TotalHits;
+import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
+import co.elastic.clients.json.JsonData;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author reghao
+ * @date 2025-03-12 10:51:21
+ */
+@Slf4j
+@Service
+public class NginxLogSearchService {
+    private final ElasticsearchClient esClient;
+    private final String indexName = "NginxLog";
+
+    public NginxLogSearchService(ElasticService elasticService) throws Exception {
+        this.esClient = elasticService.getElasticsearchClient();
+    }
+
+    public void searchOne(String searchText) throws IOException {
+        SearchResponse<NginxLog> searchResponse = esClient.search(s -> s
+                .index(indexName)
+                // 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)
+                .query(q -> q
+                        // 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索)
+                        .match(t -> t
+                                .field("name")
+                                .query(searchText))), NginxLog.class);
+        TotalHits total = searchResponse.hits().total();
+        boolean isExactResult = total != null && total.relation() == TotalHitsRelation.Eq;
+        if (isExactResult) {
+            log.info("search has: {} results", total.value());
+        } else {
+            log.info("search more than : {} results", total.value());
+        }
+        List<Hit<NginxLog>> hits = searchResponse.hits().hits();
+        for (Hit<NginxLog> hit : hits) {
+            NginxLog source = hit.source();
+            log.info("Found result: {}", source);
+        }
+    }
+
+    public List<NginxLog> searchByPage(int pn, String searchField, String searchText) throws IOException {
+        int ps = 100;
+        String sortField = "timeIso8601";
+        Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
+        Query query11 = getOrQuery();
+        Query query1 = getAndQuery();
+        Query query12 = getTermQuery();
+
+        int start = (pn-1)*ps;
+        SearchRequest searchRequest = SearchRequest.of(s -> s
+                .index(indexName)
+                .query(query1)
+                .from(start)
+                .size(ps)
+                // 按 id 字段降序排列
+                .sort(f -> f.field(o -> o.field(sortField).order(SortOrder.Desc)))
+        );
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        /*List<Hit<NginxLog>> hits = searchResponse.hits().hits();
+        for (Hit<NginxLog> hit : hits) {
+            NginxLog product = hit.source();
+            log.info("search page result: {}", product);
+        }*/
+        return searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
+    }
+
+    public void searchAll() throws IOException {
+        String fieldName3 = "status";
+        String fieldValue3 = "200";
+        SearchRequest searchRequest = SearchRequest.of(s -> s
+                .index(indexName)
+                .query(q -> q.bool(b -> b
+                                .must(m -> m.term(t -> t.field(fieldName3).value(FieldValue.of(fieldValue3))))
+                        //.must(m -> m.term(t -> t.field("name").value(FieldValue.of("test"))))
+                ))
+        );
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
+    }
+
+    public void searchByField(String fieldName, String fieldValue) throws IOException {
+        SearchRequest request = SearchRequest.of(s -> s
+                .index(indexName)
+                .query(query -> query.match(match -> match.field(fieldName).query(fieldValue)))
+        );
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(request, NginxLog.class);
+        log.info("search result: {}", searchResponse);
+    }
+
+    public void search1() throws IOException {
+        String fieldName = "url";
+        String fieldValue = "/api";
+        // 构造查询条件
+        SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexName)
+                /*.sort(s1 -> s1.field(f -> f.field(fieldName).order(SortOrder.Asc)))
+                .scroll(s2 -> s2.offset(0))*/
+                .from(0)
+                .size(500)
+                .query(q -> q.match(m -> m.field(fieldName).query(FieldValue.of(fieldValue))))
+        );
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        HitsMetadata<NginxLog> hitsMetadata = searchResponse.hits();
+        //得到总数
+        TotalHits totalHits = hitsMetadata.total();
+        Double maxScore = hitsMetadata.maxScore();
+        //拿到匹配的数据
+        List<Hit<NginxLog>> hits = hitsMetadata.hits();
+        //拿到_source中的数据
+        List<NginxLog> nginxLogs = hits.stream().map(Hit::source).collect(Collectors.toList());
+
+        //最大分数
+        System.out.println(searchResponse.maxScore());
+        //分片数
+        System.out.println(searchResponse.shards());
+        //是否超时
+        System.out.println(searchResponse.timedOut());
+    }
+
+    public void search() throws IOException {
+        Query query = getExistsQuery();
+        SearchRequest searchRequest = new SearchRequest.Builder()
+                .index(indexName)
+                .query(query)
+                .build();
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        TotalHits totalHits = searchResponse.hits().total();
+        List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
+    }
+
+    public void count() throws IOException {
+        Query query1 = getMatchQuery();
+        // remoteAddr 字段为空
+        Query query2 = BoolQuery.of(b ->
+                b.mustNot(m -> m.exists(t -> t.field("remoteAddr"))
+                ))._toQuery();
+        // remoteAddr 字段不为空
+        Query query3 = BoolQuery.of(b ->
+                b.must(m -> m.exists(t -> t.field("remoteAddr"))
+                ))._toQuery();
+
+        CountRequest countRequest = CountRequest.of(s -> s
+                        .index(indexName)
+                //.query(query3)
+        );
+
+        CountResponse countResponse = esClient.count(countRequest);
+        long total = countResponse.count();
+        ShardStatistics shardStatistics = countResponse.shards();
+        System.out.println("total -> " + total);
+    }
+
+    /**
+     * 等值查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 13:45:17
+     */
+    public Query getTermQuery() {
+        String fieldName1 = "url";
+        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
+
+        String fieldName2 = "requestMethod";
+        String fieldValue2 = "POST";
+
+        Query query = TermQuery.of(t -> t
+                .field(fieldName1).value(FieldValue.of(fieldValue1))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * MatchQuery 搜索时, 首先会解析查询字符串, 进行分词,然后查询
+     * TermQuery 搜索时, 会根据输入的查询内容进行搜索, 并不会解析查询内容,对它分词
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 15:04:22
+     */
+    public Query getMatchQuery() {
+        String fieldName = "host";
+        String searchText = "api.iquizoo.com";
+
+        Query query = MatchQuery.of(m -> m.field(fieldName).query(searchText))._toQuery();
+        return query;
+    }
+
+    /**
+     * 多值查询, 相当于 SQL 语句中的 in 查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:12:38
+     */
+    public Query getTermsQuery() {
+        String fieldName = "status";
+        List<FieldValue> fieldValueList = List.of(FieldValue.of("401"), FieldValue.of("500"));
+
+        Query query = TermsQuery.of(t ->
+                t.field(fieldName).terms(TermsQueryField.of(q -> q.value(fieldValueList)))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 或查询, 相当于 SQL 查询
+     * SELECT * FROM test1 where (uid = 1 or uid =2) and phone = 12345678919
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 13:46:26
+     */
+    public Query getOrQuery() {
+        Query query = BoolQuery.of(b ->
+                b.should(m -> m.term(t -> t.field("status").value(FieldValue.of(401))))
+                        .should(m -> m.term(t -> t.field("status").value(FieldValue.of(403))))
+                        .must(m -> m.term(t -> t.field("host").value(FieldValue.of("api.iquizoo.com"))))
+        )._toQuery();
+        return query;
+    }
+
+    public Query getAndQuery() {
+        String fieldName1 = "url.raw";
+        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult";
+
+        String fieldName2 = "requestMethod";
+        String fieldValue2 = "POST";
+
+        Query query = BoolQuery.of(b -> b
+                .must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))
+                .must(m -> m.term(t -> t.field(fieldName2).value(FieldValue.of(fieldValue2))))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 模糊查询, 相当于 SQL 语句中的 like 查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:11:45
+     */
+    public Query getWildcardQuery() {
+        Query query = BoolQuery.of(b ->
+                b.must(m -> m.wildcard(t -> t.field("url").value("*result*")))
+        )._toQuery();
+        return query;
+    }
+
+    /**
+     * 存在查询, 相当于 SQL 语句中的 exist
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:20:26
+     */
+    public Query getExistsQuery() {
+        // 判断字段是否存在
+        Query query = ExistsQuery.of(t -> t.field("host"))._toQuery();
+        return query;
+    }
+
+    /**
+     * 范围查询, 相当于 SQL 语句中的 > 和 <
+     * gt 是大于,lt 是小于,gte 是大于等于,lte 是小于等于
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:21:26
+     */
+    public Query getRangeQuery() {
+        int status1 = 404;
+        int status2 = 600;
+        Query query = RangeQuery.of(t -> t.field("status").gte(JsonData.of(status1)).lte(JsonData.of(status2)))._toQuery();
+        return query;
+    }
+
+    /**
+     * 正则查询
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 14:22:18
+     */
+    public Query getRegexpQuery() {
+        Query query = RegexpQuery.of(t -> t.field("host").value("api.*"))._toQuery();
+        return query;
+    }
+
+    public Query getQuery() {
+        MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
+        // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
+        //
+        //修改一个单词,如:box--->fox。
+        //移除一个单词,如:black-->lack。
+        //插入一个单词,如:sic-->sick。
+        //转换两个单词顺序,如:act-->cat。
+        FuzzyQuery.of(f -> f.field("host").value("lonel"));
+        // 通过指定字段的前缀进行查询
+        Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
+        return query;
+    }
+
+    /**
+     * 聚合查询, 相当于 SQL 的 group by
+     *
+     * @param
+     * @return
+     * @date 2025-03-12 15:06:48
+     */
+    public void aggregate() throws Exception {
+        String aggField1 = "status";
+        String aggField2 = "remoteAddr";
+        String aggField3 = "httpUserAgent.raw";
+        String aggField4 = "url.raw";
+        String aggField5 = "timeIso8601";
+
+        String fieldName1 = "url.raw";
+        String fieldValue1 = "/base/Device/PageList";
+
+        SearchRequest searchRequest = new SearchRequest.Builder()
+                .index(indexName)
+                .query(q -> q.bool(b -> b.must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))))
+                .size(10000)
+                .aggregations("first_agg", a->a.terms(t->t.field(aggField5).size(65535)))
+                //.aggregations("agg1", a->a.histogram(t->t.field("httpUserAgent").interval(50.0)))
+                //.aggregations("agg1", a->a.sum(t->t.field("httpUserAgent")))
+                //.aggregations("second_agg", a->a.avg(t->t.field("status")))
+                .build();
+
+        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
+        TotalHits totalHits = searchResponse.hits().total();
+        Map<String, Aggregate> resultMap = searchResponse.aggregations();
+
+        List<Long> countList = new ArrayList<>();
+        resultMap.forEach((k, v) -> {
+            Object value = v._get();
+            if (value instanceof StringTermsAggregate) {
+                StringTermsAggregate terms = (StringTermsAggregate) value;
+                List<StringTermsBucket> list = terms.buckets().array();
+                list.forEach(bucket -> {
+                    String groupKey = (String) bucket.key()._get();
+                    long count = bucket.docCount();
+                    countList.add(count);
+                    //System.out.println(groupKey + " : " + IpTool.getLocation(groupKey) + " -> " + count);
+                    System.out.println(groupKey + " : " + count);
+                });
+                System.out.println("bucket size = " + list.size());
+            } else if (value instanceof LongTermsAggregate) {
+                LongTermsAggregate terms = (LongTermsAggregate) value;
+                List<LongTermsBucket> list = terms.buckets().array();
+                list.forEach(bucket -> {
+                    String groupKey = bucket.key();
+                    String groupKeyStr = bucket.keyAsString();
+                    long count = bucket.docCount();
+                    countList.add(count);
+                    System.out.println(groupKeyStr + " : " + count);
+                });
+            } else {
+                System.out.println(value);
+            }
+        });
+        System.out.println("total = " + countList.stream().mapToLong(Long::longValue).sum());
+    }
+}

+ 99 - 6
search/search-service/src/main/java/cn/reghao/tnb/search/app/NginxLogService.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/NginxLogService.java

@@ -1,8 +1,8 @@
-package cn.reghao.tnb.search.app;
+package cn.reghao.tnb.search.app.log;
 
 import cn.reghao.tnb.search.app.config.ElasticProperties;
 import cn.reghao.tnb.search.app.es.*;
-import cn.reghao.tnb.search.app.model.po.NginxLog;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
 import cn.reghao.jutil.jdk.text.TextFile;
@@ -84,13 +84,106 @@ public class NginxLogService {
         return results;
     }
 
+    public void logTest() {
+        TextFile textFile = new TextFile();
+        String filePath = "nginx.log";
+        List<String> list = textFile.read(filePath);
+        List<cn.reghao.bnt.log.NginxLog> nginxLogs = new ArrayList<>();
+        for (String line : list) {
+            if (!line.startsWith("{")) {
+                System.out.println("not json data");
+                continue;
+            }
+
+            try {
+                cn.reghao.bnt.log.NginxLog nginxLog = JsonConverter.jsonToObject(line, cn.reghao.bnt.log.NginxLog.class);
+                nginxLogs.add(nginxLog);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        nginxLogs.stream()
+                .filter(nginxLog -> nginxLog.getStatus() >= 400)
+                .forEach(nginxLog -> {
+                    System.out.printf("%s %s -> %s\n", nginxLog.getRequestMethod(), nginxLog.getStatus(), nginxLog.getUpstreamAddr());
+                });
+        System.out.println();
+    }
+
+    Map<Long, Integer> ngxLogMap = new TreeMap<>();
+    void handleNginxLog(cn.reghao.bnt.log.NginxLog nginxLog) {
+        String date = nginxLog.getTimeIso8601();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
+                .withZone(ZoneId.of("UTC"));
+        LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
+        Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
+        Long key = timestamp;
+        Integer count = ngxLogMap.get(key);
+        if (count == null) {
+            ngxLogMap.put(key, 1);
+        } else {
+            int count1 = ngxLogMap.get(key) + 1;
+            ngxLogMap.put(key, count1);
+        }
+    }
+
+    /**
+     * NginxLog 在前端 echarts 中的可视化
+     *
+     * @param
+     * @return
+     * @date 2023-12-01 17:41:07
+     */
+    class PushTask implements Runnable {
+        @Override
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    if (ngxLogMap.size() < 3) {
+                        Thread.sleep(10_000);
+                        continue;
+                    }
+
+                    LocalDateTime localDateTime = LocalDateTime.now();
+                    Long baseKey = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
+
+                    List<String> xList = new ArrayList<>();
+                    List<Integer> yList = new ArrayList<>();
+                    Set<Long> keys = new HashSet<>();
+                    for (Long key : ngxLogMap.keySet()) {
+                        if (key < baseKey) {
+                            xList.add(DateTimeConverter.format(key*1000).split(" ")[1]);
+                            yList.add(ngxLogMap.get(key));
+                            keys.add(key);
+                        }
+                    }
+
+                    keys.forEach(ngxLogMap::remove);
+                    keys.clear();
+                    List results = new ArrayList();
+                    results.add(xList.toArray());
+                    results.add(yList.toArray());
+
+                    /*WebSocketSession webSocketSession = getPullSession("admin-service", "172.16.90.200");
+                    TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(results));
+                    webSocketSession.sendMessage(textMessage);*/
+                    Thread.sleep(10_000);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         ElasticProperties elasticProperties = new ElasticProperties();
         ElasticService elasticService = new ElasticService(elasticProperties);
         IndexService indexService = new IndexService(elasticService);
         MappingService mappingService = new MappingService();
-        DocumentService documentService = new DocumentService(elasticService);
-        SearchService searchService = new SearchService(elasticService);
+
+        NginxLogDocumentService nginxLogDocumentService = new NginxLogDocumentService(elasticService);
+        NginxLogSearchService nginxLogSearchService = new NginxLogSearchService(elasticService);
         QueryService<NginxLog> queryService = new QueryService<>(elasticService);
         String index = "nginx_log";
 
@@ -98,7 +191,7 @@ public class NginxLogService {
         //indexService.getMapping(index);
 
         //documentService.deleteAllDocument(index);
-        Map<String, Property> propertyMap = mappingService.getPropertyMap(NginxLog.class);
+        Map<String, Property> propertyMap = mappingService.getPropertyMapWithNginxLog(NginxLog.class);
         //indexService.deleteIndex(index);
         //indexService.createIndex(index, propertyMap);
 
@@ -121,7 +214,7 @@ public class NginxLogService {
         //String queryString = "content";
         //List<NginxLog> list = queryService.queryWithHighlight(index, queryString, pn, ps, NginxLog.class);
         //searchService.searchAll(index);
-        searchService.aggregate(index);
+        nginxLogSearchService.aggregate();
         //searchService.count(index);
     }
 }

+ 40 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/consumer/RabbitListeners.java

@@ -0,0 +1,40 @@
+package cn.reghao.tnb.search.app.log.consumer;
+
+import cn.reghao.bnt.log.AppLog;
+import cn.reghao.bnt.log.GatewayLog;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.Exchange;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author reghao
+ * @date 2024-11-28 14:37:18
+ */
+@Slf4j
+@Component
+public class RabbitListeners {
+    @RabbitListener(bindings =@QueueBinding(
+            value = @Queue(value = "tnb.log.gateway", durable = "true"),
+            key = "tnb.log.gateway",
+            exchange = @Exchange(value = "amq.direct"))
+    )
+    public void accessLogConsumer(@Payload String msg) {
+        GatewayLog gatewayLog = JsonConverter.jsonToObject(msg, GatewayLog.class);
+        log.info("{} -> {}", gatewayLog.getRequestId(), gatewayLog.getRequestUrl());
+    }
+
+    @RabbitListener(bindings =@QueueBinding(
+            value = @Queue(value = "tnb.log.app", durable = "true"),
+            key = "tnb.log.app",
+            exchange = @Exchange(value = "amq.direct"))
+    )
+    public void runtimeLogConsumer(@Payload String msg) {
+        AppLog appLog = JsonConverter.jsonToObject(msg, AppLog.class);
+        log.info("{}:{} -> {}", appLog.getApp(), appLog.getHost(), appLog.getMessage());
+    }
+}

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/ip/IPLocation.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IPLocation.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.ip;
+package cn.reghao.tnb.search.app.log.ip;
 
 import java.io.File;
 import java.io.IOException;

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/ip/IpTool.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/IpTool.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.ip;
+package cn.reghao.tnb.search.app.log.ip;
 
 /**
  * @author reghao

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/ip/Location.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/ip/Location.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.ip;
+package cn.reghao.tnb.search.app.log.ip;
 /**
  * @Description:位置
  * @author:difeng

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/model/po/NginxLog.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/NginxLog.java

@@ -1,4 +1,4 @@
-package cn.reghao.tnb.search.app.model.po;
+package cn.reghao.tnb.search.app.log.model;
 
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;

+ 6 - 0
search/search-service/src/main/resources/application-dev.yml

@@ -14,6 +14,12 @@ spring:
     url: jdbc:mysql://127.0.0.1/reghao_bnt_rdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
     username: dev
     password: Dev@123456
+  rabbitmq:
+    host: localhost
+    port: 5672
+    virtual-host: /
+    username: dev
+    password: Dev@123456
 es:
   host: 127.0.0.1
   port: 9200

+ 6 - 0
search/search-service/src/main/resources/application-test.yml

@@ -14,6 +14,12 @@ spring:
     url: jdbc:mysql://192.168.0.209/tnb_content_tdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
     username: test
     password: Test@123456
+  rabbitmq:
+    host: 192.168.0.209
+    port: 5672
+    virtual-host: /
+    username: test
+    password: Test@123456
 es:
   host: 127.0.0.1
   port: 9200

+ 1 - 2
search/search-service/src/test/java/NginxLogTest.java

@@ -4,7 +4,7 @@ import cn.reghao.tnb.search.app.SearchApplication;
 import cn.reghao.tnb.search.app.es.DocumentService;
 import cn.reghao.tnb.search.app.es.QueryService;
 import cn.reghao.tnb.search.app.es.SearchService;
-import cn.reghao.tnb.search.app.model.po.NginxLog;
+import cn.reghao.tnb.search.app.log.model.NginxLog;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -16,7 +16,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.List;