reghao преди 2 месеца
родител
ревизия
5eee9cb836

+ 48 - 50
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/DocumentService.java

@@ -1,11 +1,9 @@
 package cn.reghao.tnb.search.app.es;
 
 import cn.reghao.jutil.jdk.web.log.NginxLog;
-import cn.reghao.tnb.search.app.model.po.Wenshu;
 import cn.reghao.jutil.jdk.string.SnowFlake;
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._types.Result;
 import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
 import co.elastic.clients.elasticsearch.core.*;
 import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
@@ -15,6 +13,7 @@ import org.springframework.stereotype.Service;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * @author reghao
@@ -24,61 +23,62 @@ import java.util.List;
 @Service
 public class DocumentService {
     private final ElasticsearchClient esClient;
+    private final ElasticsearchAsyncClient asyncClient;
     private final SnowFlake idGenerator;
 
     public DocumentService(ElasticService elasticService) {
         this.esClient = elasticService.getElasticsearchClient();
+        this.asyncClient = elasticService.getElasticsearchAsyncClient();
         this.idGenerator = new SnowFlake(1, 1);
     }
 
-    public void addDocument(String indexName, NginxLog product) throws IOException {
-        IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(product.getId()).document(product));
-        log.info("add one document result: {}", indexResponse.result().jsonValue());
+    public void save(String indexName, NginxLog nginxLog) throws IOException {
+        long id = idGenerator.nextId();
+        IndexResponse indexResponse = esClient.index(i -> i.index(indexName).id(id + "").document(nginxLog));
+        //log.info("add one document result: {}", indexResponse.result().jsonValue());
     }
 
-    public void batchAddDocument(String indexName, List<NginxLog> nginxLogs) throws IOException {
+    public void saveAll(String indexName, List<NginxLog> nginxLogs) throws IOException {
         List<BulkOperation> bulkOperations = new ArrayList<>();
-        nginxLogs.forEach(p -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.document(p)))));
+        nginxLogs.forEach(doc -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(doc.getId()).document(doc)))));
         BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
         if (bulkResponse.errors()) {
-            bulkResponse.items().forEach(b -> log.error("bulk response result = {}", b.result()));
+            bulkResponse.items().forEach(b -> log.error("bulk response result = {}", b.error()));
         }
     }
 
-    public void batchAddWenshu(String indexName, List<Wenshu> wenshuList) throws IOException {
-        List<BulkOperation> bulkOperations = new ArrayList<>();
-        wenshuList.forEach(p -> bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(p.getId()).document(p)))));
-        BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
-//        bulkResponse.items().forEach(b -> log.info("bulk response result = {}", b.result()));
-        //log.error("bulk response.error() = {}", bulkResponse.errors());
+    public void update(String indexName, NginxLog nginxLog) throws IOException {
+        String id = nginxLog.getId();
+        UpdateResponse<NginxLog> updateResponse = esClient.update(s -> s.index(indexName).id(id).doc(nginxLog), NginxLog.class);
+        //log.info("update doc result: {}", updateResponse.result());
     }
 
-    public void saveAll(String index, List<NginxLog> nginxLogs) throws IOException {
-        for (NginxLog nginxLog : nginxLogs) {
-            IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id("" + idGenerator.nextId()));
-            Result result = response.result();
-            System.out.println(result.jsonValue());
+    public void deleteById(String indexName, String id) {
+        try {
+            DeleteResponse deleteResponse = esClient.delete(s -> s.index(indexName).id(id));
+            log.info("del doc result: {}", deleteResponse.result());
+        } catch (IOException e) {
+            log.error("del doc failed, error: ", e);
         }
     }
 
-    public void updateDocument(String indexName, NginxLog product) throws IOException {
-        UpdateResponse<NginxLog> updateResponse = esClient.update(s -> s.index(indexName).id(product.getId()).doc(product), NginxLog.class);
-        log.info("update doc result: {}", updateResponse.result());
-    }
-
-    public void update(String index, NginxLog nginxLog) throws IOException {
-        String id = nginxLog.getId();
-        IndexResponse response = esClient.index(i -> i.index(index).document(nginxLog).id(id));
-        Result result = response.result();
-        System.out.println(result.jsonValue());
+    public void deleteByIdAsync(String indexName) {
+        int id = 1;
+        asyncClient.delete(i -> i.index(indexName).id("" + id)).whenComplete((success, failure)->{
+            System.out.println(success.index());
+            System.out.println(success.version());
+        });
     }
 
-    public void deleteDocument(String indexName, String id) {
+    public void deleteByIds(String indexName, List<String> ids) {
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        ids.forEach(a -> bulkOperations.add(BulkOperation.of(b -> b.delete(c -> c.id(a)))));
         try {
-            DeleteResponse deleteResponse = esClient.delete(s -> s.index(indexName).id(id));
-            log.info("del doc result: {}", deleteResponse.result());
+            BulkResponse bulkResponse = esClient.bulk(a -> a.index(indexName).operations(bulkOperations));
+            bulkResponse.items().forEach(a -> log.info("batch del result: {}", a.result()));
+            log.error("batch del bulk resp errors: {}", bulkResponse.errors());
         } catch (IOException e) {
-            log.error("del doc failed, error: ", e);
+            log.error("batch del doc failed, error: ", e);
         }
     }
 
@@ -89,7 +89,7 @@ public class DocumentService {
      * @return
      * @date 2025-03-13 16:26:20
      */
-    public void deleteAllDocument(String indexName) {
+    public void deleteByIndexName(String indexName) {
         try {
             DeleteByQueryRequest deleteByQueryRequest = DeleteByQueryRequest.of(s -> s.index(indexName)
                     .query(m -> m.matchAll(new MatchAllQuery.Builder().build()))
@@ -101,23 +101,21 @@ public class DocumentService {
         }
     }
 
-    public void batchDeleteDocument(String indexName, List<String> ids) {
-        List<BulkOperation> bulkOperations = new ArrayList<>();
-        ids.forEach(a -> bulkOperations.add(BulkOperation.of(b -> b.delete(c -> c.id(a)))));
-        try {
-            BulkResponse bulkResponse = esClient.bulk(a -> a.index(indexName).operations(bulkOperations));
-            bulkResponse.items().forEach(a -> log.info("batch del result: {}", a.result()));
-            log.error("batch del bulk resp errors: {}", bulkResponse.errors());
-        } catch (IOException e) {
-            log.error("batch del doc failed, error: ", e);
-        }
-    }
+    public void deleteByIndexNameAsync(String indexName) {
+        DeleteByQueryRequest deleteByQueryRequest = DeleteByQueryRequest.of(s -> s.index(indexName)
+                .query(m -> m.matchAll(new MatchAllQuery.Builder().build()))
+        );
+        CompletableFuture<DeleteByQueryResponse> future = asyncClient.deleteByQuery(deleteByQueryRequest);
+        future.whenComplete((deleteByQueryResponse, throwable) -> {
+            if (throwable != null) {
+                log.error("{}", throwable.getMessage());
+            }
 
-    public void delete(ElasticsearchAsyncClient client, String index) throws IOException {
-        int id = 1;
-        client.delete(i -> i.index(index).id("" + id)).whenComplete((success, failure)->{
-            System.out.println(success.index());
-            System.out.println(success.version());
+            if (deleteByQueryResponse != null) {
+                deleteByQueryResponse.failures().forEach(failure -> log.error("delete doc result = {}", failure.toString()));
+            }
+
+            log.info("delete doc done...");
         });
     }
 }

+ 12 - 13
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/QueryService.java

@@ -1,6 +1,5 @@
 package cn.reghao.tnb.search.app.es;
 
-import cn.reghao.tnb.content.api.constant.PostScope;
 import cn.reghao.tnb.search.app.model.vo.ElasticQuery;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.FieldValue;
@@ -199,20 +198,19 @@ public class QueryService<T> {
         return Page.empty();
     }
 
-    public Page<T> queryWithHighlight(String indexName, String fieldName, String fieldValue, int pn, int ps, Class<T> clazz) {
-        List<Integer> scopes = List.of();
-        int scope1 = PostScope.PROTECT.getCode();
-        int scope2 = PostScope.PUBLIC.getCode();
+    public Page<T> queryWithHighlight(String indexName, Map<String, String> map, String fieldName, String fieldValue, int pn, int ps, Class<T> clazz) {
+        String scopeField = "scope";
 
-        QueryStringQuery stringQuery = new QueryStringQuery.Builder()
-                // 查询的字段
-                .fields(fieldName)
-                .query(fieldValue)
-                .build();
+        QueryStringQuery.Builder builder0 = new QueryStringQuery.Builder();
+        map.forEach((fieldName0, fieldValue0) -> {
+            builder0.fields(fieldName0).query(fieldValue0);
+        });
+        QueryStringQuery stringQuery = builder0.build();
 
+        List<Integer> scopeValues = List.of(1, 2, 3);
         BoolQuery.Builder builder = new BoolQuery.Builder();
-        for (int scope : scopes) {
-            builder.should(m -> m.term(t -> t.field("scope").value(FieldValue.of(scope))));
+        for (int scope : scopeValues) {
+            builder.should(m -> m.term(t -> t.field(scopeField).value(FieldValue.of(scope))));
         }
         builder.must(m -> m.queryString(stringQuery));
         BoolQuery boolQuery = builder.build();
@@ -220,9 +218,10 @@ public class QueryService<T> {
                 .bool(boolQuery)
                 .build();
 
+        int scopeValue = 1;
         Query query1 = new Query.Builder()
                 .bool(BoolQuery.of(b -> b
-                        .must(m -> m.term(t -> t.field("scope").value(FieldValue.of(scope2))))
+                        .must(m -> m.term(t -> t.field(scopeField).value(FieldValue.of(scopeValue))))
                         .must(m -> m.queryString(stringQuery))))
                 .build();
 

+ 2 - 2
search/search-service/src/main/java/cn/reghao/tnb/search/app/rpc/DataSearchServiceImpl.java

@@ -3,8 +3,8 @@ package cn.reghao.tnb.search.app.rpc;
 import cn.reghao.tnb.search.api.dto.IndexCount;
 import cn.reghao.tnb.search.api.dto.VideoSummary;
 import cn.reghao.tnb.search.api.iface.DataSearchService;
-import cn.reghao.tnb.search.app.es.VideoTextDocument;
-import cn.reghao.tnb.search.app.es.VideoTextQuery;
+import cn.reghao.tnb.search.app.service.VideoTextDocument;
+import cn.reghao.tnb.search.app.service.VideoTextQuery;
 import cn.reghao.tnb.search.app.lucene.LuceneDocument;
 import cn.reghao.tnb.search.app.lucene.LuceneIndex;
 import cn.reghao.tnb.search.app.lucene.LuceneSearch;

+ 2 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/VideoTextDocument.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/service/VideoTextDocument.java

@@ -1,5 +1,6 @@
-package cn.reghao.tnb.search.app.es;
+package cn.reghao.tnb.search.app.service;
 
+import cn.reghao.tnb.search.app.es.ElasticService;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.Result;

+ 2 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/VideoTextQuery.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/service/VideoTextQuery.java

@@ -1,5 +1,6 @@
-package cn.reghao.tnb.search.app.es;
+package cn.reghao.tnb.search.app.service;
 
+import cn.reghao.tnb.search.app.es.ElasticService;
 import cn.reghao.tnb.search.app.model.po.VideoText;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch._types.FieldValue;

+ 3 - 1
search/search-service/src/main/java/cn/reghao/tnb/search/app/es/WenshuSearch.java → search/search-service/src/main/java/cn/reghao/tnb/search/app/service/WenshuSearch.java

@@ -1,5 +1,7 @@
-package cn.reghao.tnb.search.app.es;
+package cn.reghao.tnb.search.app.service;
 
+import cn.reghao.tnb.search.app.es.ElasticService;
+import cn.reghao.tnb.search.app.es.QueryService;
 import cn.reghao.tnb.search.app.model.po.Wenshu;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Pageable;

+ 5 - 3
search/search-service/src/test/java/NginxLogTest.java

@@ -145,7 +145,7 @@ public class NginxLogTest {
     }
 
     @Test
-    public void esDocTest() {
+    public void esDocTest() throws InterruptedException {
         ElasticService elasticService = getElasticService();
         SearchService searchService = new SearchService(elasticService);
 
@@ -154,7 +154,9 @@ public class NginxLogTest {
         log.info("Total documents of {}: {}", indexName, total);
 
         DocumentService documentService = new DocumentService(elasticService);
-        //documentService.deleteAllDocument(indexName);
+        //documentService.deleteByIndexName(indexName);
+        documentService.deleteByIndexNameAsync(indexName);
+        Thread.sleep(3600_000);
     }
 
     @Test
@@ -196,7 +198,7 @@ public class NginxLogTest {
                     nginxLog.setMethodUrl(methodUrl);
                 });
 
-                documentService.batchAddDocument(index, nginxLogs);
+                documentService.saveAll(index, nginxLogs);
                 log.info("save {} nginxLogs", nginxLogs.size());
                 nginxLogs.clear();
             }