`
jin8000608172
  • 浏览: 136063 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

ES聚合查询大全

阅读更多
package com.xxx.es;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite;
import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite.ParsedBucket;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xxx.HuiYanLandmarkApplication;
import com.xxx.gateway.es.LandmarkAgentGO;

@SpringBootTest(classes = HuiYanLandmarkApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class EsTest {

	private static final Logger log = LoggerFactory.getLogger(EsTest.class);

	@Autowired
	private RestHighLevelClient restHighLevelClient;

	private String indexName = "huiyan_landmark_test";
//	private String indexName="landmark-agent-hall*";

	private void insertIndex(LandmarkAgentGO hall) throws Exception {
		IndexRequest indexRequest = new IndexRequest(indexName);
		String userJson = JSONObject.toJSONString(hall);
		indexRequest.source(userJson, XContentType.JSON);
		try {
			IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
			if (indexResponse != null) {
				String id = indexResponse.getId();
				String index = indexResponse.getIndex();
				long version = indexResponse.getVersion();
				if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
					System.out.println("新增文档成功!" + index + "-" + id + "-" + version);
					log.info("新增文档成功!" + index + "-" + id + "-" + version);
				} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
					log.info("修改文档成功!");
				}
				// 分片处理信息
				ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
				if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
					log.info("分片处理信息.....");
				}
				// 如果有分片副本失败,可以获得失败原因信息
				if (shardInfo.getFailed() > 0) {
					for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
						String reason = failure.reason();
						log.info("副本失败原因:" + reason);
						System.out.println("副本失败原因:" + reason);
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void insertUsers(User user) {
		IndexRequest indexRequest = new IndexRequest(indexName);
		String userJson = JSONObject.toJSONString(user);
		indexRequest.source(userJson, XContentType.JSON);
		try {
			IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
			if (indexResponse != null) {
				String id = indexResponse.getId();
				String index = indexResponse.getIndex();
				long version = indexResponse.getVersion();
				if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
					System.out.println("新增文档成功!" + index + "-" + id + "-" + version);
					log.info("新增文档成功!" + index + "-" + id + "-" + version);
				} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
					log.info("修改文档成功!");
				}
				// 分片处理信息
				ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
				if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
					log.info("分片处理信息.....");
				}
				// 如果有分片副本失败,可以获得失败原因信息
				if (shardInfo.getFailed() > 0) {
					for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
						String reason = failure.reason();
						log.info("副本失败原因:" + reason);
						System.out.println("副本失败原因:" + reason);
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Test
	public void batchInsertUsers() {
		Date date = new Date();
		Calendar calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区555号");
			user.setAge(21);
			user.setBirthday(calendar.getTime());
			user.setMoney(300D + i);
			user.setName("闫1");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(18);
			user.setBirthday(calendar.getTime());
			user.setMoney(500D + i);
			user.setName("金1");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(17);
			user.setBirthday(calendar.getTime());
			user.setMoney(600D + i);
			user.setName("金2");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(16);
			user.setBirthday(calendar.getTime());
			user.setMoney(700D + i);
			user.setName("金3");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(15);
			user.setBirthday(calendar.getTime());
			user.setMoney(800D + i);
			user.setName("金4");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(14);
			user.setBirthday(calendar.getTime());
			user.setMoney(900D + i);
			user.setName("金5");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(13);
			user.setBirthday(calendar.getTime());
			user.setMoney(1000D + i);
			user.setName("金6");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(12);
			user.setBirthday(calendar.getTime());
			user.setMoney(1100D + i);
			user.setName("金8");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(11);
			user.setBirthday(calendar.getTime());
			user.setMoney(1200D + i);
			user.setName("金9");
			insertUsers(user);
		}
		date = new Date();
		calendar = new GregorianCalendar();
		calendar.setTime(date);
		for (int i = 0; i < 365; i++) {
			calendar.add(Calendar.DATE, -1);
			User user = new User();
			user.setAddress("上海市青浦区556号");
			user.setAge(10);
			user.setBirthday(calendar.getTime());
			user.setMoney(1300D + i);
			user.setName("金7");
			insertUsers(user);
		}

	}

	@Test
	public void batchInsertIndex() throws Exception {
		for (int i = 0; i < 1; i++) {
			// manage_area_code,manage_area_name,league_id,leaguename,scan_site_id,scan_site,hall_code,hall_name
			// ,hall_type,company_code,company_name,certify_name,certify_code,
			// certify_type,delivery_amount,third_relay_amount,third_guiji_amount,d_store_amount,d_guiji_amount
			// ,disp_date,ds
			
			LandmarkAgentGO hall = new LandmarkAgentGO();
						/*
			 * hall.setDispDate(new Date()); hall.setDs(new Date());
			 */
			insertIndex(hall);
		}

	}

	@Test
	public void createIndex() throws Exception {
		try {
			XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("properties").startObject()
					.field("name").startObject().field("index", "true").field("type", "keyword").endObject()
					.field("age").startObject().field("index", "true").field("type", "integer").endObject()
					.field("money").startObject().field("index", "true").field("type", "double").endObject()
					.field("address").startObject().field("index", "true").field("type", "text")
					.field("analyzer", "ik_max_word").endObject().field("birthday").startObject().field("index", "true")
					.field("type", "date").field("format", "strict_date_optional_time||epoch_millis").endObject()
					.endObject().endObject();
			CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
			createIndexRequest.mapping(builder);
			CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest,
					RequestOptions.DEFAULT);
			boolean acknowledged = createIndexResponse.isAcknowledged();
			if (acknowledged) {
				log.info("创建成功");
			} else {
				log.info("创建失败");
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Test
	public void queryAggIndex() throws Exception {
		SearchRequest searchRequest = new SearchRequest(indexName);
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday");
		rangeQueryBuilder.gte("2020-10-01");
		rangeQueryBuilder.lte("2020-10-31");
		boolQueryBuilder.must(rangeQueryBuilder);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder);
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("nameAgg").field("name")
//				.subAggregation(AggregationBuilders.terms("ageAgg").field("age"))
				.subAggregation(AggregationBuilders.sum("moneyAgg").field("money"))
				.subAggregation(AggregationBuilders.sum("ageAgg").field("age"));
		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);
		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Terms nameAgg = response.getAggregations().get("nameAgg");
		for (Terms.Bucket entry : nameAgg.getBuckets()) {
			Sum sum = entry.getAggregations().get("moneyAgg");
			Sum agesum = entry.getAggregations().get("ageAgg");
//            Sum sum = entry2.getAggregations().get("moneyAgg");
			System.out.println("name:" + entry.getKey() + "----------money:" + sum.getValue() + "----------age:"
					+ agesum.getValue());
//            for (Terms.Bucket entry2 : ageAgg.getBuckets()) {
//                log.info("name:" + entry.getKey()+"-------age:"+entry2.getKey() + "----------money:" + sum.getValue());
//            }
		}
	}

	@Test
	public void querySumAggIndex() throws Exception {
		SearchRequest searchRequest = new SearchRequest("landmark-agent-hall-*");
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("dispDate");
		rangeQueryBuilder.gte("20201101");
		rangeQueryBuilder.lte("20201131");
		boolQueryBuilder.must(rangeQueryBuilder);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder);
		SumAggregationBuilder aggregation1 = AggregationBuilders.sum("thirdRelayAmount").field("thirdRelayAmount");
		SumAggregationBuilder aggregation2 = AggregationBuilders.sum("thirdGuijiAmount").field("thirdGuijiAmount");
		sourceBuilder.aggregation(aggregation1).aggregation(aggregation2);
		searchRequest.source(sourceBuilder);
		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Sum sum1 = response.getAggregations().get("thirdRelayAmount");
		Sum sum2 = response.getAggregations().get("thirdGuijiAmount");
		System.out.println(sum1.getValue());
		System.out.println(sum2.getValue());
		System.out.println("=================================");
	}

	@Test
	public void queryManyFieldAggIndex() throws Exception {
		SearchRequest searchRequest = new SearchRequest(indexName);
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday");
		rangeQueryBuilder.gte("2020-10-01");
		rangeQueryBuilder.lte("2020-10-31");
		boolQueryBuilder.must(rangeQueryBuilder);
		boolQueryBuilder.must(QueryBuilders.wildcardQuery("name", "*金聪敏*"));
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder);
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("nameAgg").field("name")
				.order(BucketOrder.aggregation("moneyAgg", true))
//				.subAggregation(AggregationBuilders.terms("ageAgg").field("age"))
				.subAggregation(AggregationBuilders.sum("moneyAgg").field("money"))
				.subAggregation(AggregationBuilders.sum("ageAgg").field("age"))
				.subAggregation(AggregationBuilders.topHits("details").size(1));
		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);
		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Terms nameAgg = response.getAggregations().get("nameAgg");
		for (Terms.Bucket entry : nameAgg.getBuckets()) {
			Sum moneysum = entry.getAggregations().get("moneyAgg");
			Sum agesum = entry.getAggregations().get("ageAgg");
			Aggregations bucketAggregations = entry.getAggregations();
			ParsedTopHits topHits = bucketAggregations.get("details");
//            Sum sum = entry2.getAggregations().get("moneyAgg");
			Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
			sourceAsMap.put("money", moneysum.getValue());
			sourceAsMap.put("age", agesum.getValue());
			System.out.println("name:" + entry.getKey() + "----------money:" + moneysum.getValue() + "----------age:"
					+ agesum.getValue());
//            for (Terms.Bucket entry2 : ageAgg.getBuckets()) {
//                log.info("name:" + entry.getKey()+"-------age:"+entry2.getKey() + "----------money:" + sum.getValue());
//            }
		}
	}

	private void pageAggIndex(int pageIndex, int pageSize) throws Exception {
		int startInxdex = (pageIndex - 1) * pageSize;
		int endIndex = pageSize * pageIndex;
		SearchRequest searchRequest = new SearchRequest(indexName);
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday");
		rangeQueryBuilder.gte("2020-10-01");
		rangeQueryBuilder.lte("2020-10-31");
		boolQueryBuilder.must(rangeQueryBuilder);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder).trackTotalHits(true);
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("nameAgg").field("name")
				.order(BucketOrder.aggregation("moneyAgg", true)).size(Integer.MAX_VALUE)
				.subAggregation(AggregationBuilders.sum("moneyAgg").field("money"))
				.subAggregation(AggregationBuilders.sum("ageAgg").field("age"))
				.subAggregation(AggregationBuilders.topHits("details").size(1));
		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);
		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Terms nameAgg = response.getAggregations().get("nameAgg");
		int i = 0;
		for (Terms.Bucket entry : nameAgg.getBuckets()) {
			if (i++ < startInxdex) {
				continue;
			}
			if (i > endIndex) {
				break;
			}
			Sum moneysum = entry.getAggregations().get("moneyAgg");
			Sum agesum = entry.getAggregations().get("ageAgg");
			Aggregations bucketAggregations = entry.getAggregations();
			ParsedTopHits topHits = bucketAggregations.get("details");
			Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
			sourceAsMap.put("money", moneysum.getValue());
			sourceAsMap.put("age", agesum.getValue());
			System.out.println("name:" + entry.getKey() + "----------money:" + moneysum.getValue() + "----------age:"
					+ agesum.getValue());
		}
	}

	@Test
	public void pageManyFieldAggIndex() throws Exception {
		pageAggIndex(1, 10);
		System.out.println("***************************************************************");
		pageAggIndex(2, 10);
		System.out.println("---------------------------------------------------------------");
	}

	@Test
	public void queryManyFieldAggIndexByCombination() throws Exception {
		SearchRequest searchRequest = new SearchRequest(indexName);
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday");
		rangeQueryBuilder.gte("2020-10-01");
		rangeQueryBuilder.lte("2020-10-31");
		boolQueryBuilder.must(rangeQueryBuilder);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder);
//		TermsAggregationBuilder aggregation = AggregationBuilders
//				.terms("nameAgg").field("name")
//				.subAggregation(AggregationBuilders.sum("moneyAgg").field("money"))
//				.subAggregation(AggregationBuilders.sum("ageAgg").field("age"))
//				.subAggregation(AggregationBuilders.topHits("details").size(1));
		Script script = new Script("doc['name'].value +'{#}'+ doc['address'].value");
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("nameAgg").script(script)
				.subAggregation(AggregationBuilders.sum("moneyAgg").field("money"))
				.subAggregation(AggregationBuilders.sum("ageAgg").field("age"))
				.subAggregation(AggregationBuilders.topHits("details").size(1));

		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);
		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Terms nameAgg = response.getAggregations().get("nameAgg");
		for (Terms.Bucket entry : nameAgg.getBuckets()) {
			Sum sum = entry.getAggregations().get("moneyAgg");
			Sum agesum = entry.getAggregations().get("ageAgg");
			Aggregations bucketAggregations = entry.getAggregations();
			ParsedTopHits topHits = bucketAggregations.get("details");
			Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
			sourceAsMap.put("money", sum.getValue());
			sourceAsMap.put("age", agesum.getValue());
			System.out.println("name:" + entry.getKey() + "----------money:" + sum.getValue() + "----------age:"
					+ agesum.getValue());
		}
	}

	@Test
	public void queryIndex() {
		SearchRequest searchRequest = new SearchRequest("landmark-agent-hall*");
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		// 如果用name直接查询,其实是匹配name分词过后的索引查到的记录(倒排索引);如果用name.keyword查询则是不分词的查询,正常查询到的记录
//        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday").from("1989-01-01").to("2021-10-10").format("yyyy-MM-dd");//范围查询
//        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name.keyword", name);//精准查询
//        PrefixQueryBuilder prefixQueryBuilder = QueryBuilders.prefixQuery("name.keyword", "张");//前缀查询
//        WildcardQueryBuilder wildcardQueryBuilder = QueryBuilders.wildcardQuery("name.keyword", "*三");//通配符查询
//        FuzzyQueryBuilder fuzzyQueryBuilder = QueryBuilders.fuzzyQuery("name", "三");//模糊查询
//        FieldSortBuilder fieldSortBuilder = SortBuilders.fieldSort("age");//按照年龄排序
//        fieldSortBuilder.sortMode(SortMode.MIN);//从小到大排序

		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("dispDate")
//				.gte("20201109").lte("20201111")
//				.format("yyyyMMdd")
		;
		rangeQueryBuilder.gte("20201109");
		rangeQueryBuilder.lte("20201111");
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(rangeQueryBuilder);
        //.must(QueryBuilders.wildcardQuery("hallName.keyword", "*聚金家园(丰巢智能快递柜)*"))
//        .should(prefixQueryBuilder)
		;// and or 查询
//        boolQueryBuilder.must(rangeQueryBuilder).should(prefixQueryBuilder);//and or  查询

		sourceBuilder.query(boolQueryBuilder)
//        .sort(fieldSortBuilder)
				.trackTotalHits(true).from(0).size(10000);// 多条件查询
		sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
		searchRequest.source(sourceBuilder);
		try { SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
			SearchHits hits = response.getHits();
			JSONArray jsonArray = new JSONArray();

//            List<UserTable> users=Arrays.stream(response.getHits().getHits()).map(item -> JSON.parseObject(item.getSourceAsString(), UserTable.class)).collect(Collectors.toList());
			for (SearchHit hit : hits) {
				String sourceAsString = hit.getSourceAsString();
				JSONObject jsonObject = JSON.parseObject(sourceAsString);
				jsonArray.add(jsonObject);
			}
//            return new ResponseBean(200, "查询成功", jsonArray);
			log.info("查询成功:" + jsonArray);
		} catch (IOException e) {
			e.printStackTrace();
			log.info("查询成功:" + e);
		}
	}

	private SearchHits querySearchHits() {
		SearchRequest searchRequest = new SearchRequest(indexName);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		// 如果用name直接查询,其实是匹配name分词过后的索引查到的记录(倒排索引);如果用name.keyword查询则是不分词的查询,正常查询到的记录
//        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday").from("1989-01-01").to("2021-10-10").format("yyyy-MM-dd");//范围查询
//        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name.keyword", name);//精准查询
//        PrefixQueryBuilder prefixQueryBuilder = QueryBuilders.prefixQuery("name.keyword", "张");//前缀查询
//        WildcardQueryBuilder wildcardQueryBuilder = QueryBuilders.wildcardQuery("name.keyword", "*三");//通配符查询
//        FuzzyQueryBuilder fuzzyQueryBuilder = QueryBuilders.fuzzyQuery("name", "三");//模糊查询
//        FieldSortBuilder fieldSortBuilder = SortBuilders.fieldSort("age");//按照年龄排序
//        fieldSortBuilder.sortMode(SortMode.MIN);//从小到大排序
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("dispDate").from("20201109").to("20201111")
				.format("yyyyMMdd");
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//		boolQueryBuilder.must(rangeQueryBuilder)
//        .should(prefixQueryBuilder)
		;// and or 查询

		sourceBuilder.query(boolQueryBuilder)
//        .sort(fieldSortBuilder)
				.from(0).size(10000);// 多条件查询
		sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
		searchRequest.source(sourceBuilder);
		try {
			SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
			SearchHits hits = response.getHits();
			JSONArray jsonArray = new JSONArray();

//            List<UserTable> users=Arrays.stream(response.getHits().getHits()).map(item -> JSON.parseObject(item.getSourceAsString(), UserTable.class)).collect(Collectors.toList());
//            for (SearchHit hit : hits) {
//                String sourceAsString = hit.getSourceAsString();
//                JSONObject jsonObject = JSON.parseObject(sourceAsString);
//                jsonArray.add(jsonObject);
//            }
//            return new ResponseBean(200, "查询成功", jsonArray);
			log.info("查询成功:" + jsonArray);
			return hits;
		} catch (IOException e) {
			e.printStackTrace();
			log.info("查询成功:" + e);
		}
		return null;
	}

	private void deleteIndex(String id) {
		DeleteRequest deleteRequest = new DeleteRequest(indexName);
		deleteRequest.id(id);
		try {
			DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
			if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
				log.info("删除失败");
			} else {
				log.info("删除成功");
			}
		} catch (IOException e) {
			e.printStackTrace();
			log.info("删除异常:" + e);
		}
	}

	@Test
	public void deleteIndexs() {
		for (int i = 0; i < 10; i++) {
			SearchHits hits = querySearchHits();
			for (SearchHit hit : hits) {
				deleteIndex(hit.getId());
			}
		}

	}

	private void deleteByQuery(String prefixIndex, String suffixIndex, String from, String to) {
		String indexName = prefixIndex + "" + (suffixIndex == null ? "" : suffixIndex);
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("dispDate")
//				.gte("20201109").lte("20201111")
//				.format("yyyyMMdd")
		;
		rangeQueryBuilder.gte("20201110");
		rangeQueryBuilder.lte("20201110");
		DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
		request.setQuery(rangeQueryBuilder);
		try {
			BulkByScrollResponse resp = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
			log.info(resp.getStatus() + "");
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Test
	public void deleteIndexsByCondition() {
		deleteByQuery("landmark-agent-hall", "-202011", "20201101", "20201131");
	}

//	public static void main(String[] args) {
//		System.out.println(System.currentTimeMillis());
//	}

	@Test
	public void queryIndexGroup() throws IOException {
		SearchRequest searchRequest = new SearchRequest("landmark-agent-hall-*");
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		// 以batchId为分组条件,terms为分组后的字段名称,field为将被分组的字段名称
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("scanSiteId").field("scanSiteId")
				.order(BucketOrder.aggregation("thirdRelay", true)).size(Integer.MAX_VALUE)
				// BucketOrder.aggregation("tx_sum", false)对聚合结果的排序 true为正序 false为倒序
				// .order(BucketOrder.aggregation("siteId", true))
				// 分组求和字段,并将求和后的字段名改为score
				// subAggregation为子聚合,即在batchId分组后的小组内聚合
				.subAggregation(AggregationBuilders.sum("thirdRelay").field("thirdRelayAmount"))
				.subAggregation(AggregationBuilders.sum("thirdGuiji").field("thirdGuijiAmount"))
				.subAggregation(AggregationBuilders.sum("dStore").field("dStoreAmount"))
				.subAggregation(AggregationBuilders.sum("dGuiji").field("dGuijiAmount"))
				// 注意这里,下面介绍
				.subAggregation(AggregationBuilders.topHits("details").size(1)).size(Integer.MAX_VALUE);

		sourceBuilder.aggregation(aggregation);

		BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();

		boolBuilder.must(QueryBuilders.termQuery("manageAreaCode", "8011"));
		//boolBuilder.must(QueryBuilders.termQuery("hallCode", "FC5710480"));
		boolBuilder.must(QueryBuilders.fuzzyQuery("hallName.keyword", "xxx"));
		sourceBuilder.query(boolBuilder).trackTotalHits(true);
		/*
		 * boolBuilder.must(QueryBuilders.termQuery("manageAreaCode", "8023"))
		 * .must(QueryBuilders.termQuery("leaguename", "xxx"));
		 */
		// sourceBuilder.query(boolBuilder);

		// 组合查询
		/*
		 * sourceBuilder.query(QueryBuilders.boolQuery()
		 * .must(QueryBuilders.termsQuery("price", "35.99"))
		 * .must(QueryBuilders.termQuery("price", "49.99"))
		 * .must(QueryBuilders.rangeQuery("dispDate").gte("20201109").lte("20201111").
		 * format("yyyyMMdd")));
		 */

		// sourceBuilder.from(0).size(10);
		searchRequest.source(sourceBuilder);

		SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		SearchHits hits = search.getHits();
		TotalHits totalHits = hits.getTotalHits();
		long value = totalHits.value;
		// 和之前不同的是这里需要getAggregations获取聚合后的数据
		Aggregations aggregations = search.getAggregations();
		// 从分组后的数据中拿到hallCode的数据,这里以hallCode分组,则分组后的数据都在hallCode里
		Terms terms = aggregations.get("scanSiteId");
		// 获取到分组后的所有bucket
		List<? extends Terms.Bucket> buckets = terms.getBuckets();

		int size = buckets.size();
		for (Terms.Bucket bucket : buckets) {
			String s = bucket.getKey().toString();
			// 解析bucket 因为一级聚合为以batchId分组,二级聚合为求和,所以这里还需要getAggregations获取求和的数据
			Aggregations bucketAggregations = bucket.getAggregations();
			// 所以我们拿到了ParsedTopHits,这里我们是取了一个,所以这个值的数组长度为1
			ParsedTopHits topHits = bucketAggregations.get("details");
			// 因为求和和下面的topHits都是AggregationBuilders.terms("batchId").field("batchId.keyword")的subAggreation,所以都属于batchId组内
			// 获取到求和的数据信息
			ParsedSum thirdRelayAmount = bucketAggregations.get("thirdRelay");
			ParsedSum thirdGuijiAmount = bucketAggregations.get("thirdGuiji");
			ParsedSum dStoreAmount = bucketAggregations.get("dStore");
			ParsedSum dGuijiAmount = bucketAggregations.get("dGuiji");
			// 因为topHits中命中的hits肯定至少有一个,要不然也不会成组,所以这里直接获取第一个,并解析成map
			Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
			// 将求和后的integral覆盖到原数据中

			Map<String, Object> result = new HashMap<>();
			result.put("thirdRelayAmount", thirdRelayAmount.value());
			result.put("thirdGuijiAmount", thirdGuijiAmount.value());
			result.put("dStoreAmount", dStoreAmount.value());
			result.put("dGuijiAmount", dGuijiAmount.value());
			result.put("scanSiteId", sourceAsMap.get("scanSiteId"));
			result.put("scanSite", sourceAsMap.get("scanSite"));
			// 打印出统计后的数据
			System.out.println(result);
		}
	}

	@Test
	public void queryIndexGroupNoLimit() throws IOException {
		SearchRequest searchRequest = new SearchRequest("landmark-agent-hall-*");
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder).trackTotalHits(true);
		TermsAggregationBuilder aggregation = AggregationBuilders.terms("scanSiteIdAgg").field("scanSiteId")
				.size(100000).order(BucketOrder.aggregation("thirdRelayAmountAgg", true))
				.subAggregation(AggregationBuilders.sum("thirdRelayAmountAgg").field("thirdRelayAmount"))
				.subAggregation(AggregationBuilders.sum("thirdGuijiAmountAgg").field("thirdGuijiAmount"))
				.subAggregation(AggregationBuilders.topHits("details").size(1));
		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);

		SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Terms nameAgg = response.getAggregations().get("scanSiteIdAgg");
		for (Terms.Bucket entry : nameAgg.getBuckets()) {
			Sum moneysum = entry.getAggregations().get("thirdRelayAmountAgg");
			Sum agesum = entry.getAggregations().get("thirdGuijiAmountAgg");
			Aggregations bucketAggregations = entry.getAggregations();
			ParsedTopHits topHits = bucketAggregations.get("details");
			Map<String, Object> sourceAsMap = topHits.getHits().getHits()[0].getSourceAsMap();
			sourceAsMap.put("thirdRelayAmount", moneysum.getValue());
			sourceAsMap.put("thirdGuijiAmount", agesum.getValue());
			System.out.println("name:" + entry.getKey() + "----------money:" + moneysum.getValue() + "----------age:"
					+ agesum.getValue() + "-----size:" + nameAgg.getBuckets().size());
		}
		System.out.println("************************************************");
	}

	@Test
	public void queryCompositeIndex() throws Exception {
		SearchRequest searchRequest = new SearchRequest("landmark-agent-hall-*");
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		searchSourceBuilder.size(0);
		/******************** 以下组装聚合的三个字段 ****************************/
		List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
		TermsValuesSourceBuilder scanSiteId = new TermsValuesSourceBuilder("scanSiteId").field("scanSiteId")
				.missingBucket(false);
		sources.add(scanSiteId);
		TermsValuesSourceBuilder thirdRelayAmount = new TermsValuesSourceBuilder("thirdRelayAmount")
				.field("thirdRelayAmount").missingBucket(true);
		sources.add(thirdRelayAmount);
		TermsValuesSourceBuilder thirdGuijiAmount = new TermsValuesSourceBuilder("thirdGuijiAmount")
				.field("thirdGuijiAmount").missingBucket(true);
		sources.add(thirdGuijiAmount);
		CompositeAggregationBuilder composite = new CompositeAggregationBuilder("my_buckets", sources);
		composite.size(100000);
		/********************* 执行查询 ******************************/
		searchSourceBuilder.aggregation(composite);
		searchRequest.source(searchSourceBuilder);
		SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		/******************** 取出数据 *******************/
		Aggregations aggregations = searchResponse.getAggregations();
		ParsedComposite parsedComposite = aggregations.get("my_buckets");
		List<ParsedBucket> list = parsedComposite.getBuckets();
		Map<String, Object> data = new HashMap<>();
		for (ParsedBucket parsedBucket : list) {
			data.clear();
			for (Map.Entry<String, Object> m : parsedBucket.getKey().entrySet()) {
				data.put(m.getKey(), m.getValue());
			}
			data.put("count", parsedBucket.getDocCount());
			System.out.println(data);
		}
	}

	@Test
	public void queryCompsiteIndexForUser() throws Exception {
		SearchRequest searchRequest = new SearchRequest(indexName);
		BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
		RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("birthday");
		rangeQueryBuilder.gte("2020-10-01");
		rangeQueryBuilder.lte("2020-10-31");
		boolQueryBuilder.must(rangeQueryBuilder);
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(boolQueryBuilder);
//		List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
//		sources.add(new TermsValuesSourceBuilder("name").field("name").missingBucket(false));
		CompositeAggregationBuilder aggregation =
				new CompositeAggregationBuilder("sumAgg",null).size(10000);
		aggregation.subAggregation(AggregationBuilders.sum("money").field("money"))
				   .subAggregation(AggregationBuilders.sum("age").field("age"));

		sourceBuilder.aggregation(aggregation);
		searchRequest.source(sourceBuilder);
		SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
		Aggregations aggregations = searchResponse.getAggregations();
		ParsedComposite parsedComposite = aggregations.get("sumAgg");
		List<ParsedBucket> list = parsedComposite.getBuckets();
//		Map<String, Object> data = new HashMap<>();
//		List<User> users=new ArrayList<User>();
		for (ParsedBucket parsedBucket : list) {
			ParsedSum moneysum= parsedBucket.getAggregations().get("money");
			double money=moneysum.getValue();
			ParsedSum agesum= parsedBucket.getAggregations().get("age");
			double age=agesum.getValue();
			System.out.println("-----------money:"+money+"-------------age:"+age);
//			data.clear();
//			for (Map.Entry<String, Object> m : parsedBucket.getKey().entrySet()) {
//				User user=new User();
//				user.setName(m.getValue().toString());
//				user.setMoney(money);
//				user.setAge((int) age);
//				users.add(user);
//				data.put(m.getKey(), m.getValue());
//			}
//			data.put("count", parsedBucket.getDocCount());
//			System.out.println(JSON.toJSONString(users));
		}
		System.out.println("**************************************************");
	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics