`
ssydxa219
  • 浏览: 609685 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

ES 7.16.2 data output

    博客分类:
  • es
阅读更多
package com.wugui.eswr;

import cn.hutool.core.io.FileUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class Main {

    private static String scheme = "http";
    private static String host = "202.107.24.16";
    private static int port = 9201;
    private static String username = "elas918273";
    private static String password = "Elastic#@!123";

    private static String auth = "Authorization Type : Basic Auth";

    private static String _index = ".security-7";

    private static String _type = "_doc";

    public static RestHighLevelClient esClient = null;

    private static RequestOptions COMMON_OPTIONS = null;


    public static RestHighLevelClient ESClientInit() {
        //不需要用户名和密码的认证
        //esClient = new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
        //需要用户名和密码的认证
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(host, port, scheme))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        esClient = new RestHighLevelClient(restClientBuilder);
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.setHttpAsyncResponseConsumerFactory(
                // 设置查询内容大小限制,默认100 * 1024 * 1024
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(200 * 1024 * 1024)
        );
        COMMON_OPTIONS = builder.build();

        return esClient;
    }

    public static void main(String[] args) throws Exception {

        //queryAllByHits(ESClientInit());

        queryAllByScollData(ESClientInit());
    }

    /**
     * 创建索引
     *
     * @param esClient
     * @throws Exception
     */
    public static void indexCreate(RestHighLevelClient esClient) throws Exception {

        //创建索引
        CreateIndexRequest request = new CreateIndexRequest(".securityxxx-7");
        CreateIndexResponse createIndexResponse = esClient.indices().create(request, RequestOptions.DEFAULT);

        //拿到响应状态
        boolean acknowledged = createIndexResponse.isAcknowledged();
        System.out.println("索引操作: " + acknowledged);

        //关闭客户端连接
        esClient.close();
    }

    /**
     * 获取索引信息
     *
     * @param esClient
     * @throws Exception
     */
    public static void indexQuery(RestHighLevelClient esClient) throws Exception {
        //创建索引连接
        GetIndexRequest request = new GetIndexRequest(".security-7");
        GetIndexResponse getIndexResponse = esClient.indices().get(request, RequestOptions.DEFAULT);

        System.out.println("索引查询: " + getIndexResponse.getAliases());
        System.out.println("索引查询: " + getIndexResponse.getMappings());
        System.out.println("索引查询: " + getIndexResponse.getSettings());

        //关闭客户端连接
        esClient.close();
    }

    /**
     * @param esClient
     * @throws Exception
     */
    private static void queryALl(RestHighLevelClient esClient) throws Exception {
        //查询索引中的全部数据
        SearchRequest request = new SearchRequest();
        request.indices(".security-7");

        //查询全部 matchAll
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 条件查询
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryCondition(RestHighLevelClient esClient) throws Exception {
        //查询索引中的全部数据
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //分词中 同时包含李强才可以,因为底层默认使用英文分词器,所以李强会被分为 李,强没办法符合条件所以查询不到
        //request.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("name","李强")));
        //这样是李强字段不可再分,查询name字段中包含此字段的数据
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchPhraseQuery("name", "李强")));


        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 分页查询、部分字段显示、排序
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryPage(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices(".security-7");

        //构造查询条件 "match_all", "{}"
        SearchSourceBuilder query = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());

        //分页
        query.from(0); //从第几条数据开始截取 (页码-1)* 每页数量
        query.size(2); //每页的数量

        String[] excludes = {"name"}; //排除字段
        String[] includes = {"tel"}; //展示字段

        query.fetchSource(includes, excludes);

        //排序
        query.sort("create_time", SortOrder.ASC);

        request.source(query);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 组合条件查询
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryConditionS(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

        //组合条件查询
        //与
//        boolQueryBuilder.must(QueryBuilders.matchQuery("name","小"));
//        boolQueryBuilder.must(QueryBuilders.matchQuery("sex","性别1"));
//        boolQueryBuilder.mustNot(QueryBuilders.matchQuery("sex","性别1"));

        //或
        boolQueryBuilder.should(QueryBuilders.matchQuery("name", "华"));
        boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));

        //排序
        builder.sort("tel", SortOrder.ASC);

        builder.query(boolQueryBuilder);
        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 范围查询
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryBetween(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("age");
        rangeQueryBuilder.gte(30);
        rangeQueryBuilder.lte(32);

        //排序
        builder.sort("tel", SortOrder.ASC);

        builder.query(rangeQueryBuilder);
        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 模糊查询(不针对中文)
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryLike(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(QueryBuilders.fuzzyQuery("name", "zhangsan").fuzziness(Fuzziness.ONE));

        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 高亮查询
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryHighLighter(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name", "zhang");

        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<h1 color='red'>");
        highlightBuilder.postTags("</h1>");
        highlightBuilder.field("name");

        builder.highlighter(highlightBuilder);
        builder.query(termQueryBuilder);

        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 聚合查询(最大值)
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryAggregation(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
//分组名称,聚合字段
        AggregationBuilder aggregationBuilder = AggregationBuilders.max("maxAge").field("age");

        builder.aggregation(aggregationBuilder);
        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        System.out.println(response);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 聚合查询(group分组)
     *
     * @param esClient
     * @throws Exception
     */
    private static void queryGroup(RestHighLevelClient esClient) throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");

        //构造查询条件
        SearchSourceBuilder builder = new SearchSourceBuilder();

        AggregationBuilder aggregationBuilder = AggregationBuilders.terms("ageGroup").field("age");

        builder.aggregation(aggregationBuilder);
        request.source(builder);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        System.out.println(response);
        SearchHits hits = response.getHits();

        System.out.println(hits.getTotalHits());
        System.out.println(response.getTook());

        for (SearchHit hit : hits.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * 查询全部
     */
    private static void queryAllByHits(RestHighLevelClient esClient) throws IOException {
        Gson gson = new Gson();
        // 创建搜索对象
        SearchRequest searchRequest = new SearchRequest();
        // 查询构建工具
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //排序
        //No mapping found for [create_time] in order to sort on
        //searchSourceBuilder.sort("create_time", SortOrder.ASC);
        // 添加查询条件,通过QueryBuilders获取各种查询
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());//"match_all", "{}"
        searchRequest.source(searchSourceBuilder);
        // 搜索
        SearchResponse search = esClient.search(searchRequest, RequestOptions.DEFAULT);

        // 解析
        SearchHits hits = search.getHits();
        SearchHit[] hits1 = hits.getHits();
        for (SearchHit hit : hits1) {
            // 取出source数据
            String itemstring = hit.getSourceAsString();
            // 反序列化
            HashMap item = gson.fromJson(itemstring, HashMap.class);
            System.out.println(item);
        }
    }

    private static void queryAllByScoll1(RestHighLevelClient esClient) throws IOException {
        try {
            long startTime = System.currentTimeMillis();
            /*创建客户端*/
            //client startup
            //设置集群名称
            Settings settings = Settings.builder()
                    .put("cluster.name", "elsearch")
                    .put("client.transport.sniff", true)
                    .build();
            //创建client
            RestHighLevelClient client = esClient;

            List<String> result = new ArrayList<>();

            String scrollId = "";

            //第一次请求
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();


            //TODO: 设置查询条件
            RangeQueryBuilder rangequerybuilder = QueryBuilders
                    .rangeQuery("inputtime")
                    .from("2016-12-14 02:00:00").to("2016-12-14 07:59:59");
            sourceBuilder.query(QueryBuilders.boolQuery()
                            .must(QueryBuilders
                                    .matchPhraseQuery("pointid", "W3.UNIT1.10HFC01CT013"))
                            .must(rangequerybuilder))
                    .size(100)//如果开启游标,则滚动获取
                    .sort("inputtime", SortOrder.ASC);
            //查询
            SearchRequest request = Requests.searchRequest("pointdata");
            request.scroll("2m");
            request.source(sourceBuilder);
            SearchResponse response = client.search(request,null);//.actionGet();
            //TODO:处理数据
            SearchHits hits = response.getHits();
            for (int i = 0; i < hits.getHits().length; i++) {
                //System.out.println(hits.getHits()[i].getSourceAsString());
                result.add(hits.getHits()[i].getSourceAsString());
            }
            //记录滚动ID
            scrollId = response.getScrollId();


            while (true) {
                //后续的请求
                //scrollId = query.getScollId();
                SearchScrollRequestBuilder searchScrollRequestBuilder = null;//client.prepareSearchScroll(scrollId);
                // 重新设定滚动时间
                //TimeValue timeValue = new TimeValue(30000);
                searchScrollRequestBuilder.setScroll("2m");
                // 请求
                SearchResponse response1 = searchScrollRequestBuilder.get();

                //TODO:处理数据
                SearchHits hits2 = response1.getHits();
                if (hits2.getHits().length == 0) {
                    break;
                }
                for (int i = 0; i < hits2.getHits().length; i++) {
                    result.add(hits2.getHits()[i].getSourceAsString());
                }
                //下一批处理
                scrollId = response1.getScrollId();
            }

            System.out.println(result.size());
            long endTime = System.currentTimeMillis();
            System.out.println("Java程序运行时间:" + (endTime - startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void queryAllByScoll(RestHighLevelClient esClient) throws IOException {

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.should(QueryBuilders.termQuery("match_all","{}"));
        // 存活时间,当索引数据量特别大时,出现超时可能性大,此值适当调大
        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        searchSourceBuilder.size(5);
        SearchRequest searchRequest = new SearchRequest()
        // ES7已经去掉type,查询时加type
                .indices(_index)
                .scroll(scroll)
                .source(searchSourceBuilder);
        SearchResponse searchResponse = null;
        try{
            searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch(
        IOException e) {
            e.printStackTrace();
        }
        String scrollId = searchResponse.getScrollId();
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        for(SearchHit searchHit :searchHits){
                System.out.println(searchHit.getSourceAsString());
            }
        //遍历搜索命中的数据,直到没有数据
        while(searchHits !=null&&searchHits.length >0) {

                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);

                scrollRequest.scroll(scroll);

                try {
                    searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }

                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
                if (searchHits != null && searchHits.length > 0) {
                    for (SearchHit searchHit : searchHits) {
                        System.out.println(searchHit.getSourceAsString());
                    }

                }

            }
        //clean scroll
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = null;
        try{
                clearScrollResponse = esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            } catch(
            IOException e) {
                //log.error("clear-scroll-error:{}", e);
            }
            boolean succeeded = clearScrollResponse.isSucceeded();
        System.out.println(succeeded);
    }

    public static void queryAllByScollData(RestHighLevelClient esClient) throws IOException {
        int i = 1,size = 2000,total = 43316300;

        for(i=909 ; i<total/size ;  i++) {
            SearchRequest searchRequest = new SearchRequest();
            //Scroll scroll = new Scroll(TimeValue.timeValueMinutes(5L));
            //searchRequest.scroll(scroll);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            searchSourceBuilder.query(matchAllQueryBuilder);
            searchSourceBuilder.from(i*size);
            searchSourceBuilder.size(size);

            searchRequest.source(searchSourceBuilder);
            SearchResponse response = esClient.search(searchRequest, COMMON_OPTIONS);
            //String scrollId = response.getScrollId();
            SearchHit[] searchHits = response.getHits().getHits();

            //System.out.println(response.getHits().getTotalHits());
            for (SearchHit searchHit : searchHits) {
                //System.out.println(searchHit.getSourceAsString());
                FileUtil.appendString(searchHit.getSourceAsString()+"\n",new File("D:\\aa\\"+i*size+".csv"),"UTF-8");
            }
            /*while (searchHits != null && searchHits.length > 0) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                response = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                scrollId = response.getScrollId();
                searchHits = response.getHits().getHits();

                for (SearchHit searchHit : searchHits) {
                    i++;
                    System.out.println(searchHit.getSourceAsString());

                }
                if (i > 10) {
                    break;
                }
            }*/
        }
    }



}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics