`
lc_koven
  • 浏览: 349965 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

在不同版本hdfs集群之间转移数据

阅读更多
本文仅供记录一下程序心得:
    很多人会有这样一个需求:将一个hdfs集群上的数据写入另一个hdfs集群所在的hbase数据库。通常情况下两个hdfs集群的版本差距并不大,这样的程序会很容易写。但有时会跨大版本。比如作者所在的厂子,数据都在基于hadoop0.19.2版本修改的hdfs集群上,要将这样的数据导入版本为0.20.2+的hdfs集群,就不能使用同一个hadoop jar包来完成了。如何实现呢?
    最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。
    不过这有几个问题:
  • 效率降低
  • 占用本地磁盘空间
  • 不能应付实时导数据需求
  • 两个进程需要协调,复杂度增加

    更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。
    以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();


    但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢?
    琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。
    这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数
    代码段如下:
main.java
void init(){
	ClassLoader jarloader = generateJarLoader();
	Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
	Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
	Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
	proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
	while((line = getLine()) != null) {
		proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
		Method addPut = proxy.getClass().getMethod("addPut",
				new Class[]{String.class, String.class, String.class});
		addPut.invoke(proxy, new Object[]{field, column, encode});
		proxy.getClass().getMethod("putLine").invoke(proxy);
	}
}

ClassLoader generateJarLoader() throws IOException {
      String libPath = System.getProperty("java.ext.dirs");
      FileFilter filter = new FileFilter() {
      @Override
      public boolean accept(File pathname) {
      	if(pathname.getName().startsWith("hadoop-0.19.2"))
          return false;
      	else
      		return pathname.getName().endsWith(".jar");
      }
      };
      File[] jars = new File(libPath).listFiles(filter);
      URL[] jarUrls = new URL[jars.length+1];
		
      int k = 0;
      for (int i = 0; i < jars.length; i++) {
        jarUrls[k++] = jars[i].toURI().toURL();
      }
      jarUrls[k] = new File("hadoop-0.20.205.jar")
      ClassLoader jarloader = new URLClassLoader(jarUrls, null);
      return jarloader;
}


HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
     throws IOException{
		Configuration conf = new Configuration();
		conf.addResource(new Path(hbase_conf));
		config = new Configuration(conf);
		htable = new HTable(config, tableName);
		admin = new HBaseAdmin(config);
		htable.setAutoFlush(autoflush);
	}
public void addPut(String field, String column, String encode) throws IOException {
    try {
			p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
					field.getBytes(encode));
		} catch (UnsupportedEncodingException e) {
			p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
					field.getBytes());
		}
		
	}
    public void generatePut(String rowkey){
		p = new Put(rowkey.getBytes());
	}
	
    public void putLine() throws IOException{
		htable.put(p);
	}

    总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。
分享到:
评论
3 楼 brandom520 2014-07-24  
请问lz,我从hbase0.94版本上的数据导入到0.96.1.1-cdh5.0.3版本里。按照你提供的方法测试报错。帮看看什么原因
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to be for and old version of HBase (0.94.13), this version is 0.96.1.1-cdh5.0.3
        at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:70)
        at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:102)
        at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:113)
        at org.apache.hadoop.hbase.client.HConnectionManager.<clinit>(HConnectionManager.java:212)
        ... 9 more
2 楼 lc_koven 2011-10-27  
fakechris 写道
hadoop distcp hftp://from hdfs://to 也可以

恩,很多情况下这也是一个选择,我忘写上去了:)。不过这个方法有三个弱点:1 对于自己修改的版本通常不能使用(比如cloudrea版本hdfs无法读入ugi密码等问题)。这是让我们放弃这个方法的主要原因 2 hftp安全性问题 3 不灵活,必须拷贝完成后再做一次数据导入
1 楼 fakechris 2011-10-27  
hadoop distcp hftp://from hdfs://to 也可以

相关推荐

    Hadoop2.2.0集群搭建手册

    2、采用hadoop2.0官方提供QJM —HDFS,HA的解决方案,在QJM方案中,主备NameNode之间通过一组JournalNode同步元数据信息。 3、这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当...

    大数据hadoop集群部署-最详细的部署-2.8.5.pdf

    在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode,这里还配置了一个Zookeeper集群,用于ZKFC故障转移,当Active ...

    HDFS-HA工作机制

    原理:通过双NameNode消除单点故障。 1.HDFS-HA工作要点 ...实现了一个zkfailover(Hadoop的一个进程-故障转移),常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点

    Hadoop实战中文版

    9.4 在EC2 上运行MapReduce 程序 9.4.1 将代码转移到Hadoop集群上 9.4.2 访问Hadoop集群上的数据 9.5 清空和关闭EC2 实例 9.6 Amazon Elastic MapReduce 和其他AWS 服务 9.6.1 Amazon Elastic MapReduce 9.6.2...

    在Hadoop集群环境中为MySQL安装配置Sqoop的教程

    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。...

    网站架构技术

    问题: 读库与写库的数据同步 解决方案: 不同的数据库都有自己的数据库的主从复制功能 使用反向代理与CDN加速网站响应 反向代理产品 ngix 使用分布式文件系统和分布式数据库系统 使用no-sql和搜索...

    Hadoop实战(陆嘉恒)译

    在云上运行Hadoop9.1 Amazon Web Services 简介9.2 安装AWS9.2.1 获得AWS身份认证凭据9.2.2 获得命令行工具9.2.3 准备SSH密钥对9.3 在EC2 上安装Hadoop9.3.1 配置安全参数9.3.2 配置集群类型9.4 在EC2 上运行...

    word源码java-mycat-src-1.6.1-RELEASE:源码解读,在代码行间里简洁明白的注释

    支持MySQL集群、percona集群或mariadb集群,提供数据碎片集群的高可用 支持自动故障转移和高可用性 支持读写分离、双主多从、单主多主MySQL模型 支持全局表,自动将数据分片到多个节点进行高效的关系

    hadoop2.4.1三台机器分布式安装

    在该方案中,主备NameNode之间通过一组JournalNode同步元数据 信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode 这里还配置了一个zookeeper集群,用于ZKFC...

    Hadoop实战中文版.PDF

    1689.3 在EC2上安装Hadoop 1699.3.1 配置安全参数 1699.3.2 配置集群类型 1699.4 在EC2上运行MapReduce程序 1719.4.1 将代码转移到Hadoop集群上 1719.4.2 访问Hadoop集群上的数据 1729.5 清空和关闭EC2...

    Hadoop实战

    1699.3.2 配置集群类型 1699.4 在EC2上运行MapReduce程序 1719.4.1 将代码转移到Hadoop集群上 1719.4.2 访问Hadoop集群上的数据 1729.5 清空和关闭EC2实例 1759.6 Amazon Elastic MapReduce和其他AWS服务 1769.6.1 ...

    beihu-bigdata:大数据

    数据存储:HBase、HDFS、Cassandra、Kudu、Alluxio、Parquet 数据仓库:Hive、HAWQ 分布式协调:Zookeeper 序列化:Avro、protobuf、Parquet、apacheORC、Thrift 数据转移:Sqoop、SSIS 集群部署&管理&监控&调度&...

    hops:Hops Hadoop是具有分布式元数据的Apache Hadoop发行版

    HopsFS是Hadoop文件系统(HDFS)的新实现,它支持多个无状态NameNode,其中元数据存储在 (内存中的分布式数据库)中。 与Apache HDFS相比,HopsFS支持更多的可扩展群集(最大可达十倍),并且可以自定义和分析...

Global site tag (gtag.js) - Google Analytics