Sfoglia il codice sorgente

es版本提升为8.5.3

15902849627 3 anni fa
parent
commit
7d54381b6e

+ 3 - 10
sckw-common/sckw-common-elasticsearch/pom.xml

@@ -18,14 +18,12 @@
         <maven.compiler.target>17</maven.compiler.target>
         <maven.compiler.compilerVersion>17</maven.compiler.compilerVersion>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <elasticsearch.version>7.14.0</elasticsearch.version>
+        <elasticsearch.version>8.5.3</elasticsearch.version>
     </properties>
-
-
     <dependencies>
         <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
             <version>${elasticsearch.version}</version>
         </dependency>
         <dependency>
@@ -33,11 +31,6 @@
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>${elasticsearch.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>

+ 339 - 192
sckw-common/sckw-common-elasticsearch/src/main/java/com/sckw/elasticsearch/service/es/EsUtils.java

@@ -1,39 +1,29 @@
 package com.sckw.elasticsearch.service.es;
 
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.SortOrder;
+import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
+import co.elastic.clients.elasticsearch._types.aggregations.TermsAggregation;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch.core.*;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import co.elastic.clients.json.JsonData;
+import co.elastic.clients.util.ObjectBuilder;
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
 import lombok.extern.slf4j.Slf4j;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.ClearScrollRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.core.CountRequest;
-import org.elasticsearch.client.core.CountResponse;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.GetIndexRequest;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
-import org.elasticsearch.script.Script;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.springframework.util.CollectionUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
 
 /**
  * @author yuzhongchuan
@@ -43,305 +33,462 @@ import java.util.Objects;
 @Slf4j
 public class EsUtils {
 
-    private RestHighLevelClient client;
+    private ElasticsearchClient client;
 
     private static EsUtils esUtils;
 
-    public EsUtils(RestHighLevelClient client) {
+    public EsUtils(ElasticsearchClient client) {
         esUtils = this;
         this.client = client;
     }
 
     /**
-     * 创建索引
+     * @param name
+     * @return void
+     * @desc: 创建索引
+     * @author: yzc
+     * @date: 2023-06-12 11:46
+     */
+    public static void addIndex(String name) {
+        try {
+            if (indexExists(name)) {
+                log.error(" idxName={} 已经存在", name);
+                return;
+            }
+            esUtils.client.indices().create(c -> c.index(name));
+        } catch (IOException e) {
+            log.error("ES addIndex异常", e);
+        }
+    }
+
+    /**
+     * @param name
+     * @return boolean
+     * @desc: 索引是否存在
+     * @author: yzc
+     * @date: 2023-06-12 11:46
+     */
+    public static boolean indexExists(String name) throws IOException {
+        return esUtils.client.indices().exists(b -> b.index(name)).value();
+    }
+
+    /**
+     * @param name
+     * @return void
+     * @desc: 删除索引
+     * @author: yzc
+     * @date: 2023-06-12 11:47
+     */
+    public static void delIndex(String name) {
+        try {
+            if (!indexExists(name)) {
+                log.error(" idxName={} 不存在", name);
+                return;
+            }
+            esUtils.client.indices().delete(c -> c.index(name));
+        } catch (IOException e) {
+            log.error("ES delIndex异常", e);
+        }
+    }
+
+    /**
+     * @param idxName, settingFn, mappingFn
+     * @return void
+     * @desc: 带mapping、setting创建索引
+     * @author: yzc
+     * @date: 2023-06-12 11:47
+     */
+    public static void createIndex(String idxName,
+                                   Function<IndexSettings.Builder, ObjectBuilder<IndexSettings>> settingFn,
+                                   Function<TypeMapping.Builder, ObjectBuilder<TypeMapping>> mappingFn) {
+        try {
+            if (indexExists(idxName)) {
+                log.error(" idxName={} 已经存在", idxName);
+                return;
+            }
+            esUtils.client.indices().create(c -> c
+                    .index(idxName)
+                    .settings(settingFn)
+                    .mappings(mappingFn));
+        } catch (IOException e) {
+            log.error("ES createIndex异常", e);
+        }
+    }
+
+    /**
+     * 添加文档信息
      *
      * @param idxName
-     * @param idxSql
+     * @param t
+     * @param <T>
      */
-    public static void createIndex(String idxName, String idxSql) {
-        if (indexExist(idxName)) {
-            log.error(" idxName={} 已经存在,idxSql={}", idxName, idxSql);
+    public static <T> void add(String idxName, T t) {
+        if (Objects.isNull(idxName) || t == null) {
             return;
         }
-        log.info(" idxName={} 创建索引,idxSql={}", idxName, idxSql);
-        CreateIndexRequest request = new CreateIndexRequest(idxName);
-        buildSetting(request);
-        request.mapping(idxSql, XContentType.JSON);
         try {
-            esUtils.client.indices().create(request, RequestOptions.DEFAULT);
+            esUtils.client.index(s ->
+                    s.index(idxName).document(t));
         } catch (IOException e) {
-            log.error("ES createIndex异常", e);
+            log.error("es插入单条新数据异常:", e);
         }
     }
 
     /**
-     * 删除索引
+     * 添加文档信息 指定id
      *
-     * @param idxName 索引名称
+     * @param idxName
+     * @param id
      */
-    public static void deleteIndex(String idxName) {
-        if (!indexExist(idxName)) {
-            log.error(" idxName={} 索引不存在", idxName);
+    public static <T> void addSpecifyId(String idxName, String id, T t) {
+        if (Objects.isNull(idxName) || t == null) {
             return;
         }
         try {
-            esUtils.client.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
+            esUtils.client.index(s ->
+                    s.index(idxName).id(id).document(t));
         } catch (IOException e) {
-            log.error("ES deleteIndex异常", e);
+            log.error("es插入指定id单条新数据异常:", e);
         }
     }
 
     /**
-     * 设置分片
+     * 修改文档自定义属性
      *
-     * @param request
+     * @param idxName
+     * @param id
+     * @param t
      */
-    public static void buildSetting(CreateIndexRequest request) {
-        request.settings(Settings.builder().put("index.number_of_shards", 3)
-                .put("index.number_of_replicas", 0));
+    public static <T> void update(String idxName, String id, T t) {
+        if (Objects.isNull(idxName) || t == null) {
+            return;
+        }
+        try {
+            esUtils.client.update(x -> x.index(idxName).id(id).doc(t), Object.class);
+        } catch (IOException e) {
+            log.error("es修改文档数据异常:", e);
+        }
     }
 
     /**
-     * 断某个index是否存在
+     * 批量新增
      *
-     * @param idxName
+     * @param indexName
+     * @param list
      * @return
      */
-    public static boolean indexExist(String idxName) {
-        GetIndexRequest request = new GetIndexRequest(idxName);
+    public static <T> void bulkSave(String indexName, List<T> list) {
+        if (Objects.isNull(indexName) || CollectionUtils.isEmpty(list)) {
+            return;
+        }
+        BulkRequest.Builder br = new BulkRequest.Builder();
+        //可不指定id es会自动生成id
+        list.forEach(e -> br.operations(op -> op
+                .index(idx -> idx.index(indexName).document(e))));
         try {
-            return esUtils.client.indices().exists(request, RequestOptions.DEFAULT);
+            esUtils.client.bulk(br.build());
         } catch (IOException e) {
-            log.error("ES indexExist查询异常", e);
-            throw new RuntimeException(e);
+            log.error("es批量新增数据异常:", e);
         }
     }
 
-
     /**
-     * 布尔查询
+     * 批量新增 指定id
      *
-     * @param searchRequest
+     * @param indexName
+     * @param list
      * @return
      */
-    public static SearchHits boolSearch(SearchRequest searchRequest) {
-        log.info("ES开始boolSearch查询SearchRequest:{}", searchRequest);
+    public static <T> void bulkSaveSpecifyId(String indexName, List<T> list) {
+        if (Objects.isNull(indexName) || CollectionUtils.isEmpty(list)) {
+            return;
+        }
+        BulkRequest.Builder br = new BulkRequest.Builder();
+        //对象id为es id
+        list.forEach(e -> br.operations(op -> op
+                .index(idx -> idx.index(indexName).id(JSON.parseObject((String) e).get("id").toString()).document(e))));
         try {
-            SearchResponse searchResponse = esUtils.client.search(searchRequest, RequestOptions.DEFAULT);
-            return searchResponse.getHits();
-        } catch (Exception e) {
-            log.error("ES boolSearch查询SearchRequest异常", e);
-            return null;
+            esUtils.client.bulk(br.build());
+        } catch (IOException e) {
+            log.error("es批量新增指定id数据异常:", e);
         }
     }
 
     /**
-     * 分组聚合查询
+     * ids批量删除
      *
-     * @param searchRequest
+     * @param indexName
+     * @param list
      * @return
      */
-    public static Aggregations groupBySearch(SearchRequest searchRequest) {
-        log.info("ES开始groupBySearch查询SearchRequest:{}", searchRequest);
+    public static <T> void bulkDel(String indexName, List<T> list) {
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        list.forEach(a ->
+                bulkOperations.add(BulkOperation.of(b ->
+                        b.delete(c -> c.id(JSON.parseObject((String) a).get("id").toString()))
+                )));
+        try {
+            esUtils.client.bulk(a -> a.index(indexName).operations(bulkOperations));
+        } catch (IOException e) {
+            log.error("es批量ids删除数据异常:", e);
+        }
+    }
+
+    /**
+     * term匹配 多次匹配
+     *
+     * @param index
+     * @param field
+     * @param fieldValues
+     * @return Object
+     */
+    public static List<Hit<Object>> advancedQueryByTerm(String index, String field, List<FieldValue> fieldValues) {
         try {
-            SearchResponse searchResponse = esUtils.client.search(searchRequest, RequestOptions.DEFAULT);
-            return searchResponse.getAggregations();
-        } catch (Exception e) {
-            log.error("ES groupBySearch查询SearchRequest异常", e);
+            SearchResponse<Object> response = esUtils.client.search(e -> e
+                    .index(index)
+                    .query(q -> q.terms(t -> t.field(field).terms(terms -> terms.value(fieldValues))))
+                    .query(q -> q.matchAll(m -> m)), Object.class);
+            return response.hits().hits();
+        } catch (IOException e) {
+            log.error("es term匹配多次匹配查询异常:", e);
             return null;
         }
     }
 
     /**
-     * 清除滚屏
+     * term匹配 单次匹配
      *
-     * @param scrollId
+     * @param index
+     * @param field
+     * @param value
+     * @return Object
      */
-    public static void clearScroll(String scrollId) {
-        if (scrollId != null) {
-            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-            clearScrollRequest.addScrollId(scrollId);
-            try {
-                esUtils.client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
-                log.info("ES清除滚屏成功");
-            } catch (Exception e) {
-                log.error("ES清除滚屏出错:" + e.getMessage());
-            }
+    public static List<Hit<Object>> advancedQueryByTerm(String index, String field, long value) {
+        try {
+            SearchResponse<Object> response = esUtils.client.search(e -> e
+                    .index(index)
+                    .query(q -> q.term(t -> t.field(field).value(value)))
+                    .query(q -> q.matchAll(m -> m)), Object.class);
+            return response.hits().hits();
+        } catch (IOException e) {
+            log.error("es term匹配单次匹配查询异常:", e);
+            return null;
         }
     }
 
     /**
-     * 简单聚合查询
+     * 分页查询
      *
-     * @param countRequest
+     * @param index
+     * @param from
+     * @param size
      * @return
      */
-    public static long count(CountRequest countRequest) {
+    public static List<Hit<Object>> advancedQueryByPage(String index, Integer from, Integer size) {
         try {
-
-            log.info("开始ES查询indexName:{}的count", Arrays.toString(countRequest.indices()));
-            CountResponse count = esUtils.client.count(countRequest, RequestOptions.DEFAULT);
-            log.info("ES查询到:count={}", count.getCount());
-            return count.getCount();
-        } catch (Exception e) {
-            log.error("ES count异常", e);
-            return 0;
+            SearchResponse<Object> response = esUtils.client.search(e -> e
+                    .index(index)
+                    .query(q -> q.matchAll(m -> m))
+                    .from(from).size(size), Object.class);
+            return response.hits().hits();
+        } catch (IOException e) {
+            log.error("es 分页查询异常:", e);
+            return null;
         }
     }
 
     /**
-     * 批量新增
+     * multiMatch多field匹配
      *
      * @param indexName
-     * @param list
+     * @param fields
+     * @param query
+     * @param sortField
+     * @param order
+     * @param from
+     * @param size
      * @return
+     * @throws Exception
      */
-    public static <T> void bulkSave(String indexName, List<T> list) {
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-        BulkRequest request = new BulkRequest();
-        for (Object t : list) {
-            String content = JSON.toJSONString(t);
-            String id = JSON.parseObject(content).get("id").toString();
-            request.add(new IndexRequest(indexName).id(id).source(content, XContentType.JSON));
-        }
+    public static List<Hit<Object>> queryMultiMatch(String indexName, List<String> fields, String query, String sortField, SortOrder order, Integer from, Integer size) {
         try {
-            BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
-            log.info("批量新增 操作es成功,{}", bulkResponse.status());
+            SearchResponse<Object> searchResponse = esUtils.client.search(e -> e
+                    .index(indexName)
+                    .query(q -> q.multiMatch(m -> m.fields(fields).query(query)))
+                    .sort(sort -> sort.field(f -> f.field(sortField).order(order)))
+                    .from(from).size(size), Object.class);
+            return searchResponse.hits().hits();
         } catch (IOException e) {
-            log.error("es批量新增数据异常:", e);
+            log.error("ES queryMultiMatch查询异常", e);
+            return null;
         }
     }
 
 
     /**
-     * 批量修改
+     * Range范围匹配
      *
      * @param indexName
-     * @param list
-     * @param script
+     * @param field
+     * @param gte       大于等于
+     * @param lte       小于等于
+     * @param sortField
+     * @param order
+     * @param from
+     * @param size
+     * @return
      */
-    public static <T> void bulkUpdate(String indexName, List<T> list, Script script) {
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-        BulkRequest request = new BulkRequest();
-        for (Object t : list) {
-            String content = JSON.toJSONString(t);
-            String id = JSON.parseObject(content).get("id").toString();
-            UpdateRequest doc = new UpdateRequest(indexName, id);
-            request.add(doc.script(script));
+    public static List<Hit<Object>> queryBoolRange(String indexName, String field, Object gte, Object lte, String sortField, SortOrder order, Integer from, Integer size) {
+        try {
+            SearchResponse<Object> searchResponse = esUtils.client.search(e -> e
+                    .index(indexName)
+                    .query(q -> q.bool(b -> b.filter(f -> f.range(r -> r.field(field)
+                            .gte((Objects.isNull(gte) ? null : JsonData.of(gte)))
+                            .lte((Objects.isNull(lte) ? null : JsonData.of(lte)))))))
+                    .sort(sort -> sort.field(f -> f.field(sortField).order(order)))
+                    .from(from).size(size), Object.class);
+            return searchResponse.hits().hits();
+        } catch (IOException | ElasticsearchException e) {
+            log.error("ES queryBoolRange查询异常", e);
+            return null;
         }
+    }
+
+    /**
+     * filter过滤
+     *
+     * @param indexName
+     * @param excludes
+     * @param includes
+     * @param sortField
+     * @param order
+     * @param from
+     * @param size
+     * @return
+     */
+    public static List<Hit<Object>> queryFilter(String indexName, List<String> excludes, List<String> includes, String sortField, SortOrder order, Integer from, Integer size) {
         try {
-            BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
-            log.info("批量更新 操作es成功,{}", bulkResponse.status());
+            SearchResponse<Object> searchResponse = esUtils.client.search(e -> e
+                    .index(indexName)
+                    .query(q -> q.matchAll(m -> m))
+                    .source(s -> s.filter(f -> f.excludes(excludes).includes(includes)))
+                    .sort(sort -> sort.field(f -> f.field(sortField).order(order)))
+                    .from(from).size(size), Object.class);
+            return searchResponse.hits().hits();
         } catch (IOException e) {
-            log.error("es批量更新数据异常:", e);
+            log.error("ES queryFilter查询异常", e);
+            return null;
         }
     }
 
-
     /**
-     * 批量删除
+     * fuzzy模糊匹配
      *
      * @param indexName
-     * @param list
+     * @param field
+     * @param value
+     * @param fuzziness
+     * @param sortField
+     * @param order
+     * @param from
+     * @param size
      * @return
      */
-    public static <T> void bulkDel(String indexName, List<T> list) {
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-        BulkRequest request = new BulkRequest();
-        for (Object t : list) {
-            String content = JSON.toJSONString(t);
-            String id = JSON.parseObject(content).get("id").toString();
-            request.add(new DeleteRequest(indexName).id(id));
-        }
+    public static List<Hit<Object>> queryFuzzy(String indexName, String field, String value, String fuzziness, String sortField, SortOrder order, Integer from, Integer size) {
         try {
-            BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
-            log.info("批量删除 操作es成功,{}", bulkResponse.status());
+            SearchResponse<Object> searchResponse = esUtils.client.search(e -> e
+                    .index(indexName)
+                    .query(q -> q.fuzzy(f -> f.field(field).value(value).fuzziness(fuzziness)))
+                    .sort(sort -> sort.field(f -> f.field(sortField).order(order)))
+                    .from(from).size(size), Object.class);
+            return searchResponse.hits().hits();
         } catch (IOException e) {
-            log.error("es批量删除数据异常:", e);
+            log.error("ES queryFuzzy查询异常", e);
+            return null;
         }
     }
 
     /**
-     * 根据条件更新
-     *
-     * @param idxName
-     * @param builder
-     * @param script
+     * @param indexName, field
+     * @return co.elastic.clients.elasticsearch._types.aggregations.Aggregate
+     * @desc: 分组聚合查询
+     * @author: yzc
+     * @date: 2023-06-12 17:36
      */
-    public static void updateByQuery(String idxName, SearchSourceBuilder builder, Script script) {
-        SearchRequest request = new SearchRequest(idxName);
-        request.source(builder);
-        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(idxName);
-        updateByQueryRequest.setQuery(builder.query());
-        updateByQueryRequest.setScript(script);
+    public static Aggregate queryGroupAggregation(String indexName, String field) {
         try {
-            esUtils.client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+            SearchResponse<Void> searchResponse = esUtils.client.search(
+                    b -> b.index(indexName).size(0)
+                            .aggregations(field, a -> a.terms(TermsAggregation.of(s -> s.field(field)))),
+                    Void.class);
+            return searchResponse.aggregations().get(field);
         } catch (IOException e) {
-            log.error("es根据条件更新数据异常:", e);
+            log.error("ES queryGroupAggregation查询异常", e);
+            return null;
         }
     }
 
     /**
-     * 条件删除
+     * 简单聚合查询
      *
-     * @param idxName
-     * @param builder
+     * @param countRequest
      * @return
-     * @throws IOException
      */
-    public static void deleteByQuery(String idxName, TermQueryBuilder builder) {
-        DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
-        request.setQuery(builder);
+    public static long count(CountRequest countRequest) {
         try {
-            BulkByScrollResponse response = esUtils.client.deleteByQuery(request, RequestOptions.DEFAULT);
-            log.info("删除操作状态", JSONArray.toJSONString(response.getStatus().getDeleted()));
+            log.info("开始ES查询indexName:{}的count", Arrays.toString(countRequest.index().toArray()));
+            CountResponse count = esUtils.client.count(countRequest);
+            return count.count();
         } catch (IOException e) {
-            log.error("es根据条件删除数据异常:", e);
+            log.error("ES count异常", e);
+            return 0;
         }
     }
 
     /**
-     * es根据id更新数据
+     * SearchRequest原生查询
      *
-     * @param idxName
-     * @param id
-     * @param script
+     * @param searchRequest
+     * @return
      */
-    public static void update(String idxName, String id, Script script) {
-        UpdateRequest updateRequest = new UpdateRequest(idxName, id);
-        updateRequest.script(script);
+    public static List<Hit<Object>> queryOriginal(SearchRequest searchRequest) {
+        //Query of = Query.of(q -> q.bool(b -> b.must(m -> m.match(match -> match.field("").query("")))));
         try {
-            esUtils.client.update(updateRequest, RequestOptions.DEFAULT);
+            SearchResponse<Object> searchResponse = esUtils.client.search(searchRequest, Object.class);
+            return searchResponse.hits().hits();
         } catch (IOException e) {
-            log.error("es根据id更新数据异常:", e);
+            log.error("ES queryOriginal查询异常", e);
+            return null;
         }
     }
 
     /**
-     * 插入单条
+     * Query原生查询
      *
-     * @param idxName
-     * @param t
-     * @param <T>
+     * @param indexName
+     * @param field
+     * @param query
+     * @param order
+     * @param from
+     * @param size
+     * @return
+     * @throws Exception
      */
-    public static <T> void bulk(String idxName, T t) {
-        if (Objects.isNull(idxName) || t == null) {
-            return;
-        }
-        BulkRequest request = new BulkRequest();
-        String content = JSON.toJSONString(t);
-        String id = JSON.parseObject(content).get("id").toString();
-        request.add(new IndexRequest(idxName).id(id).source(content, XContentType.JSON));
+    public static List<Hit<Object>> queryOriginal(String indexName, String field, Query query, SortOrder order, Integer from, Integer size) {
+        //Query of = Query.of(q -> q.bool(b -> b.must(m -> m.match(match -> match.field("").query("")))));
         try {
-            esUtils.client.bulk(request, RequestOptions.DEFAULT);
+            SearchResponse<Object> searchResponse = esUtils.client.search(e -> e
+                    .index(indexName)
+                    .query(query)
+                    .sort(sort -> sort.field(f -> f.field(field).order(order)))
+                    .from(from).size(size), Object.class);
+            return searchResponse.hits().hits();
         } catch (IOException e) {
-            log.error("es插入单条新数据异常:", e);
+            log.error("ES queryOriginal查询异常", e);
+            return null;
         }
     }
 

+ 16 - 10
sckw-common/sckw-common-elasticsearch/src/main/java/com/sckw/elasticsearch/service/es/autoconfigure/EsAutoConfiguration.java

@@ -1,6 +1,8 @@
 package com.sckw.elasticsearch.service.es.autoconfigure;
 
-
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import com.sckw.elasticsearch.service.es.EsUtils;
 import com.sckw.elasticsearch.service.es.constants.EsConstant;
 import com.sckw.elasticsearch.service.es.properties.EsProperties;
@@ -13,7 +15,10 @@ import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
 import org.apache.http.message.BasicHeader;
-import org.elasticsearch.client.*;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.NodeSelector;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -36,16 +41,16 @@ public class EsAutoConfiguration {
 
 
     @Bean
-    @ConditionalOnClass(RestHighLevelClient.class)
+    @ConditionalOnClass(ElasticsearchClient.class)
     @ConditionalOnMissingBean(EsUtils.class)
-    public EsUtils esUtils(RestHighLevelClient restHighLevelClient) {
+    public EsUtils esUtils(ElasticsearchClient elasticsearchClient) {
         log.info("---------------EsUtils 正在初始化");
-        return new EsUtils(restHighLevelClient);
+        return new EsUtils(elasticsearchClient);
     }
 
     @Bean
-    @ConditionalOnMissingBean(RestHighLevelClient.class)
-    public RestHighLevelClient restHighLevelClient() {
+    @ConditionalOnMissingBean(ElasticsearchClient.class)
+    public ElasticsearchClient restHighLevelClient() {
         log.info("---------------es配置正在加载,当前配置为:{}", esProperties);
         if (EsConstant.DEFAULT_HOST.equals(esProperties.getHostList())) {
             throw new RuntimeException("es配置错误,无法加载");
@@ -101,8 +106,9 @@ public class EsAutoConfiguration {
                 new UsernamePasswordCredentials(esProperties.getUsername(), esProperties.getPassword()));
         clientBuilder.setHttpClientConfigCallback(callback -> callback.setDefaultCredentialsProvider(credentialsProvider)
                 .setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(3)));
-
-        //创建RestHighLevelClient客户端
-        return new RestHighLevelClient(clientBuilder);
+        //使用JSON映射器传输数据
+        RestClientTransport restClientTransport = new RestClientTransport(clientBuilder.build(), new JacksonJsonpMapper());
+        //创建API客户端
+        return new ElasticsearchClient(restClientTransport);
     }
 }

+ 43 - 27
sckw-modules/sckw-example/src/main/java/com/sckw/example/controller/EsController.java

@@ -1,18 +1,20 @@
 package com.sckw.example.controller;
 
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.indices.IndexSettings;
+import co.elastic.clients.util.ObjectBuilder;
 import com.sckw.elasticsearch.service.es.EsUtils;
-import com.sckw.example.model.Employees;
+import com.sckw.example.model.Product;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.compress.utils.Lists;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * @author: yzc
@@ -27,44 +29,58 @@ public class EsController {
 
     @GetMapping(value = "/createIndex")
     public void createIndex() {
-        String mappings = "{\n" +
-                "  \"properties\": {\n" +
-                "    \"id\": {\n" +
-                "      \"type\": \"keyword\"\n" +
-                "    },\n" +
-                "    \"name\": {\n" +
-                "      \"type\": \"text\"\n" +
-                "    }\n" + "  }\n" +
-                "}";
-        System.out.println("mapping is as follows: ");
-        System.out.println(mappings);
+        // 索引名
+        String indexName = "product002";
+
+        // 构建setting时,builder用到的lambda
+        Function<IndexSettings.Builder, ObjectBuilder<IndexSettings>> settingFn = sBuilder -> sBuilder
+                .index(iBuilder -> iBuilder
+                        // 三个分片
+                        .numberOfShards("3")
+                        // 一个副本
+                        .numberOfReplicas("1")
+                );
+
+        // 新的索引有三个字段,每个字段都有自己的property,这里依次创建
+        Property keywordProperty = Property.of(pBuilder -> pBuilder.keyword(kBuilder -> kBuilder.ignoreAbove(256)));
+        Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder));
+        Property integerProperty = Property.of(pBuilder -> pBuilder.integer(iBuilder -> iBuilder));
+
+        // // 构建mapping时,builder用到的lambda
+        Function<TypeMapping.Builder, ObjectBuilder<TypeMapping>> mappingFn = mBuilder -> mBuilder
+                .properties("name", keywordProperty)
+                .properties("description", textProperty)
+                .properties("price", integerProperty);
+
+        // 创建索引,并且指定了setting和mapping
+        EsUtils.createIndex(indexName, settingFn, mappingFn);
 
-        EsUtils.createIndex("test", mappings);
     }
 
 
     @GetMapping(value = "/deleteIndex")
     public void deleteIndex() {
-        EsUtils.deleteIndex("test");
+        EsUtils.delIndex("product002");
     }
 
     @GetMapping(value = "/bulkSave")
     public void bulkSave() {
-        List<Employees> list = Lists.newArrayList();
-        Employees nancy = Employees.builder().id("2").name("Nancy").build();
-        Employees jason = Employees.builder().id("3").name("Jason").build();
+        List<Product> list = Lists.newArrayList();
+        Product nancy = Product.builder().name("Nancy").description("试试").price(1000).build();
+        Product jason = Product.builder().name("Jason").description("玩玩").price(2000).build();
+        Product xason = Product.builder().name("xason").description("试试2").price(3000).build();
+        Product qason = Product.builder().name("qason").description("玩玩2").price(3000).build();
         list.add(nancy);
         list.add(jason);
-        EsUtils.bulkSave("test", list);
+        list.add(xason);
+        list.add(qason);
+        EsUtils.bulkSave("product002", list);
 
     }
 
     @GetMapping(value = "/search")
     public void search() {
-        SearchRequest request = new SearchRequest("test");
-        SearchSourceBuilder builder = new SearchSourceBuilder();
-        request.source(builder);
-        SearchHits hits = EsUtils.boolSearch(request);
-        System.out.println(hits);
+        List<Hit<Object>> list = EsUtils.advancedQueryByPage("product002", 3, 2);
+        System.out.println(list);
     }
 }

+ 22 - 0
sckw-modules/sckw-example/src/main/java/com/sckw/example/model/Product.java

@@ -0,0 +1,22 @@
+package com.sckw.example.model;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * @desc: TODO
+ * @author: yzc
+ * @date: 2023-06-12 17:37
+ */
+@Getter
+@Setter
+@ToString
+@Builder
+public class Product {
+    private String id;
+    private String name;
+    private String description;
+    private Integer price;
+}