`

重温hadoop(3)--序列化

阅读更多

Hadoop的序列化机制特征:

  • 紧凑:带宽是hadoop集群中最稀缺的资源,一个紧凑的序列化机制可以充分利用带宽。
  • 快速:mapreduce会大量的使用序列化机制。因此,要尽可能减少序列化开销。
  • 可扩展:序列化机制需要可定制。
  • 互操作:可以支持不同开发语言间的通信。

    java本身的序列化,会将要序列化的类的类签名、类的所有非暂态和非静态成员的值,以及所有的父类都写入,导致序列化后的对象过于臃肿,可能比原来扩大几十上百倍。

 

   由上面的条件,hadoop自定义了序列化机制,引入org.apache.hadoop.io.Writable

 

/**
 * A serializable object which implements a simple, efficient, serialization 
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
 * framework implements this interface.</p>
 * 
 * <p>Implementations typically implement a static <code>read(DataInput)</code>
 * method which constructs a new instance, calls {@link #readFields(DataInput)} 
 * and returns the instance.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritable implements Writable {
 *       // Some data     
 *       private int counter;
 *       private long timestamp;
 *       
 *       // Fields are written in a fixed order ...
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       // ... and read back in exactly the same order.
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 * </pre></blockquote></p>
 */
public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>, i.e. write the
   * object to the output stream.
   *
   * @param out <code>DataOutput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>, i.e. read the
   * object back from the input stream.
   *
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   * 
   * @param in <code>DataInput</code> to deserialize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

 

 

    hadoop序列化机制还包括几个重要的接口和类:org.apache.hadoop.io.WritableComparable、org.apache.hadoop.io.RawComparator,以及org.apache.hadoop.io.WritableComparator

org.apache.hadoop.io.WritableComparable:继承java.lang.Comparable,来提供类型比较。

 

org.apache.hadoop.io.RawComparator:继承java.util.Comparator,允许直接比较流中尚未被反序列化为对象的记录,省去了创建对象的所有开销。

 

java.util.Comparator和java.lang.Comparable的区别可见 http://lavasoft.blog.51cto.com/62575/68380

 

 每个java基本类型都对应Writable封装。

 

      ObjectWritable可应用在需要序列化不同类型的对象到某一个字段,也可用在hadoop远程过程调用中参数的序列化和反序列化。

     ObjectWritable的实现:

              有三个成员变量,包括被封装的对象实例instance、该对象运行时类的Class对象和Configuration对象。

     ObjectWritable的write方法调用的是静态方法ObjectWritable.writeObject(),该方法可以往DataOutput接口中写入各种java对象。

/**
   * Write a {@link Writable}, {@link String}, primitive type, enum, or an array
   * of the preceding to <code>out</code>.
   *
   * <p>The name of the declared class is always written first, followed by a
   * payload whose layout depends on that class. A <code>null</code> instance is
   * replaced by a <code>NullInstance</code> and written as a {@link Writable}.</p>
   *
   * @param out the <code>DataOutput</code> to write to.
   * @param instance the value to write; may be <code>null</code>.
   * @param declaredClass the class the value is declared as.
   * @param conf configuration handed to the <code>NullInstance</code> wrapper
   *        and to recursive calls for array elements.
   * @throws IOException if the declared class is none of the supported kinds,
   *         or if the underlying stream fails.
   */
  public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass, 
                                 Configuration conf) throws IOException {

    // Work on local copies instead of reassigning the parameters.
    Object value = instance;
    Class type = declaredClass;
    if (value == null) {
      // null travels as a NullInstance wrapper, declared as a Writable
      value = new NullInstance(type, conf);
      type = Writable.class;
    }

    // the declared class name always comes first
    UTF8.writeString(out, type.getName());

    if (type.isArray()) {
      // array: element count, then every element written recursively
      final int count = Array.getLength(value);
      out.writeInt(count);
      final Class elementType = type.getComponentType();
      for (int idx = 0; idx < count; idx++) {
        writeObject(out, Array.get(value, idx), elementType, conf);
      }
      return;
    }

    if (type == String.class) {
      UTF8.writeString(out, (String) value);
      return;
    }

    if (type.isPrimitive()) {
      writePrimitive(out, value, type);
      return;
    }

    if (type.isEnum()) {
      // enum: written by constant name
      UTF8.writeString(out, ((Enum) value).name());
      return;
    }

    if (!Writable.class.isAssignableFrom(type)) {
      throw new IOException("Can't write: "+value+" as "+type);
    }

    // Writable: runtime class name first, then the object's own serialization
    UTF8.writeString(out, value.getClass().getName());
    ((Writable) value).write(out);
  }

  /**
   * Write the boxed value <code>boxed</code> using the <code>DataOutput</code>
   * method matching the primitive class <code>type</code>. Nothing is written
   * for <code>void</code>.
   */
  private static void writePrimitive(DataOutput out, Object boxed, Class type)
      throws IOException {
    if (type == Boolean.TYPE) {
      out.writeBoolean((Boolean) boxed);
    } else if (type == Character.TYPE) {
      out.writeChar((Character) boxed);
    } else if (type == Byte.TYPE) {
      out.writeByte((Byte) boxed);
    } else if (type == Short.TYPE) {
      out.writeShort((Short) boxed);
    } else if (type == Integer.TYPE) {
      out.writeInt((Integer) boxed);
    } else if (type == Long.TYPE) {
      out.writeLong((Long) boxed);
    } else if (type == Float.TYPE) {
      out.writeFloat((Float) boxed);
    } else if (type == Double.TYPE) {
      out.writeDouble((Double) boxed);
    } else if (type == Void.TYPE) {
      // void has no payload
    } else {
      throw new IllegalArgumentException("Not a primitive: "+type);
    }
  }

     

    和输出对应的调用的是org.apache.hadoop.io.ObjectWritable.readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)

   

 /** Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  @SuppressWarnings("unchecked")
  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
    String className = UTF8.readString(in);
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    if (declaredClass == null) {
      try {
        declaredClass = conf.getClassByName(className);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + className, e);
      }
    }    

    Object instance;
    
    if (declaredClass.isPrimitive()) {            // primitive types

      if (declaredClass == Boolean.TYPE) {             // boolean
        instance = Boolean.valueOf(in.readBoolean());
      } else if (declaredClass == Character.TYPE) {    // char
        instance = Character.valueOf(in.readChar());
      } else if (declaredClass == Byte.TYPE) {         // byte
        instance = Byte.valueOf(in.readByte());
      } else if (declaredClass == Short.TYPE) {        // short
        instance = Short.valueOf(in.readShort());
      } else if (declaredClass == Integer.TYPE) {      // int
        instance = Integer.valueOf(in.readInt());
      } else if (declaredClass == Long.TYPE) {         // long
        instance = Long.valueOf(in.readLong());
      } else if (declaredClass == Float.TYPE) {        // float
        instance = Float.valueOf(in.readFloat());
      } else if (declaredClass == Double.TYPE) {       // double
        instance = Double.valueOf(in.readDouble());
      } else if (declaredClass == Void.TYPE) {         // void
        instance = null;
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }

    } else if (declaredClass.isArray()) {              // array
      int length = in.readInt();
      instance = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(instance, i, readObject(in, conf));
      }
      
    } else if (declaredClass == String.class) {        // String
      instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {         // enum
      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else {                                      // Writable
      Class instanceClass = null;
      String str = "";
      try {
        str = UTF8.readString(in);
        instanceClass = conf.getClassByName(str);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + str, e);
      }
      //利用instanceClass创建WritableFactories
      Writable writable = WritableFactories.newInstance(instanceClass, conf);
      writable.readFields(in);
      instance = writable;

      if (instanceClass == NullInstance.class) {  // null
        declaredClass = ((NullInstance)instance).declaredClass;
        instance = null;
      }
    }

    if (objectWritable != null) {                 // store values
      objectWritable.declaredClass = declaredClass;
      objectWritable.instance = instance;
    }

    return instance;

 

 // Holds the mapping from a class to its WritableFactory.
 private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY =
    new HashMap<Class, WritableFactory>();

  /**
   * Create a new instance of a class with a defined factory.
   *
   * <p>When {@link WritableFactories#getFactory} returns a factory for
   * <code>c</code>, the factory creates the instance and, if the result is
   * {@link Configurable}, <code>conf</code> is set on it. Otherwise the
   * instance is created through <code>ReflectionUtils</code>.</p>
   *
   * @param c the class to instantiate.
   * @param conf configuration passed to the new instance.
   * @return the new instance.
   */
  public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
    final WritableFactory registered = WritableFactories.getFactory(c);
    if (registered == null) {
      // getFactory returned nothing: create the object by plain reflection
      return ReflectionUtils.newInstance(c, conf);
    }

    final Writable created = registered.newInstance();
    if (created instanceof Configurable) {
      ((Configurable) created).setConf(conf);
    }
    return created;
  }

 

    

    

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics