通过java代码对HDFS进行操作:创建文件文件,读取文件,删除文件,文件列表,创建目录,当地文件上传到hdfs,获取所有节点信息,文件写入数据。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; public class HDFSTest { public static void main(String[] args) throws Exception { //uploadLocalFile2HDFS("E:/1.txt","/tmp/1.txt");//E盘下文件传到hdfs上 //createNewHDFSFile("/tmp/create2", "hello"); //String str = new String(readHDFSFile("/tmp/create2")); //System.out.println(str); //mkdir("/tmp/testdir"); //deleteDir("/tmp/testdir"); //listAll("/tmp/"); //getDateNodeHost(); listByFilter(""); } //获取HDFS集群上所有节点名称信息 public static void getDateNodeHost() throws IOException{ Configuration conf = getConf(); FileSystem fs=FileSystem.get(conf); DistributedFileSystem hdfs = (DistributedFileSystem)fs; DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats(); for(int i=0;i<dataNodeStats.length;i++){ System.out.println("DataNode_"+i+"_Name:"+dataNodeStats[i].getHostName()); } } /* * upload the local file to the hds * 路径是全路径 */ public static void uploadLocalFile2HDFS(String s, String d) throws IOException { Configuration conf = getConf(); FileSystem hdfs = FileSystem.get(conf); Path src = new Path(s); Path dst = new Path(d); hdfs.copyFromLocalFile(src, dst); hdfs.close(); } /* * 创建一个新文件 * create a new file in the hdfs. * notice that the toCreateFilePath is the full path * and write the content to the hdfs file. */ public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException { Configuration conf = getConf(); FileSystem hdfs = FileSystem.get(conf); FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath)); os.write(content.getBytes("UTF-8")); os.close(); hdfs.close(); } /* * 删除某个文件 * delete the hdfs file * notice that the dst is the full path name */ public static boolean deleteHDFSFile(String dst) throws IOException { Configuration conf = getConf(); FileSystem hdfs = FileSystem.get(conf); Path path = new Path(dst); boolean isDeleted = hdfs.delete(path); hdfs.close(); return isDeleted; } /* * 读取某个文件 * read the hdfs file content * notice that the dst is the full path name */ public static byte[] readHDFSFile(String dst) throws Exception { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); // check if the file exists Path path = new Path(dst); if ( fs.exists(path) ) { FSDataInputStream is = fs.open(path); // get the file info to create the buffer FileStatus stat = fs.getFileStatus(path); //IO工具 数据读取到控制面板 //IOUtils.copyBytes(is , System.out, 1024,false ); // create the buffer byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); fs.close(); return buffer; } else { throw new Exception("the file is not found ."); } } /* * make a new dir in the hdfs * the dir may like '/tmp/testdir' */ public static void mkdir(String dir) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); fs.mkdirs(new Path(dir)); fs.close(); } /* * delete a dir in the hdfs * dir may like '/tmp/testdir' */ public static void deleteDir(String dir) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(dir)); fs.close(); } //文件系统连接到 hdfs的配置信息 private static Configuration getConf(){ Configuration conf = new Configuration(); // 这句话很关键 conf.set("mapred.job.tracker", "192.168.142.128:9001"); conf.set("fs.default.name", "hdfs://192.168.142.128:9000"); return conf; } /** * @Title: listAll * @Description: 列出目录下所有文件 * @return void 返回类型 * @throws */ public static void listAll(String dir) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); FileStatus[] stats = fs.listStatus(new Path(dir)); for(int i = 0; i < stats.length; ++i) { if (!stats[i].isDir()) { // regular file System.out.println(stats[i].getPath().toString()); } else { // dir System.out.println(stats[i].getPath().toString()); } // else if(stats[i].()) // { // // is s symlink in linux // System.out.println(stats[i].getPath().toString()); // } } fs.close(); } //查询目录中的所有文件,通过统配符号过滤 public static void listByFilter(String dir) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); //列出 目录下 符合规则的文件 FileStatus[] stats = fs.globStatus(new Path("/tmp/*"), new PathFilter(){ @Override public boolean accept(Path path) { return !path.toString().matches(".*txt" ); } }); //FileStatus[] stats = fs.listStatus(new Path(dir)); for(int i = 0; i < stats.length; ++i) { if (!stats[i].isDir()) { // regular file System.out.println(stats[i].getPath().toString()); } else { // dir System.out.println(stats[i].getPath().toString()); } // else if(stats[i].()) // { // // is s symlink in linux // System.out.println(stats[i].getPath().toString()); // } } fs.close(); } }
相关推荐
Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...
通过调用hdfs的api,进行文件上传、创建、删除、重命名等操作的代码
主要介绍了Python API 操作Hadoop hdfs详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
1.java接口操作Hadoop文件系统(文件上传下载删除创建...可举一反三) 2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客...
HDFS文件 HDFS.jl 包装了 HDFS C 库...从对open或hdfs_open的调用返回,除了文件系统句柄和文件 URL 之外,它还保存文件句柄。 它需要在涉及文件的未来 API 调用中传递。 type HdfsFile fs::HdfsFS path::Abstrac
HDFS 客户端接口的调用和测试示例,基于 hadoop-release-HDP-2.5.0.0 ,供初学者参考
基于hadoop的Hive数据仓库JavaAPI简单调用的实例,关于Hive的简介在此不赘述。hive提供了三种用户接口:CLI,JDBC/ODBC和 WebUI CLI,即Shell命令行 JDBC/ODBC 是 Hive 的Java,与使用传统数据库JDBC的方式类似 Web...
3 .2.1 WebHdfsFileSystem 的REST API 操作… ·…………… …… …………………………….. 127 3.2.2 WebHdfsFileSystem 的流程调用…….. . . ...….... ..... ........ .. ... . ........ . .…. ... .. .… ……...
1.maven本地安装; 2.Maven引用: <groupId>com.dw.hadoop</groupId> <artifactId>common_tool <version>0.0.1-SNAPSHOT 3.调用静态方法HdfsDAO
它提供了一个 C API 用于直接调用 Namenode RPC 和执行 Datanode 块读写操作。 与 libhdfs 不同,Hadoofus 使用多个版本的 HDFS 协议。 您的选择,您可以用Hadoop 0.20.203通过1.xy(说话HDFS_NN_v1 / HDFS_...
MPI文件夹中包含了一些常见的MPI函数的用法和几个简单的应用,Hadoop文件夹中包含HDFS的java API调用,简单的Mapreduce程序的编写。 MPI mpi_array_sort.c / 2.c 对数组进行并行比较,得到最大值。 将数组的不同部分...
为减少读写速率的损失,系统通过调用 HDFS(Hadoop distributed file system)的API接口方式实现数据存储及对集群的状态监控和管理.实验表明:传输文件的大小和集群中数据节点的 个数对云存储服务的数据读写速率的影响较...
地铁api数据DW分析BI过程源数据收集和HDFS加载在首尔开放数据广场发布和调用与地铁使用相关的数据API密钥通过Anaconda3构建python 3.8.5环境,打开jupyter笔记本端口,并执行收集代码。 通过Nifi流进行HDFS加载和...
C/C++/C# API 需要使用 Microsoft Windows 动态链接库 (NxCore.dll),该库在购买时随历史磁带文件一起提供。 ETL 目录包含使用 Hadoop 的分布式磁带处理和消息提取。 通过 bash 脚本的 hive 流使用 MapReduce。 ...
如何通过Hadoop进行离线数据分析,通过Hive建立数据仓库。 如何将关系型数据库中存储的数据导入HDFS,以及从HDFS中将数据导入关系型数据库。 如何将分析好的数据通过图形展示给用户。 5.1 日志收集 339 ...