本文介绍ES的数据迁移方案:
由于ES更新速度比较快,很大程度上, 我们需要更新版本、插件、甚至更新分词器, 单纯的upgrade很有可能不能满足业务需求, 更坏的情况下, 可能需要重建索引。本文从Java API 的角度来介绍ES的数据迁移(或数据重新索引)。基于以下逻辑实现,个人已测试过2亿数据的迁移,可以放心使用。
1. 获取clientl连接。本文选择transportClient。
public class ClientUtil { static Settings defaultSettings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", false).put("client.transport.ping_timeout","10s").build(); //如果你的集群node数量是稳定的,那么最好关闭sniff。 同时, 将ping时间设置高于默认5s, 很大程序上可以解决No Node available exception. // 创建私有对象 private static TransportClient targetClient; private static TransportClient sourceClient; static { try { Class<?> clazz = Class.forName(TransportClient.class.getName()); Constructor<?> constructor = clazz.getDeclaredConstructor(Settings.class); constructor.setAccessible(true); Settings finalSettings = ImmutableSettings.settingsBuilder() .put(defaultSettings) .build(); targetClient = (TransportClient) constructor.newInstance(finalSettings); targetClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.100", 9300)) .addTransportAddress(new InetSocketTransportAddress("192.168.1.101", 9300)); sourceClient = (TransportClient) constructor.newInstance(finalSettings); sourceClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.110", 9300)) .addTransportAddress(new InetSocketTransportAddress("192.168.1.111", 9300)); } catch (Exception e) { e.printStackTrace(); } } // 取得源实例 public static synchronized Client getSourceTransportClient() { return sourceClient; } // 取得目标实例 public static synchronized Client getTargetTransportClient() { return targetClient; } }
以上代码用于获取源cluster和目标cluster的client.
2.迁移主方法:
private void doMigrate(Client sourceclient, Client targetclient, String sourceIndexName, String targetIndexName, String indexDocType, int pageSize) { int total = 0; SearchResponse searchResponse = sourceclient.prepareSearch(sourceIndexName).setSearchType(SearchType.SCAN) .setQuery(matchAllQuery()).setSize(pageSize).setScroll(TimeValue.timeValueSeconds(20)).execute() .actionGet(); //scroll 的time不能太大, 以免对集群造成负载 boolean exists = targetclient.admin().indices().prepareExists(targetIndexName).execute().actionGet().isExists(); if (!exists) targetclient .admin() .indices() .prepareCreate(targetIndexName) .setSettings( settingsBuilder().put("index.number_of_replicas", 0).put("index.refresh_interval", "-1")) .execute().actionGet(); //设置replica为0, 不refresh, 为了提高索引速度。 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } BulkProcessor bulkProcessor = BulkProcessor.builder(targetclient, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { throw new RuntimeException("BulkResponse show failures: " + response.buildFailureMessage()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { throw new RuntimeException("Caught exception in bulk: " + request + ", failure: " + failure, failure); } }).setConcurrentRequests(10).build(); //设置线程数量, 大小可以根据自己机器调配。 while (true) { searchResponse = sourceclient.prepareSearchScroll(searchResponse.getScrollId()) .setScroll(TimeValue.timeValueSeconds(20)).execute().actionGet(); for (SearchHit hit : searchResponse.getHits()) { IndexRequestBuilder indexRequestBuilder = targetclient.prepareIndex(targetIndexName, indexDocType); indexRequestBuilder.setSource(hit.getSource()); indexRequestBuilder.setId(hit.getId()); indexRequestBuilder.setOpType(IndexRequest.OpType.INDEX); bulkProcessor.add(indexRequestBuilder.request()); total++; } System.out.println("Already migrated : " + total + " records!"); if (searchResponse.getHits().hits().length == 0) { break; } } try { Thread.sleep(10000);//Sleep 10s waiting the cluster. } catch (InterruptedException e) { e.printStackTrace(); } bulkProcessor.close(); targetclient .admin() .indices().prepareUpdateSettings(targetIndexName).setSettings( settingsBuilder().put("index.number_of_replicas", 1).put("index.refresh_interval", "1s")) .execute().actionGet(); }
3.测试:
public static void main(String[] args) throws ElasticSearchException, IOException, InterruptedException { int pageSize = 40; //分页大小, 不能过大, 太大影响集群性能, 可能引起no node 异常。 Client sourceclient = ClientUtil.getSourceTransportClient(); Client targetclient = ClientUtil.getTargetTransportClient(); //调用doMigrate方法。 doMigrate(sourceclient, targetclient, "test", "testnew", "test", pageSize);}
相关推荐
Elasticsearch-migration
Elasticsearch Migration的工作方式与Flyway相似,但是使用yaml文件来描述变更集。 要求 Java(已通过JDK 8+测试) Elasticsearch 6.xx(经过6.2.4+测试。由于它使用的是REST Api,因此可以与较低版本一起使用) ...
解决spring-data-elasticsearch 5.4.0 不支持 5.4.1的elasticsearch问题
Spring Data Elasticsearch API(Spring Data Elasticsearch 开发文档).CHM。 官网 Spring Data Elasticsearch API
spring-data-elasticsearch中文使用文档,spring-data-elasticsearch、elasticsearch、ES、ElasticSearch、ES中文教程
spring-data-elasticsearch api文档
(狂神)ElasticSearch快速入门笔记,ElasticSearch基本操作以及爬虫(Java-ES仿京东实战),包含了小狂神讲的东西,特别适合新手学习,笔记保存下来可以多看看。好记性不如烂笔头哦~,ElasticSearch,简称es,es是一个...
spring-data-elasticsearch api 离线文档, spring-data-elasticsearch2.0.2spring-data-elasticsearch api spring-data-elasticsearch api 离线文档
v /Users/xingyue/Home/xingyue/学习/工程化/es/data:/usr/share/elasticsearch/data -v /Users/xingyue/Home/xingyue/学习/工程化/es/config:/usr/share/elasticsearch/config -v /Users/xingyue/Home/xingyue/学习...
elasticsearch elasticsearch-6.2.2 elasticsearch-6.2.2.zip 下载
spring data elasticsearch
一、概述 一般来说我们开发Elasticsearch会选择...2、elasticsearch-head (方便查看ES中的索引及数据) 3、Kibana(方便开发通过rest api 调试ES,有代码提示) 4、中文分词elasticsearch-analysis-ik (ik) 1、下载ela
该工具实现从ES中导出数据,并且可以对导出的数据格式和数据文件做部分自定义,该工具主要使用ES中srcoll接口多线程导出数据. Design 项目采用 Java 构建。 访问ES部分采用官方 RestClient 构建通信。 数据导出方式为...
elasticsearch-8.2.3 windows 版本。 Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的...
elasticsearch-7.17.6及对应版本IK分词 适合人群:elasticsearch初学者 Elasticsearch 是位于 Elastic Stack 核心的分布式搜索和分析引擎。Logstash 和 Beats 有助于收集、聚合和丰富您的数据并将其存储在 Elastic...
Discover steps to ingest structured and unstructured data using Elasticsearch Find out how to query Elasticsearch with a high degree of performance and scalability See how to slice and dice your data ...
本示例程序主要是对spring data elasticsearch的实践,包含接口声明查询、注解查询和自定义repository查询。运行TestCase时候,请先将配置文件中ES服务端ip:port配置替换成真是的服务端地址
本示例程序主要是对spring data elasticsearch的实践,包含接口声明查询、注解查询和自定义repository查询。运行TestCase时候,请先将配置文件中ES服务端ip:port配置替换成真是的服务端地址
适用于7.17.1系列,例如Elasticsearch的7.17.12版本。 elasticsearch-analysis-ik 是一个常用的中文分词器,在 Elasticsearch 中广泛应用于中文文本的分析和搜索。下面是 elasticsearch-analysis-ik 分词器的几个...