存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩后的文件,我们直接在应用程序中如何读取里面的数据?答案是肯定的,但是比普通的文本读取要稍微复杂一点,需要使用到Hadoop的压缩工具类支持,比如处理gz,snappy,lzo,bz压缩的,前提是首先我们的Hadoop集群得支持上面提到的各种压缩文件。
本次就给出一个读取gz压缩文件的例子核心代码:
def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={
//访问hdfs文件,只读取gz结尾的压缩文件,如果是.tmp结尾的不会读取
val path=new Path("/collect_data/userlog/"+date+"/log*.gz")
//实例化压缩工厂编码类
val factory = new CompressionCodecFactory(conf)
//读取通配路径
val items=fs.globStatus(path)
var count=0
//遍历每一个路径文件
items.foreach(f=>{
//打印全路径
println(f.getPath)
//通过全路径获取其编码
val codec = factory.getCodec(f.getPath())//获取编码
//读取成数据流
var stream:InputStream = null;
if(codec!=null){
//如果编码识别直接从编码创建输入流
stream = codec.createInputStream(fs.open(f.getPath()))
}else{
//如果不识别则直接打开
stream = fs.open(f.getPath())
}
val writer=new StringWriter()
//将字节流转成字符串流
IOUtils.copy(stream,writer,"UTF-8")
//得到字符串内容
val raw=writer.toString
//根据字符串内容split出所有的行数据,至此解压数据完毕
val raw_array=raw.split("\n")
//遍历数据
raw_array.foreach(line=>{
val array = line.split("--",2) //拆分数组
val map = JSON.parseObject(array(1)).asScala
val userId = map.get("userId").getOrElse("").asInstanceOf[String] //为空为非法数据
val time = map.get("time").getOrElse("") //为空为非法数据
if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有数据
pushToKafka(topic,userId,line)
count=count+1
}
})
})
}
压缩和解压模块用的工具包是apache-commons下面的类:
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
如果想在Windows上调试,可以直接设置HDFS的地址即可
- val conf = new Configuration()//获取hadoop的conf
// conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上调试用
至此数据已经解压并读取完毕,其实并不是很复杂,用java代码和上面的代码也差不多类似,如果直接用原生的api读取会稍微复杂,但如果我们使用Hive,Spark框架的时候,框架内部会自动帮我们完成压缩文件的读取或者写入,对用户透明,当然底层也是封装了不同压缩格式的读取和写入代码,这样以来使用者将会方便许多。
参考文章:
https://blog.matthewrathbone.com/2013/12/28/reading-data-from-hdfs-even-if-it-is-compressed
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
hbase-2.1.3-bin.tar.gz,hadoop-3.1.2.tar.gz,hadoop-2.7.4.tar.gz,flink-1.7.2-bin-hadoop27-scala_2.11.tgz,apache-hive-3.1.1-bin.tar.gz,apache-hive-2.3.5-bin.tar.gz,以及一些推荐的集群搭建配置文件,...
对于想学习 Spark 的人而言,如何构建 Spark 集群是其最大的难点之一, 为了解决大家构建 Spark 集群的一切困难,Spark 集群的构建分为了五个步骤,从 零起步,不需要任何前置知识,涵盖操作的每一个细节,构建完整...
spark scala认证和代理认证hadoop的kerberos代码示例。
scala-2.11.8.tar.gz
hadoop-2.7.3.tar.gz spark-2.4.8-bin-hadoop2.7.tgz apache-maven-3.6.3-bin.zip jdk-8u171-linux-x64.tar.gz sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz flink-1.10.1-bin-scala_2.12.tgz kafka_2.11-2.3.1.tgz ...
hadoop 集群 搭建 内容齐全 word文档
支持hadoop2.7.2 scala 2.12,编译完成的二进制包flink安装包,亲测可用,注意压缩包是250M,这才是正确的。其他100多M都不对。由于超过220M,所以提供百度云盘链接的形式。
2. 学会在Spark Shell中编写Scala程序; 3. 学会在Spark Shell中运行Scala程序。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)Spark基础知识 (1)输入start-...
flink-1.9.2-bin-scala_2.12-hadoop_2.7.2.zip.002 flink安装包,支持hadoop2.7.2 scala 2.12,编译完成的二进制包flink安装包
亲手在Centos7上安装,所用软件列表 apache-flume-1.8.0-bin.tar.gz apache-phoenix-4.13.0-HBase-1.3-bin.tar.gz hadoop-2.7.4.tar.gz hbase-1.3.1-bin.tar.gz jdk-8u144-linux-x64.tar.gz kafka_2.12-1.0.0.tgz ...
Hadoop集群搭建: 1.CentOS 6.7 minimal http://mirror.neu.edu.cn/centos/6.7/isos/x86_64/CentOS-6.7-x86_64-minimal.iso 2.Hadoop 2.6.3 ...
spark-3.2.4-bin-hadoop3.2-scala2.13 安装包
1.本机虚拟机镜像为ova格式,大小为2.9G,仅限VirtualBox使用,本机无图形界面!请注意! 2.本机为ubuntu16.04服务器版本,无图形...5.使用hadoop集群前,需要根据本地虚拟机的IP地址修改/etc/hosts文件中的host配置。
scala-2.12.7.tar.gz源码
flink-1.0.3-bin-hadoop27-scala_2flink-1.0.3-bin-hadoop27-scala_2
hadoop,spark,scala环境搭建,有详细步骤,一步一步跟着word操作就可以了。
hadoop 2.7.5 集群搭建 spark 2.2.1 集群搭建,配置scala编译环境 hive on spark 安装 hbase 搭建
本范例是一个Scala工程,结合Java组件实现了对spark产品分析的结果json文件进行解析,将其内部的几何对象转换为记录集写入数据集里,由于Scala读写文件的效率比较高,故采用Scala与Java组件实现。
主要介绍了scala 读取txt文件的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧