- 浏览: 60077 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
软件开发学习者lilonghui:
学习了,以前真没注意过这种细节
关于C/C++main函数 -
Branding:
受教!感谢!
html块级元素和内联元素
HDFS写入文件的重要概念
HDFS一个文件由多个block构成。HDFS在进行block读写的时候是以packet(默认每个packet为64K)为单位 进行的。每一个packet由若干个chunk(默认512Byte)组成。Chunk是进行数据校验的基本单位,对每一个chunk生成一个校验和(默 认4Byte)并将校验和进行存储。
在写入一个block的时候,数据传输的基本单位是packet,每个packet由若干个chunk组成。
HDFS客户端写文件示例代码
FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");
// writing
FSDataOutputStream dos = hdfs.create(path);
byte[] readBuf = "Hello World".getBytes("UTF-8");
dos.write(readBuf, 0, readBuf.length);
dos.close();
hdfs.close();
文件的打开
上传一个文件到hdfs,一般会调用DistributedFileSystem.create,其实现如下:
public FSDataOutputStream create(Path f, FsPermission ermission,boolean overwrite,int bufferSize, short replication, long lockSize,Progressable progress) throws IOException
{
return new FSDataOutputStream (dfs.create(getPathName(f), permission,overwrite, replication, blockSize, progress, bufferSize), statistics);
}
其最终生成一个FSDataOutputStream用于向新生成的文件中写入数据。其成员变量dfs的类型为DFSClient,DFSClient的create函数如下:
public OutputStream create(String src,FsPermission permission,boolean overwrite,short replication,long blockSize,Progressable progress,int buffersize) throws IOException
{
checkOpen();
if (permission == null)
{
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
OutputStream result = new DFSOutputStream(src, masked,overwrite, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
return result;
}
其中构造了一个DFSOutputStream,在其构造函数中,同过RPC调用NameNode的create来创建一个文件。
当然,构造函数中还做了一件重要的事情,就是streamer.start(),也即启动了一个pipeline,用于写数据,在写入数据的过程中,我们会仔细分析。
DFSOutputStream(String src, FsPermission masked, boolean overwrite,short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException
{
this(src, blockSize, progress, bytesPerChecksum);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try
{
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} catch(RemoteException re)
{
throw re.unwrapRemoteException(AccessControlException.class,QuotaExceededException.class);
}
streamer.start();
}
通过rpc调用NameNode的create函数,调用namesystem.startFile函数,其又调用startFileInternal函数,它创建一个新的文件,状态为under construction,没有任何data block与之对应。
dfsclient文件的写入
下面轮到客户端向新创建的文件中写入数据了,一般会使用FSDataOutputStream的write方法:
按照hdfs的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
首先将package 1写入DataNode 1
然后由DataNode 1负责将package 1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1
然后DataNode 2负责将package 1写入DataNode 3, 同时客户端可以讲package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2
就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕
FSDataOutputStream的write方法会调用DFSOutputStream的write方法,而DFSOutputStream继承自FSOutputSummer,所以实际上是调用FSOutputSummer的write方法,如下:
public synchronized void write(byte b[], int off, int len)
throws IOException
{
//参数检查
for (int n=0;n<len;n+=write1(b, off+n, len-n)) { }
}
FSOutputSummer的write1的方法如下:
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length)
{
// buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容大于或等于一个chunk的大小时调用
// 这里避免多余一次复制
final int length = buf.length;
sum.update(b, off, length);//length是一个完整chunk的大小,默认是512,这里根据一个chunk内容计算校验和
writeChecksumChunk(b, off, length, false);
return length;
}
// buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容小于一个chunk的大小时调用
// 规避了数组越界问题
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
sum.update(b, off, bytesToCopy);//bytesToCopy不足一个chunk,是写入的内容的最后一个chunk的剩余字节数目
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {//如果不足一个chunk,就缓存到本地buffer,如果还有下一次写入,就填充这个chunk,满一个chunk再flush,count清0
// local buffer is full
flushBuffer();//最终调用writeChecksumChunk方法实现
}
return bytesToCopy;
}
writeChecksumChunk的实现如下:
//写入一个chunk的数据长度(默认512),忽略len的长度
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
throws IOException
{
int tempChecksum = (int)sum.getValue();
if (!keep)
{
sum.reset();
}
int2byte(tempChecksum, checksum);//把当前chunk的校验和从int转换为字节
writeChunk(b, off, len, checksum);
}
writeChunk由子类DFSOutputStream实现,如下:
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)throws IOException
{
//创建一个package,并写入数据
currentPacket = new Packet(packetSize, chunksPerPacket,bytesCurBlock);
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
//如果此package已满,则放入队列中准备发送
if (currentPacket.numChunks == currentPacket.maxChunks ||bytesCurBlock == blockSize) {
......
dataQueue.addLast(currentPacket);
//唤醒等待dataqueue的传输线程,也即DataStreamer
dataQueue.notifyAll();
currentPacket = null;
......
}
}
writeChunk比较简单,就是把数据填充packet,填充完毕,就放到dataQueue,再唤醒DataStreamer。
DataStreamer完成了数据的传输,DataStreamer的run函数如下:
public void run()
{
while (!closed && clientRunning) {
Packet one = null;
synchronized (dataQueue) {
boolean doSleep = processDatanodeError(hasError, false);
//如果ack出错,则处理IO错误
//如果队列中没有package,则等待
while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {
try {
dataQueue.wait(1000);
} catch (InterruptedException e) {
}
doSleep = false;
}
try {
//得到队列中的第一个package
one = dataQueue.getFirst();
long offsetInBlock = one.offsetInBlock;
//由NameNode分配block,并生成一个写入流指向此block
if (blockStream == null) {
nodes = nextBlockOutputStream(src);
response = new ResponseProcessor(nodes);
response.start();
}
ByteBuffer buf = one.getBuffer();
//将packet从dataQueue移至ackQueue,等待确认
dataQueue.removeFirst();
dataQueue.notifyAll();
synchronized (ackQueue) {
ackQueue.addLast(one);
ackQueue.notifyAll();
}
//利用生成的写入流将数据写入DataNode中的block
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) {
blockStream.writeInt(0); //表示此block写入完毕
}
blockStream.flush();
} catch (Throwable e) {
}
if (one.lastPacketInBlock) {
//数据块写满,做一些清理工作,下次再申请块
response.close(); // ignore all errors in Response
synchronized (dataQueue) {
IOUtils.cleanup(LOG, blockStream, blockReplyStream);
nodes = null;
response = null;
blockStream = null;
//设置为null,下次就会判断blockStream为null,申请新的块
blockReplyStream = null;
}
}
}
......
}
DataStreamer线程负责把准备好的数据packet,顺序写入到DataNode,未确认写入成功的packet则移动到ackQueue,等待确认。
DataStreamer线程传输数据到DataNode时,要向namenode申请数据块,方法是nextBlockOutputStream,再调用locateFollowingBlock,通过RPC调用namenode.addBlock(src, clientName),在NameNode分配了DataNode和block以后,createBlockOutputStream开始写入数据。
客户端在DataStreamer的run函数中创建了写入流后,调用blockStream.write将packet写入DataNode
DataStreamer还会启动ResponseProcessor线程,它负责接收datanode的ack,当接收到所有 datanode对一个packet确认成功的ack,ResponseProcessor从ackQueue中删除相应的packet。在出错时,从 ackQueue中移除packet到dataQueue,移除失败的datanode,恢复数据块,建立新的pipeline。实现如下:
public void run() {
...
PipelineAck ack = new PipelineAck();
while (!closed && clientRunning && !lastPacketInBlock) {
try {
// read an ack from the pipeline
ack.readFields(blockReplyStream);
...
//处理所有DataNode响应的状态
for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
short reply = ack.getReply(i);
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {//ack验证,如果DataNode写入packet失败,则出错
errorIndex = i; //记录损坏的DataNode,会在processDatanodeError方法移除该失败的DataNode
throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + targets[i].getName());
}
}
long seqno = ack.getSeqno();
if (seqno == Packet.HEART_BEAT_SEQNO) { // 心跳ack,忽略
continue;
}
Packet one = null;
synchronized (ackQueue) {
one = ackQueue.getFirst();
}
...
synchronized (ackQueue) {
assert ack.getSeqno() == lastAckedSeqno + 1;//验证ack
lastAckedSeqno = ack.getSeqno();
ackQueue.removeFirst();//移除确认写入成功的packet
ackQueue.notifyAll();
}
} catch (Exception e) {
if (!closed) {
hasError = true;//设置ack错误,让
...
closed = true;
}
}
}
}
当ResponseProcessor在确认packet失败时,processDatanodeError方法用于处理datanode的错误,当调用返回后需要休眠一段时间时,返回true。下面是其简单的处理流程:
1.关闭blockStream和blockReplyStream
2.将packet从ackQueue移到dataQueue
3.删除坏datanode
4.通过RPC调用datanode的recoverBlock方法来恢复块,如果有错,返回true
5.如果没有可用的datanode,关闭DFSOutputStream和streamer,返回false
6.创建块输出流,如果不成功,转到3
实现如下:
private boolean processDatanodeError(boolean hasError, boolean isAppend) {
if (!hasError) {//DataNode没有发生错误,直接返回
return false;
}
//将未确认写入成功的packets从ack queue移动到data queue的前面
synchronized (ackQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
boolean success = false;
while (!success && clientRunning) {
DatanodeInfo[] newnodes = null;
//根据errorIndex确定失败的DataNode,从所有的DataNode nodes移除失败的DataNode,复制到newnodes
// 通知primary datanode做数据块恢复,更新合适的时间戳
LocatedBlock newBlock = null;
ClientDatanodeProtocol primary = null;
DatanodeInfo primaryNode = null;
try {
// Pick the "least" datanode as the primary datanode to avoid deadlock.
primaryNode = Collections.min(Arrays.asList(newnodes));
primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken, socketTimeout);
newBlock = primary.recoverBlock(block, isAppend, newnodes);//恢复数据块
} catch (IOException e) {
//循环创建块输出流,如果不成功,移除失败的DataNode
return true; // 需要休眠
} finally {
RPC.stopProxy(primary);
}
recoveryErrorCount = 0; // 数据块恢复成功
block = newBlock.getBlock();
accessToken = newBlock.getBlockToken();
nodes = newBlock.getLocations();
this.hasError = false;
lastException = null;
errorIndex = 0;
success = createBlockOutputStream(nodes, clientName, true);
}
response = new ResponseProcessor(nodes);
response.start();//启动ResponseProcessor做ack确认处理
return false; // 不休眠,继续处理
}
相关推荐
hdfs上传文件过程源码分析
5.8.3 源码分析 第6章 AvatarNode使用 6.1 方案说明 6.1.1 网络拓扑 6.1.2 操作系统安装及配置 6.2 使用Avatar打补丁版本 6.2.1 Hadoop源码联机Build 6.2.2 Hadoop源码本地Build 6.2.3 NFS服务器构建 6.2.4 Avatar...
039 HDFS文件系统读写流程及HDFS API两种方式读取文件 040 详解HDFS API之FileSystem方式基本操作二 041 讲解分析Configuration和FileSystem类源代码 042 引出HDFS实际应用场景之合并文件和使用getmerge命令并查看...
- 收集MovieLens数据集,包含16万个电影,2400万条评分,67万条评价标签,将csv文件上传到完全分布式HDFS文件系统 - scala、spark读取HDFS文件,整理导入MongoDB数据库 - MongoDB中加载数据,利用sparkRdd统计热门...
为了实现高效的分布式处理过程,系统的分布式环境采用spark+hadoop,在利用hdfs分布式存储的情况下充分发挥出spark快速处理分布式数据的优势,并支持用户采用pyspark + jupyter notebook对任务程序进行编写并提交任务...
大数据hadoop期末答辩的代码,答辩题目是基于贝叶斯的情感分析,压缩包中包括十个文件,分别是hadoop的配置文档、总结文档、源代码、答辩ppt、运行结果等等,这是可以直接下载的,具体的大家可以直接下载查看,谢谢...
2.2 .3 HDFS 缓存管理机制分析· ····· ·…………………………………………………. . 45 2.2.4 HDFS 中心缓存疑问点…….. .. .. .………..... . . …….. ... . .. . .………….. . ............ . ............
在使用过程中,如有问题或建议,请及时沟通。 5.期待你能在项目中找到乐趣和灵感,也欢迎你的分享和反馈! 【资源说明】 包含爬虫,Scala代码,Spark,Hadoop,ElasticSearch,logstash,Flume,echarts,log4j emotional_...
为了实现高效的分布式处理过程,系统的分布式环境采用spark+hadoop,在利用hdfs分布式存储的情况下充分发挥出spark快速处理分布式数据的优势,并支持用户 - 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是...
Hadoop(MapReduce+HDFS+Yarn),PageRank算法,LPA算法。 ### 实现过程 1.数据预处理 从原始的金庸小说文本中,抽取出与人物互动相关的数据,而屏蔽掉与人物关系无关的文本内容。 2.特征抽取 完成基于单词...
这个过程本质上是一个批处理,适合于分析或者是非交互式的计算任务。 正因为如此,Hadoop本身不是一个数据库或数据仓库的解决方案,而是分析的辅助。 最知名的Hadoop的用户之一是Facebook。它的MySQL数据库存储核心...