`
guoyunsky
  • 浏览: 839113 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:203194
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

Hadoop Core 学习笔记(一) SequenceFile文件写入和读取Writable数据

 
阅读更多

 

本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1265944

欢迎加入Hadoop超级群: 180941958   

 

     刚接触Hadoop时,对SequenceFile和Writable还产生了一点联想,以为是什么神奇的东西.后来才明白,它们不过是Hadoop自己定义的一套IO协议,用于自身的输入输出.这里介绍下如何向sequence file写入Writable数据,以及如何从中读出.

     Writable相当于要传输的数据,对Java来说就是一个对象,只是在Hadoop中使用时,需要一套协议来对这个对象进行序列化和传输.于是有了其中的 public void write(DataOutput out) throws IOException 和 public void readFields(DataInput in) throws IOException 方法,一个定义怎么写入,一个定义怎么读取.这样这些对象才可以在整个Hadoop集群中无障碍地通行.至于Hadoop为什么要另起炉灶自己搞一套序列化的东西,之前也看过一些介绍,但还没有心得,日后再慢慢领会.

      所以这个例子就是自己构造一个Writable对象,先写入到sequence file,然后再读出.最后将读出的数据与写入的数据进行对比,检查是否正确.具体看代码吧:

 

package com.guoyun.hadoop.io.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileStudy {
  
  /**
   * A user record that can be written to and read from a Hadoop data stream.
   *
   * <p>Serialized layout, in order: userId (long), userName (writeUTF string),
   * userAge (int). {@code equals}, {@code hashCode} and {@code compareTo} look
   * at userId only; userName and userAge never take part in identity.
   */
  public static class UserWritable implements Writable,Comparable{
    private long userId;
    private String userName;
    private int userAge;

    /** Creates an empty record, to be filled by {@link #readFields} or the setters. */
    public UserWritable() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId identifier; the only field used for equality and ordering
     * @param userName display name
     * @param userAge age
     */
    public UserWritable(long userId, String userName, int userAge) {
      this.userId = userId;
      this.userName = userName;
      this.userAge = userAge;
    }

    public long getUserId() {
      return userId;
    }

    public void setUserId(long userId) {
      this.userId = userId;
    }

    public String getUserName() {
      return userName;
    }

    public void setUserName(String userName) {
      this.userName = userName;
    }

    public int getUserAge() {
      return userAge;
    }

    public void setUserAge(int userAge) {
      this.userAge = userAge;
    }

    /**
     * Serializes the fields in the order userId, userName, userAge.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeLong(this.userId);
      // writeUTF cannot encode null; a name that was never set is stored as "".
      out.writeUTF(this.userName == null ? "" : this.userName);
      out.writeInt(this.userAge);
    }

    /**
     * Restores the fields in the same order {@link #write} emitted them.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      this.userId = in.readLong();
      this.userName = in.readUTF();
      this.userAge = in.readInt();
    }

    /** Returns the three fields separated by tab characters. */
    @Override
    public String toString() {
      return this.userId + "\t" + this.userName + "\t" + this.userAge;
    }

    /**
     * Compares userId only.
     */
    @Override
    public boolean equals(Object obj) {
      if (obj instanceof UserWritable) {
        UserWritable other = (UserWritable) obj;
        return other.getUserId() == this.getUserId();
      }
      return false;
    }

    /**
     * Orders by userId only: negative, zero or positive when this userId is
     * smaller than, equal to or greater than the other one. Zero for equal ids
     * keeps the ordering consistent with {@link #equals}.
     *
     * <p>An argument that is not a UserWritable (including null) yields -1,
     * as it always has in this class.
     */
    @Override
    public int compareTo(Object obj) {
      if (!(obj instanceof UserWritable)) {
        return -1;
      }
      long otherId = ((UserWritable) obj).userId;
      if (this.userId < otherId) {
        return -1;
      }
      if (this.userId > otherId) {
        return 1;
      }
      return 0;
    }

    /**
     * Hash derived from userId only: its low 32 bits with the sign bit cleared,
     * so the result is never negative.
     */
    @Override
    public int hashCode() {
      return ((int) this.userId) & Integer.MAX_VALUE;
    }

  }
  
  /**
   * Writes every user in {@code datas} to a sequence file, using the user's
   * userId as the record key and the user itself as the record value.
   *
   * <p>An IOException raised while opening the file or appending a record is
   * printed with printStackTrace and not rethrown. The writer is handed to
   * IOUtils.closeStream in a finally block.
   *
   * @param filePath path of the sequence file to create
   * @param conf configuration used to obtain the FileSystem and create the writer
   * @param datas users to append, in the collection's iteration order
   */
  public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){
    // A single key object is reused for all records; only its value is updated.
    LongWritable recordKey=new LongWritable(0);
    SequenceFile.Writer out=null;

    try {
      FileSystem fileSystem=FileSystem.get(conf);
      Path target=new Path(filePath);
      out=SequenceFile.createWriter(fileSystem, conf, target, LongWritable.class, UserWritable.class);

      for(UserWritable each:datas){
        recordKey.set(each.getUserId());
        out.append(recordKey, each);
      }
    } catch (IOException ioFailure) {
      ioFailure.printStackTrace();
    }finally{
      IOUtils.closeStream(out);
    }
  }
  
  /**
   * Reads all UserWritable values from a sequence file.
   *
   * <p>Keys are read into an instance of the key class the file reports and are
   * then discarded. Any exception raised while opening or reading is printed
   * with printStackTrace and not rethrown; values read before the failure are
   * still returned.
   *
   * @param sequeceFilePath path of the sequence file to read
   * @param conf configuration used to obtain the FileSystem and open the reader
   * @return the values in the order they were read; empty (never null) when
   *         nothing could be read
   */
  public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){
    // Created before the try so that a failure to open the file yields an
    // empty list instead of null.
    List<UserWritable> result=new ArrayList<UserWritable>();
    SequenceFile.Reader reader=null;

    try {
      FileSystem fs=FileSystem.get(conf);
      Path path=new Path(sequeceFilePath);
      reader=new SequenceFile.Reader(fs, path, conf);
      // The key holder is built from the key class recorded in the file.
      Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      UserWritable value=new UserWritable();
      while(reader.next(key, value)){
        result.add(value);
        // The stored instance must not be overwritten by the next read.
        value=new UserWritable();
      }
    } catch (Exception e) {
      // Covers IOException as well; both were handled identically before.
      e.printStackTrace();
    }finally{
      IOUtils.closeStream(reader);
    }
    return result;
  }
  
  /**
   * Builds a fresh Configuration with "mapred.job.tracker" set to "local" and
   * "fs.default.name" set to "file:///".
   *
   * @return the new configuration
   */
  private  static Configuration getDefaultConf(){
    final Configuration localConf=new Configuration();
    localConf.set("fs.default.name", "file:///");
    localConf.set("mapred.job.tracker", "local");
    // Codec registration left disabled:
    //localConf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec");
    return localConf;
  }
  
  /**
   * Demo entry point: generates users with random ids and ages, writes them to
   * a sequence file, reads the file back and prints every record.
   *
   * <p>Records found in the generated set go to stdout, all others to stderr.
   * Membership is decided by {@code UserWritable.equals}, which compares userId
   * only, so a wrong name or age is not detected by this check.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    String filePath="data/user.sequence"; // file path
    Set<UserWritable> users=new HashSet<UserWritable>();
    // Generate data. Ids are random, so two users can get the same id; the set
    // then keeps only one of them and fewer than 10 records are written.
    for(int i=1;i<=10;i++){
      UserWritable user=new UserWritable(i+(int)(Math.random()*100000),"name-"+(i+1),(int)(Math.random()*50)+10);
      users.add(user);
    }
    // Write to the sequence file.
    write2SequenceFile(filePath,getDefaultConf(),users);
    // Read back from the sequence file.
    List<UserWritable> readDatas=readSequenceFile(filePath,getDefaultConf());
    if(readDatas==null){
      // The reader reports failure by returning null; stop instead of crashing in the loop.
      System.err.println("Error: no data could be read from "+filePath);
      return;
    }

    // Check each record against the generated set and print it.
    for(UserWritable u:readDatas){
      if(users.contains(u)){
        System.out.println(u.toString());
      }else{
        System.err.println("Error data:"+u.toString());
      }
    }

  }

}

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

3
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics