- 浏览: 108629 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
自定义分隔符
package com.lwz.inputf;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.LineReader;
/**
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat
*
* @author winston
* hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
//-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
System.out.println("-----genericSplit----"+genericSplit.toString());
reporter.setStatus(genericSplit.toString());
/*---job----Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
file:/etc/hive/conf.cloudera.hive/hive-site.xml
*/
System.out.println("---job----"+job);
System.out.println("----reporter----"+reporter);
return new ClickstreamRecordReader((FileSplit) genericSplit,job);
}
public class ClickstreamRecordReader implements
RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
System.out.println("---start---"+start);
System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
end = start + inputSplit.getLength();
//获取文件的路径
final Path file = inputSplit.getPath();
System.out.println("---filepath---"+file);
//一个工厂,将找到一个给定的文件名正确编解码。
compressionCodecs = new CompressionCodecFactory(job);
// 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
final CompressionCodec codec = compressionCodecs.getCodec(file);
// Open file and seek to the start of the split
//获取该文件数据那个文件系统
FileSystem fs = file.getFileSystem(job);
System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
//在指定的path下打开一个FSDataInputStream流
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
//如果解压器不为空时创建一个LineReader实例
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
//寻找到了文件的起始偏移
fileIn.seek(start);
}
//创建一个LineReader实例
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效
建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
package com.lwz.inputf;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.LineReader;
/**
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat
*
* @author winston
* hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
//-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
System.out.println("-----genericSplit----"+genericSplit.toString());
reporter.setStatus(genericSplit.toString());
/*---job----Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
file:/etc/hive/conf.cloudera.hive/hive-site.xml
*/
System.out.println("---job----"+job);
System.out.println("----reporter----"+reporter);
return new ClickstreamRecordReader((FileSplit) genericSplit,job);
}
public class ClickstreamRecordReader implements
RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
System.out.println("---start---"+start);
System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
end = start + inputSplit.getLength();
//获取文件的路径
final Path file = inputSplit.getPath();
System.out.println("---filepath---"+file);
//一个工厂,将找到一个给定的文件名正确编解码。
compressionCodecs = new CompressionCodecFactory(job);
// 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
final CompressionCodec codec = compressionCodecs.getCodec(file);
// Open file and seek to the start of the split
//获取该文件数据那个文件系统
FileSystem fs = file.getFileSystem(job);
System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
//在指定的path下打开一个FSDataInputStream流
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
//如果解压器不为空时创建一个LineReader实例
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
//寻找到了文件的起始偏移
fileIn.seek(start);
}
//创建一个LineReader实例
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效
建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
发表评论
-
hive + hbase
2015-01-04 10:42 733环境配置: hadoop-2.0.0-cdh4.3.0 (4 ... -
hive 数据倾斜
2014-08-27 09:03 642链接:http://www.alidata.org/archi ... -
hive 分通总结
2014-08-27 08:42 541总结分析: 1. 定义了桶,但要生成桶的数据,只能是由其他表 ... -
深入了解Hive Index具体实现
2014-08-25 08:51 702索引是标准的数据库技术,hive 0.7版本之后支持索引。hi ... -
explain hive index
2014-08-24 16:44 1117设置索引: 使用聚合索引优化groupby操作 hive> ... -
Hive 中内部表与外部表的区别与创建方法
2014-08-15 17:11 723分类: Hive 2013-12-07 11:56 ... -
hive map和reduce的控制
2014-08-15 16:14 596一、 控制hive任务中的map数: 1. 通 ... -
hive 压缩策略
2014-08-15 15:16 1727Hive使用的是Hadoop的文件 ... -
hive 在mysql中创建备用数据库
2014-08-15 09:21 839修改hive-site.xml <property> ... -
HIVE 窗口及分析函数
2014-08-11 16:21 1151HIVE 窗口及分析函数 使 ... -
hive 内置函数
2014-08-11 09:06 30251.sort_array(): sort_array(arra ... -
hive lateral view
2014-08-09 14:59 1987通过Lateral view可以方便的将UDTF得到的行转列的 ... -
hive数据的导出
2014-07-28 21:53 417在本博客的《Hive几种数据导入方式》文章中,谈到了Hive中 ... -
hive udaf
2014-07-25 16:11 714package com.lwz.udaf; import o ... -
HiveServer2连接ZooKeeper出现Too many connections问题的解决
2014-07-24 08:49 1690HiveServer2连接ZooKeeper出现Too man ... -
hive 常用命令
2014-07-17 22:22 6381.hive通过外部设置参数传入脚本中: hiv ... -
CouderaHadoop中hive的Hook扩展
2014-07-16 21:18 3259最近在做关于CDH4.3.0的hive封装,其中遇到了很多问题 ... -
利用SemanticAnalyzerHook回过滤不加分区条件的Hive查询
2014-07-16 16:43 1420我们Hadoop集群中将近百分之80的作业是通过Hive来提交 ... -
hive 的常用命令
2014-07-16 10:07 0设置、查看hive当前的角色: set sys ... -
hive 授权
2014-07-15 10:51 897Hive授权(Security配置) 博客分类: Hive分 ...
相关推荐
Spark不能使用hive自定义函数
hive数仓、hive SQL 、 hive自定义函数 、hive参数深入浅出
简单介绍了hive自定义函数的编写步骤以及使用。
hive自定义函数demo
udf函数,用户自定义函数,可以直接在sql语句中计算的函数 优点: 允许实现模块化的程序设计、方便修改代码、增加函数 UDF的执行速度很快,通过缓存计划在语句重复执行时降低代码的编译开销,比存储方法的执行效率...
hive的udf函数实现
Hive自定义函数 一. UDF(user defined function) 背景 系统内置函数无法解决所有的实际业务问题,需要开发者自己编写函数实现自身的业务实现诉求。 应用场景非常多,面临的业务不同导致个性化实现很多,故udf...
hive inputformat实例代码,按照空格对日志文件进行拆分
地址转换成经纬度+两地址间距离计算+省市区位置解析(Java代码) Hive自定义函数的封装
hive-udfhive自定义函数主要实现hive3种自定义函数1,udf函数,主要用于处理一对一数据处理2,udtf函数,主要用于处理一对多数据处理2,udaf函数,主要用与处理多对一数据聚合处理
udf开发–做个简单脱敏udf保留前5位,后面全部替换成*****
hive-anttasks.jar hive-cli.jar hive-common.jar hive-contrib.jar hive-hbaseec.jar hive-hbase-handler.jar hive-hwi.jar hive-jdbc.jar hive-metastorejar hive-serde.jar hive-service.jar hive-shims.jar ...
NULL 博文链接:https://chengjianxiaoxue.iteye.com/blog/2235666
Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...
hive自定义安全认证使用
详细介绍如何开发hive自定义永久函数,配套有测试数据
自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。
Ambari下Hive3.0升级到Hive4.0,验证自测;
05.hive中如何自定义函数--json解析函数示例.mp4
使用hive3.1.2和spark3.0.0配置hive on spark的时候,发现官方下载的hive3.1.2和spark3.0.0不兼容,hive3.1.2对应的版本是spark2.3.0,而spark3.0.0对应的hadoop版本是hadoop2.6或hadoop2.7。 所以,如果想要使用高...