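Let's start with the class diagram of org.apache.hadoop.io.serializer (Hadoop 2.7.1):
From the class diagram:
There are three interfaces:
1. Deserializer: defines the deserialization interface;
2. Serializer: defines the serialization interface;
3. Serialization: defines an interface for a family of related, interdependent serialization objects.
Two families of classes implement these three interfaces: WritableSerialization, which supports the Writable mechanism, and JavaSerialization, which supports Java serialization. With one Serialization, one Serializer, and one Deserializer per family, that makes six implementation classes in total.
SerializationFactory: maintains an ArrayList of Serialization instances. Its constructor takes a Configuration and adds every serialization named in the comma-separated io.serializations parameter.
Deserializer: converts a byte stream into an object. Its methods open the stream, deserialize, and close the stream.
Source: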
package org.apache.hadoop.io.serializer;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* <p>
* Provides a facility for deserializing objects of type <T> from an
* {@link InputStream}.
* </p>
*
* <p>
* Deserializers are stateful, but must not buffer the input since
* other producers may read from the input between calls to
* {@link #deserialize(Object)}.
* </p>
* @param <T>
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Deserializer<T> {
/**
* <p>Prepare the deserializer for reading.</p>
*/
void open(InputStream in) throws IOException;
/**
* <p>
* Deserialize the next object from the underlying input stream.
* If the object <code>t</code> is non-null then this deserializer
* <i>may</i> set its internal state to the next object read from the input
* stream. Otherwise, if the object <code>t</code> is null a new
* deserialized object will be created.
* </p>
* @return the deserialized object
*/
T deserialize(T t) throws IOException;
/**
* <p>Close the underlying input stream and clear up any resources.</p>
*/
void close() throws IOException;
}
Serializer: converts an object into a byte stream. Its methods open the stream, serialize, and close the stream.
Source:
package org.apache.hadoop.io.serializer;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* <p>
* Provides a facility for serializing objects of type <T> to an
* {@link OutputStream}.
* </p>
*
* <p>
* Serializers are stateful, but must not buffer the output since
* other producers may write to the output between calls to
* {@link #serialize(Object)}.
* </p>
* @param <T>
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serializer<T> {
/**
* <p>Prepare the serializer for writing.</p>
*/
void open(OutputStream out) throws IOException;
/**
* <p>Serialize <code>t</code> to the underlying output stream.</p>
*/
void serialize(T t) throws IOException;
/**
* <p>Close the underlying output stream and clear up any resources.</p>
*/
void close() throws IOException;
}
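The open / serialize / close lifecycle of the two interfaces above can be tried out without Hadoop on the classpath. In the sketch below, `MiniSerializer` and `MiniDeserializer` are hypothetical stand-ins that only mirror the shape of Hadoop's `Serializer` and `Deserializer`; the `String` implementations built on `writeUTF`/`readUTF` are assumptions made for illustration and are not Hadoop classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class LifecycleSketch {
    // Hypothetical mirror of org.apache.hadoop.io.serializer.Serializer.
    interface MiniSerializer<T> {
        void open(OutputStream out) throws IOException;
        void serialize(T t) throws IOException;
        void close() throws IOException;
    }

    // Hypothetical mirror of org.apache.hadoop.io.serializer.Deserializer.
    interface MiniDeserializer<T> {
        void open(InputStream in) throws IOException;
        T deserialize(T reuse) throws IOException;
        void close() throws IOException;
    }

    // Writes each String as one length-prefixed record and flushes after every record.
    static final class StringSerializer implements MiniSerializer<String> {
        private DataOutputStream out;
        @Override public void open(OutputStream out) { this.out = new DataOutputStream(out); }
        @Override public void serialize(String s) throws IOException { out.writeUTF(s); out.flush(); }
        @Override public void close() throws IOException { out.close(); }
    }

    // Reads one record per call; the reuse argument is ignored because String is immutable.
    static final class StringDeserializer implements MiniDeserializer<String> {
        private DataInputStream in;
        @Override public void open(InputStream in) { this.in = new DataInputStream(in); }
        @Override public String deserialize(String reuse) throws IOException { return in.readUTF(); }
        @Override public void close() throws IOException { in.close(); }
    }

    // Serializes the values to a byte array, then reads them back in the same order.
    static String[] roundTrip(String... values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            StringSerializer serializer = new StringSerializer();
            serializer.open(bytes);
            for (String v : values) {
                serializer.serialize(v);
            }
            serializer.close();

            StringDeserializer deserializer = new StringDeserializer();
            deserializer.open(new ByteArrayInputStream(bytes.toByteArray()));
            String[] result = new String[values.length];
            for (int i = 0; i < result.length; i++) {
                result[i] = deserializer.deserialize(null);
            }
            deserializer.close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String s : roundTrip("alpha", "beta")) {
            System.out.println(s);
        }
    }
}
```

Passing `null` to `deserialize` asks for a fresh object, which matches the contract in the Deserializer Javadoc; an implementation for a mutable type could instead refill the object it is given.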
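Serialization: uses the abstract factory design pattern to encapsulate a Serializer/Deserializer pair. It reports whether it supports a given class and returns the serializer and deserializer for that class.
Source: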
package org.apache.hadoop.io.serializer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* <p>
* Encapsulates a {@link Serializer}/{@link Deserializer} pair.
* </p>
* @param <T>
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serialization<T> {
/**
* Allows clients to test whether this {@link Serialization}
* supports the given class.
*/
boolean accept(Class<?> c);
/**
* @return a {@link Serializer} for the given class.
*/
Serializer<T> getSerializer(Class<T> c);
/**
* @return a {@link Deserializer} for the given class.
*/
Deserializer<T> getDeserializer(Class<T> c);
}
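SerializationFactory: the serialization factory. On construction it reads the list of serializations from the io.serializations configuration property; if the property is not set, the constructor falls back to org.apache.hadoop.io.serializer.WritableSerialization, AvroSpecificSerialization, and AvroReflectSerialization. Call getSerializer and getDeserializer to obtain a serializer or deserializer.
Source: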
package org.apache.hadoop.io.serializer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
import org.apache.hadoop.util.ReflectionUtils;
/**
* <p>
* A factory for {@link Serialization}s.
* </p>
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SerializationFactory extends Configured {
private static final Log LOG =
LogFactory.getLog(SerializationFactory.class.getName());
private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
/**
* <p>
* Serializations are found by reading the <code>io.serializations</code>
* property from <code>conf</code>, which is a comma-delimited list of
* classnames.
* </p>
*/
public SerializationFactory(Configuration conf) {
super(conf);
for (String serializerName : conf.getTrimmedStrings(
CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
new String[]{WritableSerialization.class.getName(),
AvroSpecificSerialization.class.getName(),
AvroReflectSerialization.class.getName()})) {
add(conf, serializerName);
}
}
@SuppressWarnings("unchecked")
private void add(Configuration conf, String serializationName) {
try {
Class<? extends Serialization> serializionClass =
(Class<? extends Serialization>) conf.getClassByName(serializationName);
serializations.add((Serialization)
ReflectionUtils.newInstance(serializionClass, getConf()));
} catch (ClassNotFoundException e) {
LOG.warn("Serialization class not found: ", e);
}
}
public <T> Serializer<T> getSerializer(Class<T> c) {
Serialization<T> serializer = getSerialization(c);
if (serializer != null) {
return serializer.getSerializer(c);
}
return null;
}
public <T> Deserializer<T> getDeserializer(Class<T> c) {
Serialization<T> serializer = getSerialization(c);
if (serializer != null) {
return serializer.getDeserializer(c);
}
return null;
}
@SuppressWarnings("unchecked")
public <T> Serialization<T> getSerialization(Class<T> c) {
for (Serialization serialization : serializations) {
if (serialization.accept(c)) {
return (Serialization<T>) serialization;
}
}
return null;
}
}
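To make the lookup in `getSerialization` concrete, here is a minimal, Hadoop-free sketch of the same first-match idea. `MiniSerialization`, `TypeSerialization`, and `MiniFactory` are hypothetical names introduced for illustration; the only part taken from the source above is the logic "iterate in registration order, return the first entry whose `accept` is true, otherwise return null".

```java
import java.util.ArrayList;
import java.util.List;

class FirstMatchSketch {
    // Hypothetical stand-in for Serialization: just the accept() test plus a label.
    interface MiniSerialization {
        boolean accept(Class<?> c);
        String name();
    }

    // Accepts any class that is assignable to the given base type.
    static final class TypeSerialization implements MiniSerialization {
        private final Class<?> base;
        private final String name;

        TypeSerialization(Class<?> base, String name) {
            this.base = base;
            this.name = name;
        }

        @Override public boolean accept(Class<?> c) { return base.isAssignableFrom(c); }
        @Override public String name() { return name; }
    }

    // Keeps serializations in registration order and returns the first one that accepts.
    static final class MiniFactory {
        private final List<MiniSerialization> serializations = new ArrayList<>();

        MiniFactory add(MiniSerialization s) {
            serializations.add(s);
            return this;
        }

        MiniSerialization getSerialization(Class<?> c) {
            for (MiniSerialization s : serializations) {
                if (s.accept(c)) {
                    return s;
                }
            }
            return null; // no registered serialization supports this class
        }
    }

    // Builds a small factory and returns the label of the match, or null if none.
    static String lookup(Class<?> c) {
        MiniFactory factory = new MiniFactory()
            .add(new TypeSerialization(CharSequence.class, "char-sequence"))
            .add(new TypeSerialization(Number.class, "number"))
            .add(new TypeSerialization(Comparable.class, "comparable"));
        MiniSerialization s = factory.getSerialization(c);
        return s == null ? null : s.name();
    }

    public static void main(String[] args) {
        // String is both a CharSequence and a Comparable; the earlier registration wins.
        System.out.println(lookup(String.class));  // char-sequence
        System.out.println(lookup(Integer.class)); // number
        System.out.println(lookup(Object.class));  // null
    }
}
```

Two consequences follow from this design: the order of the names in io.serializations decides which serialization handles a class that several of them accept, and callers of getSerializer and getDeserializer have to be prepared for a null result.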
The following briefly walks through how SerializationFactory produces its Serializations:
First, look at the constant used in its constructor, CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, which is defined as follows:
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String IO_SERIALIZATIONS_KEY = "io.serializations";
The constructor public SerializationFactory(Configuration conf) reads this key from the Configuration it is given, and a Configuration loads core-default.xml and core-site.xml. For example:
SerializationFactory factory = new SerializationFactory(conf);
In Hadoop 2.7.1, the default configuration file core-default.xml defines the io.serializations property as follows:
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>
So, with the default configuration, SerializationFactory produces three Serializations:
org.apache.hadoop.io.serializer.WritableSerialization,
org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,
org.apache.hadoop.io.serializer.avro.AvroReflectSerialization
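As a rough illustration of how a comma-delimited value like the io.serializations default turns into a list of classes, the sketch below splits the string, trims each name, and skips any name that cannot be loaded, which is what the `ClassNotFoundException` handler in `add` does. It uses plain `Class.forName` with JDK class names as placeholders; `ClassListSketch` and `loadAll` are hypothetical, and Hadoop itself goes through `conf.getTrimmedStrings` and `conf.getClassByName` as shown in the source above.

```java
import java.util.ArrayList;
import java.util.List;

class ClassListSketch {
    // Splits a comma-delimited list of class names and loads each one that exists.
    static List<Class<?>> loadAll(String commaSeparated) {
        List<Class<?>> loaded = new ArrayList<>();
        for (String raw : commaSeparated.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            try {
                loaded.add(Class.forName(name));
            } catch (ClassNotFoundException e) {
                // Same policy as SerializationFactory.add: report and keep going.
                System.err.println("Class not found, skipping: " + name);
            }
        }
        return loaded;
    }

    public static void main(String[] args) {
        // JDK classes stand in for serialization class names; the last one does not exist.
        String value = "java.util.ArrayList, java.util.LinkedList ,no.such.Serialization";
        for (Class<?> c : loadAll(value)) {
            System.out.println(c.getName());
        }
    }
}
```

Because a missing class is only logged, a typo in io.serializations does not fail at construction time; it shows up later as a null serializer for the affected types.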
Its methods public <T> Serializer<T> getSerializer(Class<T> c) and public <T> Serialization<T> getSerialization(Class<T> c) then return the Serializer or Serialization that matches a given class:
public <T> Serializer<T> getSerializer(Class<T> c) {
Serialization<T> serializer = getSerialization(c);
if (serializer != null) {
return serializer.getSerializer(c);
}
return null;
}
@SuppressWarnings("unchecked")
public <T> Serialization<T> getSerialization(Class<T> c) {
for (Serialization serialization : serializations) {
if (serialization.accept(c)) { // Note 1
return (Serialization<T>) serialization;
}
}
return null;
}
Note 1: if (serialization.accept(c)) calls the accept method of the concrete class. For example, if serialization is an org.apache.hadoop.io.serializer.WritableSerialization, the following is called:
@InterfaceAudience.Private
@Override
public boolean accept(Class<?> c) {
return Writable.class.isAssignableFrom(c);
}
If serialization is an org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, the following is called:
@InterfaceAudience.Private
@Override
public boolean accept(Class<?> c) {
return SpecificRecord.class.isAssignableFrom(c); // Note 2
}
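Note 2:
public boolean isAssignableFrom(Class<?> cls)
Determines if the class or interface represented by this Class object is either the same as, or is a superclass or superinterface of, the class or interface represented by the specified Class parameter. It returns true if so; otherwise it returns false. If this Class object represents a primitive type, this method returns true if the specified Class parameter is exactly this Class object; otherwise it returns false.
Specifically, this method tests whether the type represented by the specified Class parameter can be converted to the type represented by this Class object via an identity conversion or via a widening reference conversion. See the Java Language Specification, sections 5.1.1 and 5.1.4, for details.
Parameters: cls - the Class object to be checked
Returns: the boolean value indicating whether objects of the type cls can be assigned to objects of this class
Throws: NullPointerException - if the specified Class parameter is null.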
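The direction of the isAssignableFrom test is easy to get backwards, so a small JDK-only check may help. In this sketch `java.util.List` stands in for `Writable`, and `AssignableSketch.accept` is a hypothetical method with the same shape as the accept implementations quoted above.

```java
import java.util.ArrayList;
import java.util.List;

class AssignableSketch {
    // Same shape as WritableSerialization.accept, with List standing in for Writable.
    static boolean accept(Class<?> c) {
        return List.class.isAssignableFrom(c);
    }

    public static void main(String[] args) {
        System.out.println(accept(ArrayList.class)); // true: ArrayList implements List
        System.out.println(accept(List.class));      // true: same type
        System.out.println(accept(String.class));    // false: String is not a List
        // The receiver is the supertype; reversing the roles gives false.
        System.out.println(ArrayList.class.isAssignableFrom(List.class)); // false
        // Primitive types are assignable only from themselves, not from their wrappers.
        System.out.println(int.class.isAssignableFrom(Integer.class));    // false
    }
}
```

Read `X.class.isAssignableFrom(c)` as "a value of type c can be assigned to a variable of type X", which is why the serialization's base type is the receiver and the candidate class is the argument.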