Browse Source

更新 search-service log 包中的相关功能

reghao 2 months ago
parent
commit
b5089dd301

+ 2 - 2
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/EsQuery.java

@@ -81,8 +81,8 @@ public class EsQuery {
     }
 
     public static Query getAndQuery() {
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult";
+        String fieldName1 = "status";
+        String fieldValue1 = "200";
 
         String fieldName2 = "requestMethod";
         String fieldValue2 = "POST";

+ 2 - 7
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java

@@ -122,9 +122,6 @@ public class SearchService {
     }
 
     public void searchAll(String indexName) throws IOException {
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/";
-
         String fieldName2 = "requestMethod.raw";
         String fieldValue2 = "POST";
 
@@ -185,12 +182,10 @@ public class SearchService {
         String aggField1 = "status";
         String aggField2 = "remoteAddr";
         String aggField3 = "httpUserAgent.raw";
-        String aggField4 = "url.raw";
         String aggField5 = "timeIso8601";
 
-        String fieldName = "url.raw";
-        String fieldValue = "/datareceive/ReceiveData/SendContentResult";
-
+        String fieldName = "status";
+        String fieldValue = "200";
         SearchRequest searchRequest = new SearchRequest.Builder()
                 .index(index)
                 .size(0)

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/controller/NginxLogController.java

@@ -77,7 +77,7 @@ public class NginxLogController {
                                @RequestParam(value = "aggregateField") String aggregateField) throws Exception {
         String aggregateField0;
         if ("url".equals(aggregateField)) {
-            aggregateField0 = "url.raw";
+            aggregateField0 = "methodUrl.raw";
         } else if ("statusCode".equals(aggregateField)) {
             aggregateField0 = "status";
         } else if ("ip".equals(aggregateField)) {

+ 0 - 6
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/mq/KafkaPub.java

@@ -29,12 +29,6 @@ public class KafkaPub {
         this.producer = new KafkaProducer<>(properties);
     }
 
-    public void produce(int i) {
-        String topic = "CustomerCountry";
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "index", ""+i);
-        producer.send(record);
-    }
-
     public void produce(NginxLog nginxLog) {
         String json = JsonConverter.objectToJson(nginxLog);
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "nginx-log", json);

+ 5 - 2
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/mq/KafkaSub.java

@@ -5,6 +5,7 @@ import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
 import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.config.AppProperties;
 import cn.reghao.tnb.search.app.log.service.NginxLogService;
+import jakarta.annotation.PostConstruct;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -39,15 +40,17 @@ public class KafkaSub {
         properties.put("bootstrap.servers", appProperties.getKafkaUri());
         // 消费者群组
         properties.put("group.id", "test");
+        // 自动提交偏移量
         properties.put("enable.auto.commit", "true");
-        properties.put("auto.commit.interval.ms", "1000");
+        // 自动提交偏移量的频率, 这里设置为 5s
+        properties.put("auto.commit.interval.ms", "5000");
         properties.put("session.timeout.ms", "30000");
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         this.consumer = new KafkaConsumer<>(properties);
     }
 
-    //@PostConstruct
+    @PostConstruct
     public void consume() {
         consumer.subscribe(Collections.singletonList(topic));
         threadPool.submit(new ConsumeTask());

+ 0 - 394
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/service/NginxLogSearch.java

@@ -1,394 +0,0 @@
-package cn.reghao.tnb.search.app.log.service;
-
-import cn.reghao.tnb.search.app.es.ElasticService;
-import cn.reghao.jutil.jdk.web.log.NginxLog;
-import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._types.FieldValue;
-import co.elastic.clients.elasticsearch._types.ShardStatistics;
-import co.elastic.clients.elasticsearch._types.SortOrder;
-import co.elastic.clients.elasticsearch._types.aggregations.*;
-import co.elastic.clients.elasticsearch._types.query_dsl.*;
-import co.elastic.clients.elasticsearch.core.CountRequest;
-import co.elastic.clients.elasticsearch.core.CountResponse;
-import co.elastic.clients.elasticsearch.core.SearchRequest;
-import co.elastic.clients.elasticsearch.core.SearchResponse;
-import co.elastic.clients.elasticsearch.core.search.Hit;
-import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
-import co.elastic.clients.elasticsearch.core.search.TotalHits;
-import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
-import co.elastic.clients.json.JsonData;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * @author reghao
- * @date 2025-03-12 10:51:21
- */
-@Slf4j
-@Service
-public class NginxLogSearch {
-    private final ElasticsearchClient esClient;
-    private final String indexName = "nginx_log";
-
-    public NginxLogSearch(ElasticService elasticService) throws Exception {
-        this.esClient = elasticService.getElasticsearchClient();
-    }
-
-    public void searchOne(String searchText) throws IOException {
-        SearchResponse<NginxLog> searchResponse = esClient.search(s -> s
-                .index(indexName)
-                // 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)
-                .query(q -> q
-                        // 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索)
-                        .match(t -> t
-                                .field("name")
-                                .query(searchText))), NginxLog.class);
-        TotalHits total = searchResponse.hits().total();
-        boolean isExactResult = total != null && total.relation() == TotalHitsRelation.Eq;
-        if (isExactResult) {
-            log.info("search has: {} results", total.value());
-        } else {
-            log.info("search more than : {} results", total.value());
-        }
-        List<Hit<NginxLog>> hits = searchResponse.hits().hits();
-        for (Hit<NginxLog> hit : hits) {
-            NginxLog source = hit.source();
-            log.info("Found result: {}", source);
-        }
-    }
-
-    public List<NginxLog> searchByPage(int pn, String searchField, String searchText) throws IOException {
-        int ps = 100;
-        String sortField = "timeIso8601";
-        Query query = RangeQuery.of(r -> r.field("age").gte(JsonData.of(8)))._toQuery();
-        Query query11 = getOrQuery();
-        Query query1 = getAndQuery();
-        Query query12 = getTermQuery();
-
-        int start = (pn-1)*ps;
-        SearchRequest searchRequest = SearchRequest.of(s -> s
-                .index(indexName)
-                .query(query1)
-                .from(start)
-                .size(ps)
-                // 按 id 字段降序排列
-                .sort(f -> f.field(o -> o.field(sortField).order(SortOrder.Desc)))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        /*List<Hit<NginxLog>> hits = searchResponse.hits().hits();
-        for (Hit<NginxLog> hit : hits) {
-            NginxLog product = hit.source();
-            log.info("search page result: {}", product);
-        }*/
-        return searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
-    }
-
-    public void searchAll() throws IOException {
-        String fieldName3 = "status";
-        String fieldValue3 = "200";
-        SearchRequest searchRequest = SearchRequest.of(s -> s
-                .index(indexName)
-                .query(q -> q.bool(b -> b
-                                .must(m -> m.term(t -> t.field(fieldName3).value(FieldValue.of(fieldValue3))))
-                        //.must(m -> m.term(t -> t.field("name").value(FieldValue.of("test"))))
-                ))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
-    }
-
-    public void searchByField(String fieldName, String fieldValue) throws IOException {
-        SearchRequest request = SearchRequest.of(s -> s
-                .index(indexName)
-                .query(query -> query.match(match -> match.field(fieldName).query(fieldValue)))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(request, NginxLog.class);
-        log.info("search result: {}", searchResponse);
-    }
-
-    public void search1() throws IOException {
-        String fieldName = "url";
-        String fieldValue = "/api";
-        // 构造查询条件
-        SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexName)
-                /*.sort(s1 -> s1.field(f -> f.field(fieldName).order(SortOrder.Asc)))
-                .scroll(s2 -> s2.offset(0))*/
-                .from(0)
-                .size(500)
-                .query(q -> q.match(m -> m.field(fieldName).query(FieldValue.of(fieldValue))))
-        );
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        HitsMetadata<NginxLog> hitsMetadata = searchResponse.hits();
-        //得到总数
-        TotalHits totalHits = hitsMetadata.total();
-        Double maxScore = hitsMetadata.maxScore();
-        //拿到匹配的数据
-        List<Hit<NginxLog>> hits = hitsMetadata.hits();
-        //拿到_source中的数据
-        List<NginxLog> nginxLogs = hits.stream().map(Hit::source).collect(Collectors.toList());
-
-        //最大分数
-        System.out.println(searchResponse.maxScore());
-        //分片数
-        System.out.println(searchResponse.shards());
-        //是否超时
-        System.out.println(searchResponse.timedOut());
-    }
-
-    public void search() throws IOException {
-        Query query = getExistsQuery();
-        SearchRequest searchRequest = new SearchRequest.Builder()
-                .index(indexName)
-                .query(query)
-                .build();
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        TotalHits totalHits = searchResponse.hits().total();
-        List<NginxLog> list = searchResponse.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
-    }
-
-    public void count() throws IOException {
-        Query query1 = getMatchQuery();
-        // remoteAddr 字段为空
-        Query query2 = BoolQuery.of(b ->
-                b.mustNot(m -> m.exists(t -> t.field("remoteAddr"))
-                ))._toQuery();
-        // remoteAddr 字段不为空
-        Query query3 = BoolQuery.of(b ->
-                b.must(m -> m.exists(t -> t.field("remoteAddr"))
-                ))._toQuery();
-
-        CountRequest countRequest = CountRequest.of(s -> s
-                        .index(indexName)
-                //.query(query3)
-        );
-
-        CountResponse countResponse = esClient.count(countRequest);
-        long total = countResponse.count();
-        ShardStatistics shardStatistics = countResponse.shards();
-        System.out.println("total -> " + total);
-    }
-
-    /**
-     * 等值查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 13:45:17
-     */
-    public Query getTermQuery() {
-        String fieldName1 = "url";
-        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult\"";
-
-        String fieldName2 = "requestMethod";
-        String fieldValue2 = "POST";
-
-        Query query = TermQuery.of(t -> t
-                .field(fieldName1).value(FieldValue.of(fieldValue1))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * MatchQuery 搜索时, 首先会解析查询字符串, 进行分词,然后查询
-     * TermQuery 搜索时, 会根据输入的查询内容进行搜索, 并不会解析查询内容,对它分词
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 15:04:22
-     */
-    public Query getMatchQuery() {
-        String fieldName = "host";
-        String searchText = "api.iquizoo.com";
-
-        Query query = MatchQuery.of(m -> m.field(fieldName).query(searchText))._toQuery();
-        return query;
-    }
-
-    /**
-     * 多值查询, 相当于 SQL 语句中的 in 查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:12:38
-     */
-    public Query getTermsQuery() {
-        String fieldName = "status";
-        List<FieldValue> fieldValueList = List.of(FieldValue.of("401"), FieldValue.of("500"));
-
-        Query query = TermsQuery.of(t ->
-                t.field(fieldName).terms(TermsQueryField.of(q -> q.value(fieldValueList)))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 或查询, 相当于 SQL 查询
-     * SELECT * FROM test1 where (uid = 1 or uid =2) and phone = 12345678919
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 13:46:26
-     */
-    public Query getOrQuery() {
-        Query query = BoolQuery.of(b ->
-                b.should(m -> m.term(t -> t.field("status").value(FieldValue.of(401))))
-                        .should(m -> m.term(t -> t.field("status").value(FieldValue.of(403))))
-                        .must(m -> m.term(t -> t.field("host").value(FieldValue.of("api.iquizoo.com"))))
-        )._toQuery();
-        return query;
-    }
-
-    public Query getAndQuery() {
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/datareceive/ReceiveData/SendContentResult";
-
-        String fieldName2 = "requestMethod";
-        String fieldValue2 = "POST";
-
-        Query query = BoolQuery.of(b -> b
-                .must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))
-                .must(m -> m.term(t -> t.field(fieldName2).value(FieldValue.of(fieldValue2))))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 模糊查询, 相当于 SQL 语句中的 like 查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:11:45
-     */
-    public Query getWildcardQuery() {
-        Query query = BoolQuery.of(b ->
-                b.must(m -> m.wildcard(t -> t.field("url").value("*result*")))
-        )._toQuery();
-        return query;
-    }
-
-    /**
-     * 存在查询, 相当于 SQL 语句中的 exist
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:20:26
-     */
-    public Query getExistsQuery() {
-        // 判断字段是否存在
-        Query query = ExistsQuery.of(t -> t.field("host"))._toQuery();
-        return query;
-    }
-
-    /**
-     * 范围查询, 相当于 SQL 语句中的 > 和 <
-     * gt 是大于,lt 是小于,gte 是大于等于,lte 是小于等于
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:21:26
-     */
-    public Query getRangeQuery() {
-        int status1 = 404;
-        int status2 = 600;
-        Query query = RangeQuery.of(t -> t.field("status").gte(JsonData.of(status1)).lte(JsonData.of(status2)))._toQuery();
-        return query;
-    }
-
-    /**
-     * 正则查询
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 14:22:18
-     */
-    public Query getRegexpQuery() {
-        Query query = RegexpQuery.of(t -> t.field("host").value("api.*"))._toQuery();
-        return query;
-    }
-
-    public Query getQuery() {
-        MatchAllQuery.of(m -> m.queryName("host"))._toQuery();
-        // 一般情况下有一个单词错误的情况下,fuzzy 查询可以找到另一个近似的词来代替,主要有以下场景:
-        //
-        //修改一个单词,如:box--->fox。
-        //移除一个单词,如:black-->lack。
-        //插入一个单词,如:sic-->sick。
-        //转换两个单词顺序,如:act-->cat。
-        FuzzyQuery.of(f -> f.field("host").value("lonel"));
-        // 通过指定字段的前缀进行查询
-        Query query = PrefixQuery.of(p -> p.field("host").value("api"))._toQuery();
-        return query;
-    }
-
-    /**
-     * 聚合查询, 相当于 SQL 的 group by
-     *
-     * @param
-     * @return
-     * @date 2025-03-12 15:06:48
-     */
-    public void aggregate() throws Exception {
-        String aggField1 = "status";
-        String aggField2 = "remoteAddr";
-        String aggField3 = "httpUserAgent.raw";
-        String aggField4 = "url.raw";
-        String aggField5 = "timeIso8601";
-
-        String fieldName1 = "url.raw";
-        String fieldValue1 = "/base/Device/PageList";
-
-        SearchRequest searchRequest = new SearchRequest.Builder()
-                .index(indexName)
-                .query(q -> q.bool(b -> b.must(m -> m.term(t -> t.field(fieldName1).value(FieldValue.of(fieldValue1))))))
-                .size(10000)
-                .aggregations("first_agg", a->a.terms(t->t.field(aggField5).size(65535)))
-                //.aggregations("agg1", a->a.histogram(t->t.field("httpUserAgent").interval(50.0)))
-                //.aggregations("agg1", a->a.sum(t->t.field("httpUserAgent")))
-                //.aggregations("second_agg", a->a.avg(t->t.field("status")))
-                .build();
-
-        SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);
-        TotalHits totalHits = searchResponse.hits().total();
-        Map<String, Aggregate> resultMap = searchResponse.aggregations();
-
-        List<Long> countList = new ArrayList<>();
-        resultMap.forEach((k, v) -> {
-            Object value = v._get();
-            if (value instanceof StringTermsAggregate) {
-                StringTermsAggregate terms = (StringTermsAggregate) value;
-                List<StringTermsBucket> list = terms.buckets().array();
-                list.forEach(bucket -> {
-                    String groupKey = (String) bucket.key()._get();
-                    long count = bucket.docCount();
-                    countList.add(count);
-                    //System.out.println(groupKey + " : " + GeoIpTool.getLocation(groupKey) + " -> " + count);
-                    System.out.println(groupKey + " : " + count);
-                });
-                System.out.println("bucket size = " + list.size());
-            } else if (value instanceof LongTermsAggregate) {
-                LongTermsAggregate terms = (LongTermsAggregate) value;
-                List<LongTermsBucket> list = terms.buckets().array();
-                list.forEach(bucket -> {
-                    String groupKey = bucket.key();
-                    String groupKeyStr = bucket.keyAsString();
-                    long count = bucket.docCount();
-                    countList.add(count);
-                    System.out.println(groupKeyStr + " : " + count);
-                });
-            } else {
-                System.out.println(value);
-            }
-        });
-        System.out.println("total = " + countList.stream().mapToLong(Long::longValue).sum());
-    }
-}

+ 52 - 29
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/service/NginxLogService.java

@@ -38,6 +38,8 @@ import java.util.stream.Collectors;
 @Service
 public class NginxLogService {
     private final String indexName = "nginx_log";
+    private final String dateTimeFormat = "yyyy-MM-dd HH:mm:ss";
+    private final String timeZone = "+08:00";
     private final NginxLogDocument nginxLogDocument;
     private final SearchService searchService;
     private final GeoIpTool geoIpTool;
@@ -60,6 +62,18 @@ public class NginxLogService {
             return;
         }
 
+        list.forEach(nginxLog -> {
+            String timeIso8601 = nginxLog.getTimeIso8601();
+            // es 的日期时间格式 yyyy-MM-ddTHH:mm:ss
+            String dateTimeStr = timeIso8601.replace("+08:00", "");
+            nginxLog.setTimeIso8601(dateTimeStr);
+
+            String method = nginxLog.getRequestMethod();
+            String url = nginxLog.getUrl();
+            String methodUrl = String.format("%s %s", method, url);
+            nginxLog.setMethodUrl(methodUrl);
+        });
+
         try {
             nginxLogDocument.batchAddDocument(indexName, list);
         } catch (IOException e) {
@@ -114,10 +128,12 @@ public class NginxLogService {
 
         String startDate = dateStr;
         String endDate = dateStr;
-        String start1 = String.format("%sT00:00:00", startDate);
-        String end1 = String.format("%sT23:59:59", endDate);
-        Query dateQuery = RangeQuery.of(q -> q.field("timeIso8601")
-                .gte(JsonData.of(start1)).lte(JsonData.of(end1)))._toQuery();
+        String start1 = String.format("%s 00:00:00", startDate);
+        String end1 = String.format("%s 23:59:59", endDate);
+
+        String dateField = "timeIso8601";
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start1).to(end1).format(dateTimeFormat))._toQuery();
         Query termQuery = EsQuery.getTermQuery("status", "200");
         Query combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery)));
 
@@ -138,28 +154,31 @@ public class NginxLogService {
     public long countNginxLog(String dateStr) {
         String startDate = dateStr;
         String endDate = dateStr;
-        String start1 = String.format("%sT00:00:00", startDate);
-        String end1 = String.format("%sT23:59:59", endDate);
-        RangeQuery dateQuery = RangeQuery.of(q -> q.field("timeIso8601").gte(JsonData.of(start1)).lte(JsonData.of(end1)));
-        return searchService.count(indexName, dateQuery._toQuery());
+        String start1 = String.format("%s 00:00:00", startDate);
+        String end1 = String.format("%s 23:59:59", endDate);
+        String dateField = "timeIso8601";
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start1).to(end1).format(dateTimeFormat))._toQuery();
+        return searchService.count(indexName, dateQuery);
     }
 
     public List<Object> getChartData2(String dateStr, String fieldValue) throws Exception {
         String aggregateField = "timeIso8601";
         String dateField = "timeIso8601";
-        String fieldName = "url.raw";
+        String fieldName = "methodUrl.raw";
 
-        String start = String.format("%sT00:00:00", dateStr);
-        String end = String.format("%sT23:59:59", dateStr);
+        String start = String.format("%s 00:00:00", dateStr);
+        String end = String.format("%s 23:59:59", dateStr);
         /*String start1 = startDateTime.replace(" ", "T");
         String end1 = endDateTime.replace(" ", "T");*/
-        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         Query combinedQuery;
         if (fieldValue == null || fieldValue.isBlank()) {
-            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery())));
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery)));
         } else {
             Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
-            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery()).filter(termQuery)));
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery).filter(termQuery)));
         }
         Map<String, Long> urlGroupMap = searchService.aggregateByQuery(indexName, aggregateField, combinedQuery);
 
@@ -183,10 +202,11 @@ public class NginxLogService {
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
         String startDateTime = dateTimeRange.getStartDateTime();
         String endDateTime = dateTimeRange.getEndDateTime();
-        String start = startDateTime.replace(" ", "T");
-        String end = endDateTime.replace(" ", "T");
-        Query query = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)))._toQuery();
-        Map<String, Long> groupMap = searchService.aggregateByQuery(indexName, aggregateField, query);
+        String start = startDateTime;
+        String end = endDateTime;
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
+        Map<String, Long> groupMap = searchService.aggregateByQuery(indexName, aggregateField, dateQuery);
 
         List<GroupCount> list = new ArrayList<>();
         groupMap.forEach((key, value) -> {
@@ -209,11 +229,11 @@ public class NginxLogService {
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
         String startDateTime = dateTimeRange.getStartDateTime();
         String endDateTime = dateTimeRange.getEndDateTime();
-        String start = startDateTime.replace(" ", "T");
-        String end = endDateTime.replace(" ", "T");
-        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
-        Query query = dateQuery._toQuery();
-        Map<String, Long> groupMap1 = searchService.aggregateByQuery(indexName, aggregateField, query);
+        String start = startDateTime;
+        String end = endDateTime;
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
+        Map<String, Long> groupMap1 = searchService.aggregateByQuery(indexName, aggregateField, dateQuery);
 
         Map<String, Long> map0 = new HashMap<>();
         Map<String, Map<String, Set<String>>> map = new HashMap<>();
@@ -283,26 +303,29 @@ public class NginxLogService {
         String startDate = localDate.minusDays(7).toString();
         String endDate = localDate.toString();
 
-        String start1 = String.format("%sT00:00:00", startDate);
-        String end1 = String.format("%sT23:59:59", endDate);
+        String start1 = String.format("%s 00:00:00", startDate);
+        String end1 = String.format("%s 23:59:59", endDate);
         return new DateTimeRange(start1, end1);
     }
 
     public List<Object> getChartData5(String dateStr, String fieldValue) throws Exception {
         String aggregateField = "timeIso8601";
-        String fieldName = "url.raw";
+        String fieldName = "methodUrl.raw";
 
         Query query = Query.of(q -> q.matchAll(m -> m));
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
         String start1 = dateTimeRange.getStartDateTime();
         String end1 = dateTimeRange.getEndDateTime();
-        RangeQuery dateQuery = RangeQuery.of(q -> q.field("timeIso8601").gte(JsonData.of(start1)).lte(JsonData.of(end1)));
+
+        String dateField = "timeIso8601";
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start1).to(end1).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         Query combinedQuery;
         if (fieldValue != null && !fieldValue.isBlank()) {
             Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
-            combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery._toQuery())));
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery)));
         } else {
-            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery._toQuery())));
+            combinedQuery = Query.of(q -> q.bool(b -> b.filter(dateQuery)));
         }
         Map<String, Long> groupMap = searchService.aggregateByDay(indexName, aggregateField, combinedQuery);
 

+ 0 - 253
search/search-service/src/test/java/ElasticTest.java

@@ -1,253 +0,0 @@
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.web.log.NginxLog;
-import cn.reghao.tnb.search.app.config.AppProperties;
-import cn.reghao.tnb.search.app.es.*;
-import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._types.SortOrder;
-import co.elastic.clients.elasticsearch._types.mapping.Property;
-import co.elastic.clients.elasticsearch._types.query_dsl.Query;
-import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
-import co.elastic.clients.elasticsearch.indices.AnalyzeRequest;
-import co.elastic.clients.elasticsearch.indices.AnalyzeResponse;
-import co.elastic.clients.elasticsearch.indices.analyze.AnalyzeToken;
-import co.elastic.clients.json.JsonData;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.*;
-
-/**
- * @author reghao
- * @date 2025-03-18 15:51:21
- */
-@Slf4j
-public class ElasticTest {
-    ElasticService getElasticService() {
-        AppProperties appProperties = new AppProperties();
-        appProperties.setEsHost("192.168.0.81");
-        appProperties.setEsPort(9200);
-        appProperties.setEsUsername("elastic");
-        appProperties.setEsPassword("VLTtN03SSJ4lsyyg56kf");
-
-        ElasticService elasticService = new ElasticService(appProperties);
-        return elasticService;
-    }
-
-    @Test
-    public void logTest() throws Exception {
-        ElasticService elasticService = getElasticService();
-        SearchService searchService = new SearchService(elasticService);
-
-        String indexName = "nginx_log";
-        int pageSize = 1000;
-        int pageNumber = 1;
-
-        String fieldName = "url.raw";
-        String fieldValue = "/datareceive/ReceiveData/SendContentResult";
-        String sortField = "timeIso8601";
-        sortField = "requestTime";
-        SortOrder sortOrder = SortOrder.Desc;
-
-        String startDate = "2024-05-20";
-        String endDate = "2024-05-20";
-        String start1 = String.format("%sT00:00:00", startDate);
-        String end1 = String.format("%sT23:59:59", endDate);
-        Query dateQuery = RangeQuery.of(q -> q.field("timeIso8601")
-                .gte(JsonData.of(start1)).lte(JsonData.of(end1)))._toQuery();
-
-        Query combinedQuery = Query.of(q -> q.matchAll(m -> m));
-        if (!fieldValue.isBlank()) {
-            Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
-            Query termQuery1 = EsQuery.getTermQuery("status", "200");
-            //combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery1).filter(termQuery)));
-            combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery1).filter(dateQuery)));
-        }
-
-        List<NginxLog> list0 = searchService.searchByPage(indexName, pageSize, pageNumber, combinedQuery, sortField, sortOrder);
-        System.out.println();
-    }
-
-    @Test
-    public void aggregateTest() throws Exception {
-        ElasticService elasticService = getElasticService();
-        SearchService searchService = new SearchService(elasticService);
-
-        String indexName = "nginx_log";
-        String dateField = "timeIso8601";
-        String startDateTime = "2024-08-10 00:00:00";
-        String endDateTime = "2024-08-10 23:59:59";
-
-        String start = startDateTime.replace(" ", "T");
-        String end = endDateTime.replace(" ", "T");
-        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
-        Query query = dateQuery._toQuery();
-
-        String aggregateField = "httpUserAgent.raw";
-        Map<String, Long> groupMap2 = searchService.aggregateByQuery(indexName, aggregateField, query);
-
-        String windows = "Windows NT";
-        String linux = "Linux ";
-        String mac = "Macintosh";
-        String ipad = "iPad";
-        String iphone = "iPhone";
-        String android = "Android";
-        String client = "client";
-        String others = "others";
-        Map<String, Integer> mapCount = new HashMap<>();
-        mapCount.put(windows, 0);
-        mapCount.put(linux, 0);
-        mapCount.put(mac, 0);
-        mapCount.put(ipad, 0);
-        mapCount.put(iphone, 0);
-        mapCount.put(android, 0);
-        mapCount.put(client, 0);
-        mapCount.put(others, 0);
-
-        groupMap2.keySet().forEach(userAgent -> {
-            if (userAgent.contains(windows)) {
-                incr(windows, mapCount);
-            } else if (userAgent.contains(linux)) {
-                incr(linux, mapCount);
-            } else if (userAgent.contains(mac)) {
-                incr(mac, mapCount);
-            } else if (userAgent.contains(ipad)) {
-                incr(ipad, mapCount);
-            } else if (userAgent.contains(iphone)) {
-                incr(iphone, mapCount);
-            } else if (userAgent.contains(android)) {
-                incr(android, mapCount);
-            } else if (userAgent.contains(client) || userAgent.contains("Client")) {
-                incr(client, mapCount);
-            } else {
-                incr(others, mapCount);
-            }
-        });
-        System.out.println();
-    }
-
-    private void incr(String key, Map<String, Integer> map) {
-        map.put(key, map.get(key)+1);
-    }
-
-    @Test
-    public void esDocTest() throws Exception {
-        String indexName = "nginx_log";
-        ElasticService elasticService = getElasticService();
-        SearchService searchService = new SearchService(elasticService);
-
-        Query combinedQuery = Query.of(q -> q.matchAll(m -> m));
-        long total = searchService.count(indexName, combinedQuery);
-        log.info("Total documents of {}: {}", indexName, total);
-
-        DocumentService documentService = new DocumentService(elasticService);
-        //documentService.deleteAllDocument(indexName);
-    }
-
-    @Test
-    public void esIndexTest() throws IOException {
-        String indexName = "nginx_log";
-        ElasticService elasticService = getElasticService();
-        IndexService indexService = new IndexService(elasticService);
-        MappingService mappingService = new MappingService();
-
-        indexService.deleteIndex(indexName);
-        Map<String, Property> propertyMap = mappingService.getPropertyMapWithNginxLog(NginxLog.class);
-        indexService.createIndex(indexName, propertyMap);
-    }
-
-    @Test
-    public void nginxLogTest() {
-        ElasticService elasticService = getElasticService();
-        DocumentService documentService = new DocumentService(elasticService);
-
-        String filePath = "/home/reghao/Downloads/access-20231107_073356-20240905_165944.log";
-        String indexName = "nginx_log1";
-        readFileFileChannel(new File(filePath), indexName, documentService);
-    }
-
-    void readFileFileChannel(File file, String index, DocumentService documentService) {
-        List<String> lines = new ArrayList<>();
-        List<NginxLog> nginxLogs = new ArrayList<>();
-        try {
-            FileInputStream fis = new FileInputStream(file);
-            FileChannel fileChannel = fis.getChannel();
-
-            // 10MB
-            int capacity = 10*1024*1024;
-            ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
-            StringBuffer buffer = new StringBuffer();
-            while(fileChannel.read(byteBuffer) != -1) {
-                //读取后,将位置置为0,将limit置为容量, 以备下次读入到字节缓冲中,从0开始存储
-                byteBuffer.clear();
-                byte[] bytes = byteBuffer.array();
-
-                String str = new String(bytes);
-                buffer.append(str);
-                String[] strArray = buffer.toString().split(System.lineSeparator());
-                for (int i = 0; i < strArray.length-1; i++) {
-                    try {
-                        NginxLog nginxLog = JsonConverter.jsonToObject(strArray[i], NginxLog.class);
-                        String timeStr = nginxLog.getTimeIso8601().replace("+08:00", "");
-                        nginxLog.setTimeIso8601(timeStr);
-                        //LocalDateTime localDateTime = DateTimeConverter.localDateTime(nginxLog.getTimeIso8601());
-                        //long timestamp = DateTimeConverter.msTimestamp(localDateTime);
-                        //String localDateTimeStr = DateTimeConverter.format(localDateTime);
-                        //nginxLog.setRequestTimestamp(timestamp);
-                        try {
-                            documentService.addDocument(index, nginxLog);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-
-                        nginxLogs.add(nginxLog);
-                        if (nginxLogs.size() > 10) {
-                            //nginxLogs.forEach(nginxLog -> nginxLog.setId(idGenerator.nextId()+""));
-                            //NginxLog nginxLog = nginxLogs.get(0);
-                            //documentService.update(index, nginxLog);
-
-                            //documentService.batchAddDocument(index, nginxLogs);
-                            //log.info("save {} nginxLogs", nginxLogs.size());
-                            break;
-                        }
-                    } catch (Exception e) {
-                        lines.add(strArray[i]);
-                        //e.printStackTrace();
-                    }
-                }
-
-                if (nginxLogs.size() > 10) {
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            // TODO close 处理
-        }
-    }
-
-    @Test
-    public void analyzerTest() throws IOException {
-        ElasticService elasticService = getElasticService();
-        // es 默认的标准分词器
-        String analyzer = "standard";
-        // ik 的两种分词器
-        //analyzer = "ik_max_word";
-        analyzer = "ik_smart";
-
-        String text = "中华人民共和国国歌";
-        AnalyzeRequest analyzeRequest = new AnalyzeRequest.Builder()
-                .text(text)
-                .analyzer(analyzer)
-                .build();
-        ElasticsearchClient esClient = elasticService.getElasticsearchClient();
-        AnalyzeResponse analyzeResponse = esClient.indices().analyze(analyzeRequest);
-        List<AnalyzeToken> tokens = analyzeResponse.tokens();
-        System.out.println();
-    }
-}

+ 198 - 142
search/search-service/src/test/java/NginxLogTest.java

@@ -1,20 +1,21 @@
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.string.SnowFlake;
-import cn.reghao.tnb.search.app.SearchApplication;
+import cn.reghao.tnb.search.app.config.AppProperties;
 import cn.reghao.tnb.search.app.es.*;
 import cn.reghao.jutil.jdk.web.log.NginxLog;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.SortOrder;
 import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
+import co.elastic.clients.elasticsearch.indices.AnalyzeRequest;
+import co.elastic.clients.elasticsearch.indices.AnalyzeResponse;
+import co.elastic.clients.elasticsearch.indices.analyze.AnalyzeToken;
+import co.elastic.clients.json.JsonData;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -24,154 +25,209 @@ import java.util.Map;
  * @date 2025-07-03 15:56:02
  */
 @Slf4j
-@ActiveProfiles("dev")
-@SpringBootTest(classes = SearchApplication.class)
 public class NginxLogTest {
-    @Autowired
-    QueryService<NginxLog> queryService;
-    @Autowired
-    SearchService searchService;
-    @Autowired
-    DocumentService documentService;
-    @Autowired
-    IndexService indexService;
-    @Autowired
-    MappingService mappingService;
-    @Autowired
-    ElasticService elasticService;
+    ElasticService getElasticService() {
+        AppProperties appProperties = new AppProperties();
+        appProperties.setEsHost("192.168.0.81");
+        appProperties.setEsPort(9200);
+        appProperties.setEsUsername("elastic");
+        appProperties.setEsPassword("VLTtN03SSJ4lsyyg56kf");
+
+        ElasticService elasticService = new ElasticService(appProperties);
+        return elasticService;
+    }
+
+    @Test
+    public void logTest() throws Exception {
+        ElasticService elasticService = getElasticService();
+        SearchService searchService = new SearchService(elasticService);
+
+        int pageSize = 1000;
+        int pageNumber = 1;
+
+        String fieldName = "url.raw";
+        String fieldValue = "/datareceive/ReceiveData/SendContentResult";
+        String sortField = "timeIso8601";
+        //sortField = "requestTime";
+        SortOrder sortOrder = SortOrder.Desc;
+
+        String startDate = "2023-11-08";
+        String endDate = "2024-05-20";
+        String start1 = String.format("%s 00:00:00", startDate);
+        String end1 = String.format("%s 23:59:59", startDate);
+        Query dateQuery0 = RangeQuery.of(q -> q.field("timeIso8601")
+                .gte(JsonData.of(start1)).lte(JsonData.of(end1)).format("yyyy-MM-dd HH:mm:ss"))._toQuery();
+
+        Query combinedQuery = Query.of(q -> q.matchAll(m -> m));
+        if (!fieldValue.isBlank()) {
+            Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);
+            combinedQuery = dateQuery0;
+        }
+        List<NginxLog> list0 = searchService.searchByPage(indexName, pageSize, pageNumber, combinedQuery, sortField, sortOrder);
+
+        String dateTimeFormat = "yyyy-MM-dd HH:mm:ss";
+        String timeZone = "+08:00";
+        String dateField = "timeIso8601";
+        Query dateQuery = RangeQuery.of(q -> q.field(dateField)
+                .from(start1).to(end1).format(dateTimeFormat).timeZone(timeZone))._toQuery();
+        String aggregateField = "timeIso8601";
+        Map<String, Long> groupMap2 = searchService.aggregateByQuery(indexName, aggregateField, dateQuery);
+        System.out.println();
+    }
+
+    String indexName = "nginx_log";
+    @Test
+    public void aggregateTest() throws Exception {
+        ElasticService elasticService = getElasticService();
+        SearchService searchService = new SearchService(elasticService);
+
+        Query query1 = Query.of(q -> q.matchAll(m -> m));
+        String aggregateField = "timeIso8601";
+        Map<String, Long> groupMap1 = searchService.aggregateByDay(indexName, aggregateField, query1);
+
+        String dateField = "timeIso8601";
+        String startDateTime = "2023-11-09 00:00:00";
+        String endDateTime = "2023-11-09 23:59:59";
+        String start = startDateTime.replace(" ", "T");
+        String end = endDateTime.replace(" ", "T");
+        RangeQuery dateQuery = RangeQuery.of(q -> q.field(dateField).gte(JsonData.of(start)).lte(JsonData.of(end)));
+        Query query = dateQuery._toQuery();
+
+        //aggregateField = "httpUserAgent.raw";
+        aggregateField = "timeIso8601";
+        Map<String, Long> groupMap2 = searchService.aggregateByQuery(indexName, aggregateField, query);
+
+        aggregateField = "methodUrl.raw";
+        Map<String, Long> groupMap3 = searchService.aggregateByQuery(indexName, aggregateField, query);
+
+        /*String windows = "Windows NT";
+        String linux = "Linux ";
+        String mac = "Macintosh";
+        String ipad = "iPad";
+        String iphone = "iPhone";
+        String android = "Android";
+        String client = "client";
+        String others = "others";
+        Map<String, Integer> mapCount = new HashMap<>();
+        mapCount.put(windows, 0);
+        mapCount.put(linux, 0);
+        mapCount.put(mac, 0);
+        mapCount.put(ipad, 0);
+        mapCount.put(iphone, 0);
+        mapCount.put(android, 0);
+        mapCount.put(client, 0);
+        mapCount.put(others, 0);
+
+        groupMap2.keySet().forEach(userAgent -> {
+            if (userAgent.contains(windows)) {
+                incr(windows, mapCount);
+            } else if (userAgent.contains(linux)) {
+                incr(linux, mapCount);
+            } else if (userAgent.contains(mac)) {
+                incr(mac, mapCount);
+            } else if (userAgent.contains(ipad)) {
+                incr(ipad, mapCount);
+            } else if (userAgent.contains(iphone)) {
+                incr(iphone, mapCount);
+            } else if (userAgent.contains(android)) {
+                incr(android, mapCount);
+            } else if (userAgent.contains(client) || userAgent.contains("Client")) {
+                incr(client, mapCount);
+            } else {
+                incr(others, mapCount);
+            }
+        });*/
+        System.out.println();
+    }
+
+    private void incr(String key, Map<String, Integer> map) {
+        map.put(key, map.get(key)+1);
+    }
 
     @Test
-    public void indexTest() throws IOException {
-        String indexName = "nginx_log";
+    public void esDocTest() {
+        ElasticService elasticService = getElasticService();
+        SearchService searchService = new SearchService(elasticService);
+
+        Query combinedQuery = Query.of(q -> q.matchAll(m -> m));
+        long total = searchService.count(indexName, combinedQuery);
+        log.info("Total documents of {}: {}", indexName, total);
+
+        DocumentService documentService = new DocumentService(elasticService);
+        //documentService.deleteAllDocument(indexName);
+    }
+
+    @Test
+    public void esIndexTest() throws IOException {
+        ElasticService elasticService = getElasticService();
+        IndexService indexService = new IndexService(elasticService);
+        MappingService mappingService = new MappingService();
+
         indexService.deleteIndex(indexName);
         Map<String, Property> propertyMap = mappingService.getPropertyMapWithNginxLog(NginxLog.class);
         indexService.createIndex(indexName, propertyMap);
+    }
 
-        //Map<String, Property> propertyMap = mappingService.getVideoTextPropertyMap();
-//        indexService.deleteIndex(index);
-//        indexService.createIndex(index, propertyMap);
-        //indexService.getIndex(index);
-        //indexService.deleteIndex(indexName);
-        //indexService.getIndex(index);
-        //indexService.updateMapping(index);
-        //searchService.searchAll(index);
+    @Test
+    public void nginxLogTest() throws IOException {
+        ElasticService elasticService = getElasticService();
+        DocumentService documentService = new DocumentService(elasticService);
+        String filePath = "/home/reghao/Downloads/access-20231107_073356-20240905_165944.log";
+        readFile(filePath, indexName, documentService);
     }
 
-    static SnowFlake idGenerator = new SnowFlake(1, 1);
-    static long total = 0L;
-    public static void readFileFileChannel(File file, String index, DocumentService documentService) {
-        List<String> lines = new ArrayList<>();
+    int total = 0;
+    void readFile(String filePath, String index, DocumentService documentService) throws IOException {
         List<NginxLog> nginxLogs = new ArrayList<>();
-        try {
-            FileInputStream fis = new FileInputStream(file);
-            FileChannel fileChannel = fis.getChannel();
-
-            // 10MB
-            int capacity = 10*1024*1024;
-            ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
-            StringBuffer buffer = new StringBuffer();
-            while(fileChannel.read(byteBuffer) != -1) {
-                //读取后,将位置置为0,将limit置为容量, 以备下次读入到字节缓冲中,从0开始存储
-                byteBuffer.clear();
-                byte[] bytes = byteBuffer.array();
-
-                String str = new String(bytes);
-                buffer.append(str);
-                String[] strArray = buffer.toString().split(System.lineSeparator());
-                for (int i = 0; i < strArray.length-1; i++) {
-                    try {
-                        NginxLog nginxLog = JsonConverter.jsonToObject(strArray[i], NginxLog.class);
-                        nginxLogs.add(nginxLog);
-                    } catch (Exception e) {
-                        lines.add(strArray[i]);
-                        //e.printStackTrace();
-                    }
-                }
-
-                String lastLine = strArray[strArray.length-1];
-                if (!lastLine.endsWith("}")) {
-                    buffer = new StringBuffer();
-                    buffer.append(strArray[strArray.length-1]);
-                }
-
-                while (nginxLogs.size() > 10_000) {
-                    nginxLogs.forEach(nginxLog -> nginxLog.setId(idGenerator.nextId()+""));
-                    //NginxLog nginxLog = nginxLogs.get(0);
-                    //documentService.update(index, nginxLog);
-
-                    documentService.batchAddDocument(index, nginxLogs);
-                    log.info("save {} nginxLogs", nginxLogs.size());
-                    total += nginxLogs.size();
-                    nginxLogs.clear();
-                }
+        RandomAccessFile raf  = new RandomAccessFile(filePath, "r");
+        String line = raf.readLine();
+        while (line != null) {
+            NginxLog nginxLog0 = JsonConverter.jsonToObject(line, NginxLog.class);
+            nginxLogs.add(nginxLog0);
+
+            if (nginxLogs.size() > 10_000) {
+                total += nginxLogs.size();
+                nginxLogs.forEach(nginxLog -> {
+                    String timeIso8601 = nginxLog.getTimeIso8601();
+                    // 日期时间格式 yyyy-MM-ddTHH:mm:ss
+                    String dateTimeStr = timeIso8601.replace("+08:00", "");
+                    nginxLog.setTimeIso8601(dateTimeStr);
+
+                    String method = nginxLog.getRequestMethod();
+                    String url = nginxLog.getUrl();
+                    String methodUrl = String.format("%s %s", method, url);
+                    nginxLog.setMethodUrl(methodUrl);
+                });
+
+                documentService.batchAddDocument(index, nginxLogs);
+                log.info("save {} nginxLogs", nginxLogs.size());
+                nginxLogs.clear();
             }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            // TODO close 处理
-        }
-    }
 
-    @Test
-    public void addNginxLogTest() {
-        String index = "nginx_log";
-        String baseDir = "/home/reghao/work/azy/data/konglog";
-        File dir = new File(baseDir);
-        for (File file : dir.listFiles()) {
-            readFileFileChannel(file, index, documentService);
-            log.info("total -> {}", total);
+            if (total > 1_000_000) {
+                break;
+            }
+            line = raf.readLine();
         }
-        log.info("total -> {}", total);
     }
 
     @Test
-    public void nginxLogTest() throws Exception {
-        String index = "nginx_log";
-        int pageSize = 1000;
-        int pageNumber = 1;
-        String fieldName = "url.raw";
-        String fieldValue = "/datareceive/ReceiveData/SendContentResult";
-        //List<NginxLog> list = searchService.searchByPage(index, pageSize, pageNumber, fieldName, fieldValue);
-
-        String aggregateField = "url.raw";
-        aggregateField = "timeIso8601";
-        //searchService.aggregate(index, aggregateField);
-
-        //AppProperties elasticProperties = new AppProperties();
-        //ElasticService elasticService = new ElasticService(elasticProperties);
-        //IndexService indexService = new IndexService(elasticService);
-        //MappingService mappingService = new MappingService();
-
-        //NginxLogDocument nginxLogDocument = new NginxLogDocument(elasticService);
-        //NginxLogSearch nginxLogSearch = new NginxLogSearch(elasticService);
-        //QueryService<NginxLog> queryService = new QueryService<>(elasticService);
-        //indexService.getIndex(index);
-        //indexService.getMapping(index);
-
-        //documentService.deleteAllDocument(index);
-        //Map<String, Property> propertyMap = mappingService.getPropertyMapWithNginxLog(NginxLog.class);
-        //indexService.deleteIndex(index);
-        //indexService.createIndex(index, propertyMap);
-
-        //searchService.search(index);
-//        search1(esClient, "app_log");
-//        index = "app_log";
-//        deleteAll(index);
-//        searchService.aggregate(index);
-
-        /*int pn = 1;
-        while (pn < 100) {
-            List<NginxLog> list = searchService.searchByPage(index, pn, "", "");
-            System.out.println();
-            pn++;
-        }*/
-
-        //searchService.searchAll(index);
-//        indexService.updateMapping(index);
-
-        //String queryString = "content";
-        //List<NginxLog> list = queryService.queryWithHighlight(index, queryString, pn, ps, NginxLog.class);
-        //searchService.searchAll(index);
-        //nginxLogSearch.aggregate();
+    public void analyzerTest() throws IOException {
+        ElasticService elasticService = getElasticService();
+        // es 默认的标准分词器
+        String analyzer = "standard";
+        // ik 的两种分词器
+        //analyzer = "ik_max_word";
+        analyzer = "ik_smart";
+
+        String text = "中华人民共和国国歌";
+        AnalyzeRequest analyzeRequest = new AnalyzeRequest.Builder()
+                .text(text)
+                .analyzer(analyzer)
+                .build();
+        ElasticsearchClient esClient = elasticService.getElasticsearchClient();
+        AnalyzeResponse analyzeResponse = esClient.indices().analyze(analyzeRequest);
+        List<AnalyzeToken> tokens = analyzeResponse.tokens();
+        System.out.println();
     }
 }