|
|
@@ -1,25 +1,40 @@
|
|
|
package com.sckw.elasticsearch.service.es;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
+import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
+import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
+import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.search.ClearScrollRequest;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
+import org.elasticsearch.action.update.UpdateRequest;
|
|
|
import org.elasticsearch.client.RequestOptions;
|
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
|
import org.elasticsearch.client.core.CountRequest;
|
|
|
import org.elasticsearch.client.core.CountResponse;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.index.query.TermQueryBuilder;
|
|
|
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
|
|
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
|
|
+import org.elasticsearch.script.Script;
|
|
|
import org.elasticsearch.search.SearchHits;
|
|
|
import org.elasticsearch.search.aggregations.Aggregations;
|
|
|
+import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
/**
|
|
|
* @author yuzhongchuan
|
|
|
@@ -43,7 +58,6 @@ public class EsUtils {
|
|
|
*
|
|
|
* @param idxName
|
|
|
* @param idxSql
|
|
|
- * @throws Exception
|
|
|
*/
|
|
|
public static void createIndex(String idxName, String idxSql) {
|
|
|
if (!indexExist(idxName)) {
|
|
|
@@ -54,6 +68,11 @@ public class EsUtils {
|
|
|
CreateIndexRequest request = new CreateIndexRequest(idxName);
|
|
|
buildSetting(request);
|
|
|
request.mapping("_doc", idxSql, XContentType.JSON);
|
|
|
+ try {
|
|
|
+ esUtils.client.indices().create(request, RequestOptions.DEFAULT);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("ES createIndex异常", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -79,8 +98,8 @@ public class EsUtils {
|
|
|
* @param request
|
|
|
*/
|
|
|
public static void buildSetting(CreateIndexRequest request) {
|
|
|
- request.settings(Settings.builder().put("index.number_of_shards", 9)
|
|
|
- .put("index.number_of_replicas", 1));
|
|
|
+ request.settings(Settings.builder().put("index.number_of_shards", 3)
|
|
|
+ .put("index.number_of_replicas", 0));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -176,20 +195,159 @@ public class EsUtils {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* 批量新增
|
|
|
*
|
|
|
- * @param bulkRequest
|
|
|
+ * @param indexName
|
|
|
+ * @param list
|
|
|
* @return
|
|
|
*/
|
|
|
- public static boolean bulkIndex(BulkRequest bulkRequest) {
|
|
|
+ public static <T> void bulkSave(String indexName, List<T> list) {
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+ for (Object t : list) {
|
|
|
+ String content = JSON.toJSONString(t);
|
|
|
+ String id = JSON.parseObject(content).get("id").toString();
|
|
|
+ request.add(new IndexRequest(indexName).id(id).source(content, XContentType.JSON));
|
|
|
+ }
|
|
|
try {
|
|
|
- esUtils.client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
|
- return true;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("批量插入数据异常:", e);
|
|
|
- return false;
|
|
|
+ BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
|
|
|
+ log.info("批量新增 操作es成功,{}", bulkResponse.status());
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es批量新增数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量修改
|
|
|
+ *
|
|
|
+ * @param indexName
|
|
|
+ * @param list
|
|
|
+ * @param script
|
|
|
+ */
|
|
|
+ public static <T> void bulkUpdate(String indexName, List<T> list, Script script) {
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+ for (Object t : list) {
|
|
|
+ String content = JSON.toJSONString(t);
|
|
|
+ String id = JSON.parseObject(content).get("id").toString();
|
|
|
+ UpdateRequest doc = new UpdateRequest(indexName, id);
|
|
|
+ request.add(doc.script(script));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
|
|
|
+ log.info("批量更新 操作es成功,{}", bulkResponse.status());
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es批量更新数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量删除
|
|
|
+ *
|
|
|
+ * @param indexName
|
|
|
+ * @param list
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public static <T> void bulkDel(String indexName, List<T> list) {
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+ for (Object t : list) {
|
|
|
+ String content = JSON.toJSONString(t);
|
|
|
+ String id = JSON.parseObject(content).get("id").toString();
|
|
|
+ request.add(new DeleteRequest(indexName).id(id));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ BulkResponse bulkResponse = esUtils.client.bulk(request, RequestOptions.DEFAULT);
|
|
|
+ log.info("批量删除 操作es成功,{}", bulkResponse.status());
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es批量删除数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据条件更新
|
|
|
+ *
|
|
|
+ * @param idxName
|
|
|
+ * @param builder
|
|
|
+ * @param script
|
|
|
+ */
|
|
|
+ public static void updateByQuery(String idxName, SearchSourceBuilder builder, Script script) {
|
|
|
+ SearchRequest request = new SearchRequest(idxName);
|
|
|
+ request.source(builder);
|
|
|
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(idxName);
|
|
|
+ updateByQueryRequest.setQuery(builder.query());
|
|
|
+ updateByQueryRequest.setScript(script);
|
|
|
+ try {
|
|
|
+ esUtils.client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es根据条件更新数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 条件删除
|
|
|
+ *
|
|
|
+ * @param idxName
|
|
|
+ * @param builder
|
|
|
+ * @return
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static void deleteByQuery(String idxName, TermQueryBuilder builder) {
|
|
|
+ DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
|
|
|
+ request.setQuery(builder);
|
|
|
+ try {
|
|
|
+ BulkByScrollResponse response = esUtils.client.deleteByQuery(request, RequestOptions.DEFAULT);
|
|
|
+ log.info("删除操作状态", JSONArray.toJSONString(response.getStatus().getDeleted()));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es根据条件删除数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * es根据id更新数据
|
|
|
+ *
|
|
|
+ * @param idxName
|
|
|
+ * @param id
|
|
|
+ * @param script
|
|
|
+ */
|
|
|
+ public static void update(String idxName, String id, Script script) {
|
|
|
+ UpdateRequest updateRequest = new UpdateRequest(idxName, id);
|
|
|
+ updateRequest.script(script);
|
|
|
+ try {
|
|
|
+ esUtils.client.update(updateRequest, RequestOptions.DEFAULT);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es根据id更新数据异常:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 插入单条
|
|
|
+ *
|
|
|
+ * @param idxName
|
|
|
+ * @param t
|
|
|
+ * @param <T>
|
|
|
+ */
|
|
|
+ public static <T> void bulk(String idxName, T t) {
|
|
|
+ if (Objects.isNull(idxName) || t == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+ String content = JSON.toJSONString(t);
|
|
|
+ String id = JSON.parseObject(content).get("id").toString();
|
|
|
+ request.add(new IndexRequest(idxName).id(id).source(content, XContentType.JSON));
|
|
|
+ try {
|
|
|
+ esUtils.client.bulk(request, RequestOptions.DEFAULT);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("es插入单条新数据异常:", e);
|
|
|
}
|
|
|
}
|
|
|
|