`

Hadoop HDFS API编程

 
阅读更多
HDFS API编程

修改hadoop-env.sh
export HADOOP_CLASSPATH=/home/anker/hadoop-1.1.2/myclass

设置环境变量
修改.bash_profile,当用户一登陆,就会执行此文件
PATH=$PATH:$HOME/bin:/usr/jdk1.7.0_51/bin
JAVA_HOME=/usr/jdk1.7.0_51/

export JAVA_HOME
export PATH

设置好后,可以通过env来检查变量。

//编译时要指定classpath javac -classpath ../hadoop-core-1.1.2.jar URLCat.java

//若是编译的类在jar包中,需要指定具体的jar包。
//保险的办法是加载$HADOOP_HOME目录以及lib子目录下所有的jar包

//执行 ../bin/hadoop URLCat hdfs://anker.centos1:9000/user/anker/in/test2.txt

//1.通过URL来读取HDFS 中的文件内容
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.InputStream;
import java.net.URL;

//从Hadoop URL中读取数据
public class  URLCat
{
	static{//JVM只能调用一次此方法,所以选择此静态方法。这个限制意味着玉如其他程序的其他组件,已经声明了一个URLStreamHandlerFactory,将无法再使用上述方法从Hadoop中读取数据。
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());//通过FsurlStreamHandlerFactory示例调用setURLStreamHandlerFactory实现java能够实现Hadoop的hdfs url
	}
	public static void main(String[] args) 
	{
		InputStream in = null;
		try{
			in = new URL(args[0]).openStream();
			IOUtils.copyBytes(in,System.out,4096,false);//IOUtils类可以实现输入流和输出流(system.out)之间复制数据,4096为缓存区大小,false为复制结束后是否关闭数据流
		}finally{
			IOUtils.closeStream(in);
		}
	}
}


//2. ant程序示例

可以将此放入到.bash_profile文件中
export HADOOP_HOME=/home/anker/hadoop-1.1.2

/* This sample demostrate use of HDFS Java API
 * This sample is loosely based on the 
 * http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample
*/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSJavaAPIDemo {

        public static void main(String[] args) throws IOException {
                Configuration conf = new Configuration(); //Provides access to configuration parameters. A New Configuration
                conf.addResource(new Path(
                                "/u/hadoop-1.0.2/conf/core-site.xml"));//Add a configuration resource
                conf.addResource(new Path(
                                "/u/hadoop-1.0.2/conf/hdfs-site.xml"));//Add a configuration resource Add a configuration resourcefile -
									//file-path of resource to be added, the local filesystem is examined directly to find the resource, 
									//without referring to the classpath

                FileSystem fileSystem = FileSystem.get(conf);//Returns the configured filesystem implementation. 
                System.out.println(fileSystem.getUri());//Returns a URI whose scheme and authority identify this FileSystem

                Path file = new Path("demo.txt");
                if (fileSystem.exists(file)) {
                        System.out.println("File exists.");
                } else {
                        // Writing to file
                        FSDataOutputStream outStream = fileSystem.create(file);//Utility that wraps a OutputStream in a DataOutputStream, buffers output through a BufferedOutputStream and creates a checksum file.
                        outStream.writeUTF("Welcome to HDFS Java API!!!");
                        outStream.close();
                }

                // Reading from file
                FSDataInputStream inStream = fileSystem.open(file);
                String data = inStream.readUTF();
                System.out.println(data);
                inStream.close();

                // deleting the file. Non-recursively.
                // fileSystem.delete(file, false);
                fileSystem.close();
        }
}




如何执行该程序
bin/hadoop jar HDFSJavaAPI.jar HDFSJavaAPIDemo


Hadoop文件系统中通过Hadoop Path对象来代表文件,可以将一条路径视为一个文件。如hdfs://localhost/user/tom/quangle.txt
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例。获取FileSystem实例有几种静态工厂方法:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user)
throws IOException
Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径(conf/core-site.xml)。第一个方法返回的是
默认文件系统(在conf/core-site.xml中指定。若没有指定,则使用默认的本地文件系统)
第二个通过给定的URI方案和权限来确定要使用的文件系统,如果给定的URI没有指定方案,则返回默认方案。 
第三种是通过一个指定的用户获取filesystem

在某些例子中,如果想获取一个本地的filesystem实例,可以使用以下方法:
public static LocalFileSystem getLocal(Configuration conf) throws IOException

当获取到Filesystem的实例后,我们调用open()函数来获取文件的输入流。
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

第一个方法默认的buffer size 为4k

//3.FileSystem API读取数据
import org.apache.hadoop.conf.Configuration;
import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.net.URI;
public class FileSystemCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);//Returns the FileSystem for this URI's scheme and authority.
InputStream in = null;
try {
in = fs.open(new Path(uri));//Opens an FSDataInputStream at the indicated Path
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
//实际上,FileSystem对象中的open()方法返回的是FSdataINputStream对象,这个类继承了java.io.DataInputStream接口的一个特殊类,
并支持随机访问,因此可以从流的任意位置读取数据。
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置便宜量(getpos())的方法。
调用seek()来定位大于文件长度的位置将导致IOException异常。

package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,Closeable {
// implementation elided
}

FSDataInputStream类也实现了PositionedReable接口,从一个指定偏移量处读取文件一部分:


public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}



//4.FSDataInputStream API读取数据

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class  FileSystemDoubleCat
{
	public static void main(String[] args) 
	{
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri),conf);
		FSDataInputStream in = null;
		try{
			in = fs.open(new Path(uri));
			IOUtils.copyBytes(in,System.out,4096,false);
			in.seek(0);//go back to the start of the file
			IOUtils.copyBytes(in,System.out,4096,false);	
		}finally{
			IOUtils.closeStream(in);
		}
	}
};




//5.写入数据

FileSystem类有一系列创建文件的方法。最简单的方法时给准备创建的文件制定一个path对象,然后返回一个
用户写入数据的数据流。
public FSDataOutputStream create(Path f) throws IOException

上述方法有多个重载对象,允许我们制定是否需要强制覆盖已有的文件,文件备份数量,写入文件时所用的缓冲区
大小,文件块大小及文件权限。

还有一个重载方法Progressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到应用:

package org.apache.hadoop.util;
public interface Progressable {
public void progress();
}

另一种新建文件的方法,是使用append()方法在一个已有文件末尾追加数据
public FSDataOutputStream append(Path f) throws IOException

该追加操作允许一个writer打开文件后再访问该文件的最后偏移量追加数据。
每次Hadoop调用progress方法时,也就是每次将64kb数据包写入datanode管线后--
打印一个时间点来显示整个过程。


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.io.OutputStream;
import org.apache.hadoop.util.Progressable

public class FileCopyWithProgress {
public static void main(String[] args) throws Exception {
String localSrc = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() { //Create an FSDataOutputStream at the indicated Path with write-progress reporting.
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);//Copies from one stream to another. true means close the inputstream and outputstream at then end.
}
}


FSDataoutputStream对象

FileSystem 实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,
它也有一个查询文件当前位置的方法:
package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
	// implementation elided
}
// implementation elided
}

但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个
已打开的文件顺序写入,或在现有文件的末尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入,
因此写入时定位就没有任何意义。



目录
FileSystem实例提供了创建目录的方法
public boolean mkdirs(Path f) throws IOException

这个方法可以一次性创建所有必要还没有的父目录,就像java.io.file类的mkdirs()方法。如果目录都已创建成功,
则返回true。

通常,你不需要显式创建一个目录,因为调用create()写入文件时会自动创建父目录。

FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。


public class ShowFileStatusTest {
private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
private FileSystem fs;
@Before
public void setUp() throws IOException {
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster(conf, 1, true, null);
fs = cluster.getFileSystem();

OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
}
@After
public void tearDown() throws IOException {
if (fs != null) { fs.close(); }
if (cluster != null) { cluster.shutdown(); }
}
@Test(expected = FileNotFoundException.class)
public void throwsFileNotFoundForNonExistentFile() throws IOException {
fs.getFileStatus(new Path("no-such-file"));
}
@Test
public void fileStatusForFile() throws IOException {
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
assertThat(stat.isDir(), is(false));
assertThat(stat.getLen(), is(7L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 1));
assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rw-r--r--"));
}
@Test
public void fileStatusForDirectory() throws IOException {
Path dir = new Path("/dir");
FileStatus stat = fs.getFileStatus(dir);
assertThat(stat.getPath().toUri().getPath(), is("/dir"));
assertThat(stat.isDir(), is(true));
assertThat(stat.getLen(), is(0L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 0));
assertThat(stat.getBlockSize(), is(0L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
}
}

列出文件

列出一个文件或者目录的信息,通过FileSystem的listStatus()方法。
public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

当传入的参数是一个文件时,它会简单转变为以数组形式长度为1的FileStatus对象。当传入的参数
是一个目录时,则返回0或多个FileStatus对象,表示此目录包含的文件和目录。

一个重载的方法时允许使用PathFilter来限制匹配的文件和目录。如果指定一组路径,其执行结果相当
与依次轮流传递每条路径并调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中。
但该方法更为方便,这从文件系统树的不同分支构建输入文件列表时,这是很有用的。

注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组。

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom

public class ListStatus {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
}


文件模式,在单个操作中处理一批文件,是一个常见要求。在一个表达式中使用通配符来匹配多个文件
是比较方便的,无需列举每个文件和目录来指定输入,该操作称为统配(globbing),Hadoop为执行统配提供了
两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws
IOException

globStatus()方法返回与路径匹配的所有文件的FileStatus对象数组,并按照路径排序。PathFileter命令
可以作为可选项进一步对匹配进行限制。
Hadoop支持的统配符与Unix bash相同。

通配符及含义
*		匹配0或多个字符
?		匹配单一字符
[ab]	匹配{a,b}集合中的一个字符
[ ab]	匹配非{a,b}集合中的一个字符
[a-b]	匹配一个在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b
[ a-b]	匹配一个不在{a,b}范围内的字符(包括ab),a在字母顺序要小于或等于b
{a,b}	匹配包含a或b中的一个表达式
\c		转义字符

PathFilter对象

统配符并不总能精确地描述我们想要访问的文件集。FileSystem中的listStatus()和globStatus()
提供了可选的PathFilter对象,使我们可以通过编程方式控制通配符:
public interface PathFilter
{
	boolean accpet(Path path);//Tests whether or not the specified abstract pathname should be included in a pathname list.
}
public class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}

删除数据
使用FileSystem的delete()方法可以永久性删除文件或目录
public boolean delete(Path f,boolean recursive) throws IOException
如果f是一个文件或空目录,那么recursive值就会被忽略。只有在recrusive值为true时,一个非空目录
及其内容才会被删除。(否则会抛出IOException异常。)

 

分享到:
评论

相关推荐

    实验二、HDFS shell操作及HDFS Java API编程

    适合刚接触hadoop的学生或小白,内容包括HDFS shell操作及HDFS Java API编程 有图片有代码

    java操作Hadoop源码之HDFS Java API操作-上传文件

    java操作Hadoop源码之HDFS Java API操作-上传文件,附带所需全部jar包,欢迎下载学习。

    大数据实验二-HDFS编程实践

    (1)在本实验中,我深入了解了HDFS在Hadoop体系结构中的角色,并熟练掌握了HDFS操作的常用Shell命令和Java API。 (2)首先,我们探讨了HDFS文件操作的常用Shell命令。通过使用`hdfs dfs -put`、`hdfs dfs -get`、...

    HDFS文件系统基本文件命令、编程读写HDFS

    本文档时Hadoop云计算平台下运用Hadoop API对HDFS进行相关的操作,详细的记录了程序在eclipse中的应用,代码内容和运行结果,是学习Hadoop非常有用的资料。

    实验二:熟悉常用的HDFS操作

    A.2实验二:熟悉常用的HDFS操作 ...(3)熟悉HDFS操作常用的Java API。 A.2.2 实验平台 (1)操作系统:Linux(建议Ubuntu 16.04)。(2) Hadoop版本:2.7.1。 (3)JDK版本:1.7或以上版本。(4) Java IDE:Eclipse。

    Hadoop从入门到上手企业开发

    036 HDFS Java API 两种方式介绍及使用URL API详解一 037 使用URL API详解二 038 使用HDFS FileSystem API 详解 039 HDFS文件系统读写流程及HDFS API两种方式读取文件 040 详解HDFS API之FileSystem方式基本操作二 ...

    HDFS判断文件或目录是否存在——Shell命令实现 + Java代码实现

    hdfs dfs -test -e 文件或目录名 第三步,查看检测结果: echo $? 若输出结果为0,则说明文件或目录存在,若为1,则说明文件或目录不存在。 二、Java代码实现 import org.apache.hadoop.conf.Configuration; ...

    Hello-Hadoop-netbeans-OS-X:用于在没有任何 3rd 方 hadoop 插件的情况下使用 HDFS API 的 Java Maven 项目

    用于在没有任何 3rd 方 hadoop 插件的情况下使用 HDFS API 的 Java Maven 项目。 项目宗旨 你好世界/教程级别,用于在 OS X 上使用伪分布式 hadoop 配置进行编程操作。优胜美地上的 Hadoop 2.7。 这个项目展示了...

    hadoop-3.3.4 版本(最新版)

    Hadoop 架构是一个开源的、基于 Java 的编程框架,设计用于跨电脑集群来 处理大数据。Hadoop 是一个能够让用户轻松搭建和使用的分布式计算平台,能 够让用户轻松地在 Hadoop 上开发和运行处理海量数据的应用程序。 ...

    大数据资料集锦

    Hadoop集群监控与Hive高可用,Hadoop现场演示与编程过程,hadoop中文版API,HAWQ, 分布式SQL数据库引擎原生基于Hadoop HDFS,QCon-曹龙-Hadoop2.0应用-基于Yarn的淘宝海量数据服务平台V03,高可用性的HDFS-Hadoop...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    HDFS两种操作方式:命令行和Java API

    另一种是 Java API,即利用 Hadoop 的 Java 库,采用编程的方式操作 HDFS 的文件。 本节将介绍 Linux 操作系统中关于 HDFS 文件操作的常用命令行,并将介绍利用 Hadoop 提供的 Java API 进行基本的文件操作,以及...

    实验2 熟悉常用的HDFS操作

    3. 熟悉HDFS操作常用的Java API。 二、实验平台 1. 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04); 2. Hadoop版本:3.1.3; 3. JDK版本:1.8; 4. Java IDE:Eclipse。 三、实验步骤(每个步骤下均需有运行截图...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce设计理念与基本架构2.1 Hadoop发展史2.1.1 Hadoop产生背景2.1.2 Apache ...编程实例2.4 Hadoop基本架构2.4.1 HDFS架构2.4.2 Hadoop MapReduce架构2.5 Hadoop MapReduce作业的生命周期2.6 小结第二部分...

    Hadoop实战中文版

    3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...

    大数据技术原理与应用实验

    实验七 HDFS Java API编程 69 第三章 分布式协调服务系统ZooKeeper 75 实验八 Zookeeper安装部署 75 实验九 Zookeeper Shell命令使用 79 实验十 Zookeeper Java API编程 82 第四章 分布式离线计算框架MapReduce 85 ...

    Hadoop 0.20.2 版本安装包

    伪分布式安装包,Hadoop 1.0指的是版本为Apache Hadoop 0.20.x、1.x或者CDH3系列的Hadoop,内核主要由HDFS和MapReduce两个系统组成,其中,MapReduce是一个离线处理框架,由编程模型(新旧API)、运行时环境和数据...

    python大数据-为什么Python编程非常适合大数据?.pdf

    HDFSAPI 对您有何好处? 所以,你去。 HDFS API使您可以轻松地在⽂件,⽬录和全局⽂件系统属性上读写信息,⽽不会遇到任何障碍。 2.提供MapReduce API Pydoop提供了MapReduce API,以最少的编程⼯作即可解决复杂的...

    Hadoop实战中文版.PDF

    30第3章 Hadoop组件 313.1 HDFS文件操作 313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:...

Global site tag (gtag.js) - Google Analytics