- 浏览: 2147065 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
Pig里面内置大量的工具函数,也开放了大量的接口,来给我们开发者使用,通过UDF,我们可以非常方便的完成某些Pig不直接支持或没有的的功能,比如散仙前面几篇文章写的将pig分析完的结果,存储到各种各样的介质里面,而不仅仅局限于HDFS,当然,我们也可以在都存。
那么如何实现自己的存储UDF呢? 提到这里,我们不得不说下pig里面的load和store函数,load函数是从某个数据源,加载数据,一般都是从HDFS上加载,而store函数则是将分析完的结果,存储到HDFS用的,所以,我们只需继承重写store的功能函数StoreFunc即可完成我们的大部分需求,懂的了这个,我们就可以将结果任意存储了,可以存到数据库,也可以存到索引文件,也可以存入本地txt,excel等等
下面先看下StoreFunc的源码:
这里面有许多方法,但并不都需要我们重新定义的,一般来说,我们只需要重写如下的几个抽象方法即可:
(1)getOutputFormat方法,与Hadoop的OutFormat对应,在最终的输出时,会根据不同的format方法,生成不同的形式。
(2)setStoreLocation方法,这个方法定义了生成文件的路径,如果不是存入HDFS上,则可以忽略。
(3)prepareToWrite 在写入数据之前做一些初始化工作
(4)putNext从Pig里面传递过来最终需要存储的数据
在1的步骤我们知道,需要提供一个outputFormat的类,这时就需要我们继承hadoop里面的某个outputformat基类,然后重写getRecordWriter方法,接下来我们还可能要继承RecordWriter类,来定义我们自己的输出格式,可能是一行txt数据,也有可能是一个对象,或一个索引集合等等,如下面支持lucene索引的outputformat
最后总结一下,自定义输入格式的步骤:
(1)继承StoreFunc函数,重写其方法
(2)继承一个outputformat基类,重写自己的outputformat类
(2)继承一个RecodeWriter,重写自己的writer方法
当然这并不都是必须的,比如在向数据库存储的时候,我们就可以直接在putNext的时候,获取,保存为集合,然后在OutputCommitter提交成功之后,commit我们的数据,如果保存失败,我们也可以在abort方法里回滚我们的数据。
这样以来,无论我们存储哪里,都可以通过以上步骤实现,非常灵活
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
那么如何实现自己的存储UDF呢? 提到这里,我们不得不说下pig里面的load和store函数,load函数是从某个数据源,加载数据,一般都是从HDFS上加载,而store函数则是将分析完的结果,存储到HDFS用的,所以,我们只需继承重写store的功能函数StoreFunc即可完成我们的大部分需求,懂的了这个,我们就可以将结果任意存储了,可以存到数据库,也可以存到索引文件,也可以存入本地txt,excel等等
下面先看下StoreFunc的源码:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.pig; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.PigStatusReporter; /** * StoreFuncs take records from Pig's processing and store them into a data store. Most frequently * this is an HDFS file, but it could also be an HBase instance, RDBMS, etc. */ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class StoreFunc implements StoreFuncInterface { /** * This method is called by the Pig runtime in the front end to convert the * output location to an absolute path if the location is relative. The * StoreFunc implementation is free to choose how it converts a relative * location to an absolute location since this may depend on what the location * string represent (hdfs path or some other data source). * * * @param location location as provided in the "store" statement of the script * @param curDir the current working direction based on any "cd" statements * in the script before the "store" statement. If there are no "cd" statements * in the script, this would be the home directory - * <pre>/user/<username> </pre> * @return the absolute location based on the arguments passed * @throws IOException if the conversion is not possible */ @Override public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } /** * Return the OutputFormat associated with StoreFunc. This will be called * on the front end during planning and on the backend during * execution. * @return the {@link OutputFormat} associated with StoreFunc * @throws IOException if an exception occurs while constructing the * OutputFormat * */ public abstract OutputFormat getOutputFormat() throws IOException; /** * Communicate to the storer the location where the data needs to be stored. * The location string passed to the {@link StoreFunc} here is the * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * This method will be called in the frontend and backend multiple times. Implementations * should bear in mind that this method is called multiple times and should * ensure there are no inconsistent side effects due to the multiple calls. * {@link #checkSchema(ResourceSchema)} will be called before any call to * {@link #setStoreLocation(String, Job)}. * * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * @param job The {@link Job} object * @throws IOException if the location is not valid. */ public abstract void setStoreLocation(String location, Job job) throws IOException; /** * Set the schema for data to be stored. This will be called on the * front end during planning if the store is associated with a schema. * A Store function should implement this function to * check that a given schema is acceptable to it. For example, it * can check that the correct partition keys are included; * a storage function to be written directly to an OutputFormat can * make sure the schema will translate in a well defined way. Default implementation * is a no-op. * @param s to be checked * @throws IOException if this schema is not acceptable. It should include * a detailed error message indicating what is wrong with the schema. */ @Override public void checkSchema(ResourceSchema s) throws IOException { // default implementation is a no-op } /** * Initialize StoreFunc to write data. This will be called during * execution on the backend before the call to putNext. * @param writer RecordWriter to use. * @throws IOException if an exception occurs during initialization */ public abstract void prepareToWrite(RecordWriter writer) throws IOException; /** * Write a tuple to the data store. * * @param t the tuple to store. * @throws IOException if an exception occurs during the write */ public abstract void putNext(Tuple t) throws IOException; /** * This method will be called by Pig both in the front end and back end to * pass a unique signature to the {@link StoreFunc} which it can use to store * information in the {@link UDFContext} which it needs to store between * various method invocations in the front end and back end. This method * will be called before other methods in {@link StoreFunc}. This is necessary * because in a Pig Latin script with multiple stores, the different * instances of store functions need to be able to find their (and only their) * data in the UDFContext object. The default implementation is a no-op. * @param signature a unique signature to identify this StoreFunc */ @Override public void setStoreFuncUDFContextSignature(String signature) { // default implementation is a no-op } /** * This method will be called by Pig if the job which contains this store * fails. Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. * The default implementation deletes the output location if it * is a {@link FileSystem} location. * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query * any runtime job information. */ @Override public void cleanupOnFailure(String location, Job job) throws IOException { cleanupOnFailureImpl(location, job); } /** * This method will be called by Pig if the job which contains this store * is successful, and some cleanup of intermediate resources is required. * Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query * any runtime job information. */ @Override public void cleanupOnSuccess(String location, Job job) throws IOException { // DEFAULT: DO NOTHING, user-defined overrides can // call cleanupOnFailureImpl(location, job) or ...? } /** * Default implementation for {@link #cleanupOnFailure(String, Job)} * and {@link #cleanupOnSuccess(String, Job)}. This removes a file * from HDFS. * @param location file name (or URI) of file to remove * @param job Hadoop job, used to access the appropriate file system. * @throws IOException */ public static void cleanupOnFailureImpl(String location, Job job) throws IOException { Path path = new Path(location); FileSystem fs = path.getFileSystem(job.getConfiguration()); if(fs.exists(path)){ fs.delete(path, true); } } /** * Issue a warning. Warning messages are aggregated and reported to * the user. * @param msg String message of the warning * @param warningEnum type of warning */ public final void warn(String msg, Enum warningEnum) { Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum); counter.increment(1); } }
这里面有许多方法,但并不都需要我们重新定义的,一般来说,我们只需要重写如下的几个抽象方法即可:
(1)getOutputFormat方法,与Hadoop的OutFormat对应,在最终的输出时,会根据不同的format方法,生成不同的形式。
(2)setStoreLocation方法,这个方法定义了生成文件的路径,如果不是存入HDFS上,则可以忽略。
(3)prepareToWrite 在写入数据之前做一些初始化工作
(4)putNext从Pig里面传递过来最终需要存储的数据
在1的步骤我们知道,需要提供一个outputFormat的类,这时就需要我们继承hadoop里面的某个outputformat基类,然后重写getRecordWriter方法,接下来我们还可能要继承RecordWriter类,来定义我们自己的输出格式,可能是一行txt数据,也有可能是一个对象,或一个索引集合等等,如下面支持lucene索引的outputformat
package com.pig.support.lucene; import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; /** * 继承FileOutputFormat,重写支持Lucene格式的outputFormat策略 * */ public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> { String location; FileSystem fs; String taskid; FileOutputCommitter committer; AtomicInteger counter = new AtomicInteger(); public LuceneOutputFormat(String location) { this.location = location; } @Override public RecordWriter<Writable, Document> getRecordWriter( TaskAttemptContext ctx) throws IOException, InterruptedException { Configuration conf = ctx.getConfiguration(); fs = FileSystem.get(conf); File baseDir = new File(System.getProperty("java.io.tmpdir")); String baseName = System.currentTimeMillis() + "-"; File tempDir = new File(baseDir, baseName + counter.getAndIncrement()); tempDir.mkdirs(); tempDir.deleteOnExit(); return new LuceneRecordWriter( (FileOutputCommitter) getOutputCommitter(ctx), tempDir); } /** * Write out the LuceneIndex to a local temporary location.<br/> * On commit/close the index is copied to the hdfs output directory.<br/> * */ static class LuceneRecordWriter extends RecordWriter<Writable, Document> { final IndexWriter writer; final FileOutputCommitter committer; final File tmpdir; public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) { try { this.committer = committer; this.tmpdir = tmpdir; IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2, new StandardAnalyzer()); LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy(); mergePolicy.setMergeFactor(10); //mergePolicy.setUseCompoundFile(false); config.setMergePolicy(mergePolicy); config.setMergeScheduler(new SerialMergeScheduler()); writer = new IndexWriter(FSDirectory.open(tmpdir), config); } catch (IOException e) { RuntimeException exc = new RuntimeException(e.toString(), e); exc.setStackTrace(e.getStackTrace()); throw exc; } } @Override public void close(final TaskAttemptContext ctx) throws IOException, InterruptedException { //use a thread for status polling final Thread th = new Thread() { public void run() { ctx.progress(); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }; th.start(); try { writer.forceMerge(1); writer.close(); // move all files to part Configuration conf = ctx.getConfiguration(); Path work = committer.getWorkPath(); Path output = new Path(work, "index-" + ctx.getTaskAttemptID().getTaskID().getId()); FileSystem fs = FileSystem.get(conf); FileUtil.copy(tmpdir, fs, output, true, conf); } finally { th.interrupt(); } } @Override public void write(Writable key, Document doc) throws IOException, InterruptedException { writer.addDocument(doc); } } }
最后总结一下,自定义输入格式的步骤:
(1)继承StoreFunc函数,重写其方法
(2)继承一个outputformat基类,重写自己的outputformat类
(2)继承一个RecodeWriter,重写自己的writer方法
当然这并不都是必须的,比如在向数据库存储的时候,我们就可以直接在putNext的时候,获取,保存为集合,然后在OutputCommitter提交成功之后,commit我们的数据,如果保存失败,我们也可以在abort方法里回滚我们的数据。
这样以来,无论我们存储哪里,都可以通过以上步骤实现,非常灵活
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
发表评论
-
Apache Tez0.7编译笔记
2016-01-15 16:33 2429目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3754这两天,打算给现有的 ... -
Apache Pig中如何使用Replace函数
2015-11-17 18:48 1479今天分享一个小案例, ... -
Apache Pig的UDF返回值问题
2015-11-11 16:34 1466今天写了关于Pig的EvalFunc UDF函数,结果一执行 ... -
Pig0.15集成Tez,让猪飞起来
2015-06-29 19:45 17861,Tez是什么? Tez是Hortonworks公司开源 ... -
CDH-Hadoop2.6+ Apache Pig0.15安装记录
2015-06-26 20:06 26891,使用CDH的hadoop里面有对应的组件Pig,但版本较低 ... -
Pig配置vim高亮
2015-05-01 17:14 1567(1) 下载文末上传的压缩包,上到对应的linux机器上,并 ... -
Hadoop2.2如何集成Apache Pig0.12.1?
2015-05-01 16:48 908散仙假设你的Hadoop环境已经安装完毕 (1)到ht ... -
Apache Pig和Solr问题笔记(一)
2015-04-02 13:35 2000记录下最近两天散仙在工作中遇到的有关Pig0.12.0和Sol ... -
Pig使用问题总结
2015-03-29 18:39 10181,如果是a::tags#'pic'作为参数,传递给另一个函 ... -
玩转大数据系列之Apache Pig高级技能之函数编程(六)
2015-03-18 21:57 2099原创不易,转载请务必注明,原创地址,谢谢配合! http:/ ... -
Apache Pig字符串截取实战小例子
2015-03-13 17:23 2232记录一个Pig字符串截取的实战小例子: 需求如下,从下面的字 ... -
玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)
2015-03-12 21:06 1872GMV(一定时间内的成交 ... -
玩转大数据系列之Apache Pig如何与MySQL集成(三)
2015-03-07 19:43 2560上篇介绍了如何把Pig的结果存储到Solr中,那么可能就会有朋 ... -
玩转大数据系列之Apache Pig如何与Apache Solr集成(二)
2015-03-06 21:52 1471散仙,在上篇文章中介绍了,如何使用Apache Pig与Luc ... -
玩转大数据系列之Apache Pig如何与Apache Lucene集成(一)
2015-03-05 21:54 2848在文章开始之前,我们 ... -
Apache Pig学习笔记之内置函数(三)
2015-03-03 19:53 48261 简介 Pig附带了一些 ... -
Apache Pig学习笔记(二)
2015-02-13 19:23 3053主要整理了一下,pig里 ... -
你有一个好的归档习惯吗?
2015-02-11 22:01 1826备忘和扯一扯最近散仙 ... -
Apache Pig入门学习文档(一)
2015-01-20 20:28 31731,Pig的安装 (一)软件要求 (二)下载Pig ...
相关推荐
3-3.新一代大数据调度+-Apache+DolphinScheduler架构演进+&+Roadmap.pdf
Learn to use Apache Pig to develop lightweight big data applications easily and quickly. This book shows you many optimization techniques and covers every context where Pig is used in big data ...
Beginning Apache Pig: Big Data Processing Made Easy English | 29 Dec. 2016 | ISBN: 1484223365 | 300 Pages | PDF | 4.9 MB Learn to use Apache Pig to develop lightweight big data applications easily ...
apache pig 基础及应用,urldecode row_number web日志分析 根据 用户行为 做出 简易的 相似度 判断。
大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例...
大数据存储及分层实践-5-4 Apache Ozone 下一代大数据存储解密
TutorialsPoint Apache Pig 介绍.epub
Hadoop 2012 大数据大会 的资料,Apache pig性能优化
apache日志hadoop大数据 hive与hbase是如何整合使用的
7-2工业大数据结合医疗领域APACHE体系的研究及应用
ApacheSpark系列技术直播第七讲大数据列式存储之ParquetORC.pdf
大数据--Apache Spark 入门知识
大数据--Apache Spark编程详解
大数据之路 Apache Pulsar 调研(全面).docx
福建师范大学精品大数据导论课程系列 (6.4.1)--5.1 一种并行编程模型--MapReduce-之四.pdf 福建师范大学精品大数据导论课程系列 (6.5.1)--5.2 ApacheSpark之一.pdf 福建师范大学精品大数据导论课程系列 (6.6.1)--5.2...
福建师范大学精品大数据导论课程系列 (6.4.1)--5.1 一种并行编程模型--MapReduce-之四.pdf 福建师范大学精品大数据导论课程系列 (6.5.1)--5.2 ApacheSpark之一.pdf 福建师范大学精品大数据导论课程系列 (6.6.1)--5.2...
福建师范大学精品大数据导论课程系列 (6.4.1)--5.1 一种并行编程模型--MapReduce-之四.pdf 福建师范大学精品大数据导论课程系列 (6.5.1)--5.2 ApacheSpark之一.pdf 福建师范大学精品大数据导论课程系列 (6.6.1)--5.2...
福建师范大学精品大数据导论课程系列 (6.4.1)--5.1 一种并行编程模型--MapReduce-之四.pdf 福建师范大学精品大数据导论课程系列 (6.5.1)--5.2 ApacheSpark之一.pdf 福建师范大学精品大数据导论课程系列 (6.6.1)--5.2...
福建师范大学精品大数据导论课程系列 (6.4.1)--5.1 一种并行编程模型--MapReduce-之四.pdf 福建师范大学精品大数据导论课程系列 (6.5.1)--5.2 ApacheSpark之一.pdf 福建师范大学精品大数据导论课程系列 (6.6.1)--5.2...
大数据--Apache Spark实用详解