ソースを参照

1.添加 @DateVerify 时间日期参数校验注解
2.es 的 date 类型字段格式为 yyyy-MM-ddTHH:mm:ss+08:00
3.更新 ElasticService

reghao 2 ヶ月 前
コミット
6341c6dfe4

+ 15 - 8
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/ElasticService.java

@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class ElasticService {
     private final ElasticsearchClient esClient;
+    private final ElasticsearchAsyncClient asyncClient;
 
     public ElasticService(AppProperties appProperties) {
         String host = appProperties.getEsHost();
@@ -31,9 +32,10 @@ public class ElasticService {
         String username = appProperties.getEsUsername();
         String password = appProperties.getEsPassword();
         this.esClient = getElasticsearchClient(host, port, username, password);
+        this.asyncClient = getElasticsearchAsyncClient(host, port, username, password);
     }
 
-    private ElasticsearchClient getElasticsearchClient(String host, int port, String username, String password) {
+    private ElasticsearchTransport getElasticsearchTransport(String host, int port, String username, String password) {
         CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
         credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
 
@@ -55,18 +57,19 @@ public class ElasticService {
 
         // 创建一个 Transport 通信和一个 JacksonJsonpMapper 序列化实例
         ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+        return transport;
+    }
+
+    private ElasticsearchClient getElasticsearchClient(String host, int port, String username, String password) {
+        ElasticsearchTransport transport = getElasticsearchTransport(host, port, username, password);
         // 得到一个 es 客户端
         ElasticsearchClient client = new ElasticsearchClient(transport);
         return client;
     }
 
-    private ElasticsearchAsyncClient getElasticsearchAsyncClient(String host, int port) {
-        //创建 low-level client
-        RestClient restClient = RestClient.builder(new HttpHost(host, port)).build();
-        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
-        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
-
-        //得到一个es客户端
+    private ElasticsearchAsyncClient getElasticsearchAsyncClient(String host, int port, String username, String password) {
+        ElasticsearchTransport transport = getElasticsearchTransport(host, port, username, password);
+        //得到一个 es 异步客户端
         ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
         return client;
     }
@@ -75,6 +78,10 @@ public class ElasticService {
         return this.esClient;
     }
 
+    public ElasticsearchAsyncClient getElasticsearchAsyncClient() {
+        return this.asyncClient;
+    }
+
     public void close() {
     }
 }

+ 3 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/SearchService.java

@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 @Slf4j
 @Service
 public class SearchService {
+    private final String timeZone = "+08:00";
     private final ElasticsearchClient esClient;
 
     public SearchService(ElasticService elasticService)  {
@@ -271,7 +272,8 @@ public class SearchService {
                 .size(0)
                 .query(query)
                 // 按天进行聚合
-                .aggregations("agg1", a->a.dateHistogram(t->t.field(aggregateField).calendarInterval(CalendarInterval.Day).minDocCount(0)))
+                .aggregations("agg1", a->a.dateHistogram(t -> t.field(aggregateField)
+                        .calendarInterval(CalendarInterval.Day).timeZone(timeZone).minDocCount(0)))
                 .build();
 
         SearchResponse<NginxLog> searchResponse = esClient.search(searchRequest, NginxLog.class);

+ 7 - 4
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/model/dto/DateTimeRange.java

@@ -1,5 +1,6 @@
 package cn.reghao.tnb.search.app.log.model.dto;
 
+import cn.reghao.tnb.search.app.model.DateVerify;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.Size;
 import lombok.AllArgsConstructor;
@@ -17,9 +18,11 @@ import lombok.Setter;
 @Getter
 public class DateTimeRange {
     @NotBlank
-    @Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
-    private String startDateTime;
+    //@Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
+    @DateVerify(dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private String start;
     @NotBlank
-    @Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
-    private String endDateTime;
+    //@Size(min = 19, max = 19, message = "日期时间格式 yyyy-mm-dd hh:mm:ss")
+    @DateVerify(dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private String end;
 }

+ 1 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/mq/KafkaSub.java

@@ -68,7 +68,7 @@ public class KafkaSub {
                         NginxLog nginxLog = JsonConverter.jsonToObject(value, NginxLog.class);
                         list.add(nginxLog);
                     }
-                    nginxLogService.saveNginxLogs(list);
+                    nginxLogService.processNginxLogs(list);
                     consumer.commitAsync();
                 }
             } catch (CommitFailedException e) {

+ 10 - 19
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/service/NginxLogDocument.java

@@ -1,6 +1,5 @@
 package cn.reghao.tnb.search.app.log.service;
 
-import cn.reghao.jutil.jdk.string.SnowFlake;
 import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.search.app.es.ElasticService;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
@@ -24,22 +23,22 @@ import java.util.List;
 @Service
 public class NginxLogDocument {
     private final ElasticsearchClient esClient;
-    private final SnowFlake idGenerator;
+    private final ElasticsearchAsyncClient asyncClient;
 
     public NginxLogDocument(ElasticService elasticService) {
         this.esClient = elasticService.getElasticsearchClient();
-        this.idGenerator = new SnowFlake(1, 1);
+        this.asyncClient = elasticService.getElasticsearchAsyncClient();
     }
 
-    public void addDocument(String indexName, NginxLog product) throws IOException {
-        IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(product.getId()).document(product));
+    public void save(String indexName, NginxLog nginxLog) throws IOException {
+        IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(nginxLog.getId()).document(nginxLog));
         String result = indexResponse.result().jsonValue();
         if (!"created".equals(result)) {
             log.error("add one document result: {}", indexResponse.result().jsonValue());
         }
     }
 
-    public void batchAddDocument(String indexName, List<NginxLog> nginxLogs) throws IOException {
+    public void saveAll(String indexName, List<NginxLog> nginxLogs) throws IOException {
         List<BulkOperation> bulkOperations = new ArrayList<>();
         nginxLogs.forEach(p -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(p.getId()).document(p)))));
         BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
@@ -48,22 +47,14 @@ public class NginxLogDocument {
         }
     }
 
-    public void saveAll(String index, List<NginxLog> nginxLogs) throws IOException {
-        for (NginxLog nginxLog : nginxLogs) {
-            IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id("" + idGenerator.nextId()));
-            Result result = response.result();
-            System.out.println(result.jsonValue());
-        }
-    }
-
     public void updateDocument(String indexName, NginxLog product) throws IOException {
         UpdateResponse<NginxLog> updateResponse = esClient.update(s -> s.index(indexName).id(product.getId()).doc(product), NginxLog.class);
         log.info("update doc result: {}", updateResponse.result());
     }
 
-    public void update(String index, NginxLog nginxLog) throws IOException {
+    public void update(String indexName, NginxLog nginxLog) throws IOException {
         String id = nginxLog.getId();
-        IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id(id));
+        IndexResponse response = esClient.index(i -> i.index(indexName).document(nginxLog).id(id));
         Result result = response.result();
         System.out.println(result.jsonValue());
     }
@@ -96,7 +87,7 @@ public class NginxLogDocument {
         }
     }
 
-    public void batchDeleteDocument(String indexName, List<String> ids) {
+    public void batchDelete(String indexName, List<String> ids) {
         List<BulkOperation> bulkOperations = new ArrayList<>();
         ids.forEach(a -> bulkOperations.add(BulkOperation.of(b -> b.delete(c -> c.id(a)))));
         try {
@@ -108,9 +99,9 @@ public class NginxLogDocument {
         }
     }
 
-    public void delete(ElasticsearchAsyncClient client, String index) throws IOException {
+    public void delete(String indexName) throws IOException {
         int id = 1;
-        client.delete(i -> i.index(index).id("" + id)).whenComplete((success, failure)->{
+        asyncClient.delete(i -> i.index(indexName).id("" + id)).whenComplete((success, failure)->{
             System.out.println(success.index());
             System.out.println(success.version());
         });

+ 28 - 38
search/search-service/src/main/java/cn/reghao/tnb/search/app/log/service/NginxLogService.java

@@ -3,6 +3,7 @@ package cn.reghao.tnb.search.app.log.service;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
 import cn.reghao.jutil.jdk.io.TextFile;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.string.SnowFlake;
 import cn.reghao.jutil.jdk.web.log.NginxLog;
 import cn.reghao.tnb.common.db.SelectOption;
 import cn.reghao.tnb.search.app.config.AppProperties;
@@ -17,7 +18,6 @@ import cn.reghao.tnb.search.app.log.model.vo.GroupCount;
 import co.elastic.clients.elasticsearch._types.SortOrder;
 import co.elastic.clients.elasticsearch._types.query_dsl.Query;
 import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
-import co.elastic.clients.json.JsonData;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
@@ -37,13 +37,14 @@ import java.util.stream.Collectors;
 @Slf4j
 @Service
 public class NginxLogService {
-    private final String indexName = "nginx_log";
+    private final String indexName = "nginx_log1";
     private final String dateTimeFormat = "yyyy-MM-dd HH:mm:ss";
     private final String timeZone = "+08:00";
     private final NginxLogDocument nginxLogDocument;
     private final SearchService searchService;
     private final GeoIpTool geoIpTool;
     private final String geoJson;
+    private final SnowFlake idGenerator;
 
     public NginxLogService(NginxLogDocument nginxLogDocument, SearchService searchService,
                            GeoIpTool geoIpTool, AppProperties appProperties) {
@@ -51,39 +52,26 @@ public class NginxLogService {
         this.searchService = searchService;
         this.geoIpTool = geoIpTool;
         this.geoJson = new TextFile().readFile(appProperties.getGeojsonPath());
+        this.idGenerator = new SnowFlake(1, 1);
     }
 
-    public void processNginxLog(NginxLog nginxLog) {
-        saveNginxLog(nginxLog);
-    }
-
-    public void saveNginxLogs(List<NginxLog> list) {
+    public void processNginxLogs(List<NginxLog> list) {
         if (list.isEmpty()) {
             return;
         }
 
         list.forEach(nginxLog -> {
+            // es 的日期时间格式 yyyy-MM-ddTHH:mm:ss+08:00
             String timeIso8601 = nginxLog.getTimeIso8601();
-            // es 的日期时间格式 yyyy-MM-ddTHH:mm:ss
-            String dateTimeStr = timeIso8601.replace("+08:00", "");
-            nginxLog.setTimeIso8601(dateTimeStr);
-
             String method = nginxLog.getRequestMethod();
             String url = nginxLog.getUrl();
             String methodUrl = String.format("%s %s", method, url);
             nginxLog.setMethodUrl(methodUrl);
+            nginxLog.setId(idGenerator.nextId() + "");
         });
 
         try {
-            nginxLogDocument.batchAddDocument(indexName, list);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void saveNginxLog(NginxLog nginxLog) {
-        try {
-            nginxLogDocument.addDocument(indexName, nginxLog);
+            nginxLogDocument.saveAll(indexName, list);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -107,9 +95,9 @@ public class NginxLogService {
     }
 
     public List<Object> getNginxLog2(DateTimeRange dateTimeRange) throws Exception {
-        String start = dateTimeRange.getStartDateTime();
+        String start = dateTimeRange.getStart();
         LocalDateTime start1 = DateTimeConverter.localDateTime2(start);
-        String end = dateTimeRange.getEndDateTime();
+        String end = dateTimeRange.getEnd();
         LocalDateTime end1 = DateTimeConverter.localDateTime2(end);
 
         Duration duration = Duration.between(start1, end1);
@@ -133,7 +121,7 @@ public class NginxLogService {
 
         String dateField = "timeIso8601";
         Query dateQuery = RangeQuery.of(q -> q.field(dateField)
-                .from(start1).to(end1).format(dateTimeFormat))._toQuery();
+                .from(start1).to(end1).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         Query termQuery = EsQuery.getTermQuery("status", "200");
         Query combinedQuery = Query.of(q -> q.bool(b -> b.filter(termQuery).filter(dateQuery)));
 
@@ -158,7 +146,7 @@ public class NginxLogService {
         String end1 = String.format("%s 23:59:59", endDate);
         String dateField = "timeIso8601";
         Query dateQuery = RangeQuery.of(q -> q.field(dateField)
-                .from(start1).to(end1).format(dateTimeFormat))._toQuery();
+                .from(start1).to(end1).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         return searchService.count(indexName, dateQuery);
     }
 
@@ -197,13 +185,18 @@ public class NginxLogService {
         return results;
     }
 
+    /**
+     * 按字段和时间范围聚合
+     *
+     * @param
+     * @return
+     * @date 2026-01-09 21:26:58
+     */
     public List<GroupCount> getChartData3(String aggregateField, String dateStr) throws Exception {
         String dateField = "timeIso8601";
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
-        String startDateTime = dateTimeRange.getStartDateTime();
-        String endDateTime = dateTimeRange.getEndDateTime();
-        String start = startDateTime;
-        String end = endDateTime;
+        String start = dateTimeRange.getStart();
+        String end = dateTimeRange.getEnd();
         Query dateQuery = RangeQuery.of(q -> q.field(dateField)
                 .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         Map<String, Long> groupMap = searchService.aggregateByQuery(indexName, aggregateField, dateQuery);
@@ -227,10 +220,8 @@ public class NginxLogService {
         int deep = 1;
         String dateField = "timeIso8601";
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
-        String startDateTime = dateTimeRange.getStartDateTime();
-        String endDateTime = dateTimeRange.getEndDateTime();
-        String start = startDateTime;
-        String end = endDateTime;
+        String start = dateTimeRange.getStart();
+        String end = dateTimeRange.getEnd();
         Query dateQuery = RangeQuery.of(q -> q.field(dateField)
                 .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
         Map<String, Long> groupMap1 = searchService.aggregateByQuery(indexName, aggregateField, dateQuery);
@@ -303,7 +294,7 @@ public class NginxLogService {
         String startDate = localDate.minusDays(7).toString();
         String endDate = localDate.toString();
 
-        String start1 = String.format("%s 00:00:00", startDate);
+        String start1 = String.format("%s 00:00:00", endDate);
         String end1 = String.format("%s 23:59:59", endDate);
         return new DateTimeRange(start1, end1);
     }
@@ -312,14 +303,13 @@ public class NginxLogService {
         String aggregateField = "timeIso8601";
         String fieldName = "methodUrl.raw";
 
-        Query query = Query.of(q -> q.matchAll(m -> m));
         DateTimeRange dateTimeRange = getDateTimeRange(dateStr);
-        String start1 = dateTimeRange.getStartDateTime();
-        String end1 = dateTimeRange.getEndDateTime();
-
+        String start = dateTimeRange.getStart();
+        String end = dateTimeRange.getEnd();
         String dateField = "timeIso8601";
         Query dateQuery = RangeQuery.of(q -> q.field(dateField)
-                .from(start1).to(end1).format(dateTimeFormat).timeZone(timeZone))._toQuery();
+                .from(start).to(end).format(dateTimeFormat).timeZone(timeZone))._toQuery();
+
         Query combinedQuery;
         if (fieldValue != null && !fieldValue.isBlank()) {
             Query termQuery = EsQuery.getTermQuery(fieldName, fieldValue);

+ 35 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/model/DateVerify.java

@@ -0,0 +1,35 @@
+package cn.reghao.tnb.search.app.model;
+
+import jakarta.validation.Constraint;
+import jakarta.validation.Payload;
+
+import java.lang.annotation.*;
+
+/**
+ * 日期时间类型校验注解
+ *
+ * @author reghao
+ * @date 2026-01-09 17:28:42
+ */
+@Target({ElementType.FIELD})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Constraint(validatedBy = {DateVerifyValidator.class}) // 指定自定义的校验器
+public @interface DateVerify {
+
+    // 提示信息
+    String message() default "日期格式不正确";
+
+    // 日期时间格式
+    String dateFormat() default "yyyy-MM-dd HH:mm:ss";
+
+    /**
+     * 必须包含以下两个属性
+     * 否则会报错 error msg: contains Constraint annotation, but does not contain a groups parameter.
+     *
+     * @return
+     */
+    Class<?>[] groups() default {};
+
+    Class<? extends Payload>[] payload() default {};
+}

+ 52 - 0
search/search-service/src/main/java/cn/reghao/tnb/search/app/model/DateVerifyValidator.java

@@ -0,0 +1,52 @@
+package cn.reghao.tnb.search.app.model;
+
+import jakarta.validation.ConstraintValidator;
+import jakarta.validation.ConstraintValidatorContext;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+/**
+ * 日期时间类型校验器
+ *
+ * @author reghao
+ * @date 2026-01-09 17:29:25
+ */
+public class DateVerifyValidator implements ConstraintValidator<DateVerify, String> {
+    private String dateFormat;
+
+    @Override
+    public void initialize(DateVerify obj) {
+        dateFormat = obj.dateFormat();
+    }
+
+    /**
+     * 对参数进行验证
+     *
+     * @param value   修饰字段的值
+     * @param context 上下文
+     * @return true:验证通过, false:验证不通过
+     */
+    @Override
+    public boolean isValid(String value, ConstraintValidatorContext context) {
+        return !value.isBlank() && isValidDate(value, dateFormat);
+    }
+
+    boolean isValidDate(String str, String dateFormat) {
+        boolean convertSuccess = true;
+        if (null != str && null != dateFormat && str.length() == dateFormat.length()) {
+            SimpleDateFormat format = new SimpleDateFormat(dateFormat);
+            try {
+                // 设置lenient为false. 否则SimpleDateFormat会比较宽松地验证日期,比如2007/02/29会被接受,并转换成2007/03/01
+                format.setLenient(false);
+                format.parse(str);
+            } catch (ParseException e) {
+                convertSuccess = false;
+            }
+        } else {
+            convertSuccess = false;
+        }
+
+        return convertSuccess;
+    }
+}

+ 1 - 1
search/search-service/src/main/resources/application-dev.yml

@@ -82,7 +82,7 @@ app:
   es-password: VLTtN03SSJ4lsyyg56kf
   native-lucene-dir: /opt/data/search_data/native_lucene
   hibernate-lucene-dir: /opt/data/search_data/hibernate_lucene
-  kafka-uri: 127.0.0.1:9092
+  kafka-uri: 192.168.0.81:9092
   kafka-topic: NginxLog
   geoip-path: /home/reghao/Downloads/qqwry.dat
   geojson-path: /home/reghao/Downloads/china1.json