- 浏览: 2146948 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
上篇介绍了如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可。
关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。
将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:
(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar
有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:
当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:
提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:
最后,在来看下我们的pig脚本是如何定义和使用的:
执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:
最后,附上DBStore类的源码:
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。
将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:
(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar
有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:
Access denied for user 'root'@'localhost'
当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:
1,2,3 1,2,4 2,2,4 3,4,2 8,2,4
提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:
最后,在来看下我们的pig脚本是如何定义和使用的:
--注册数据库驱动包和piggybank的jar register ./dependfiles/mysql-connector-java-5.1.23-bin.jar; register ./dependfiles/piggybank.jar --为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名 a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ; --过滤出id大于2的数据 a = filter a by id > 2; --存储结果到数据库里 STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd', 'INSERT into pig(id,name,count) values (?,?,?)'); ~
执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:
最后,附上DBStore类的源码:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.pig.piggybank.storage; import org.joda.time.DateTime; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import java.io.IOException; import java.sql.*; public class DBStorage extends StoreFunc { private final Log log = LogFactory.getLog(getClass()); private PreparedStatement ps; private Connection con; private String jdbcURL; private String user; private String pass; private int batchSize; private int count = 0; private String insertQuery; public DBStorage(String driver, String jdbcURL, String insertQuery) { this(driver, jdbcURL, null, null, insertQuery, "100"); } public DBStorage(String driver, String jdbcURL, String user, String pass, String insertQuery) throws SQLException { this(driver, jdbcURL, user, pass, insertQuery, "100"); } public DBStorage(String driver, String jdbcURL, String user, String pass, String insertQuery, String batchSize) throws RuntimeException { log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX," + insertQuery + ")"); try { Class.forName(driver); } catch (ClassNotFoundException e) { log.error("can't load DB driver:" + driver, e); throw new RuntimeException("Can't load DB Driver", e); } this.jdbcURL = jdbcURL; this.user = user; this.pass = pass; this.insertQuery = insertQuery; this.batchSize = Integer.parseInt(batchSize); } /** * Write the tuple to Database directly here. */ public void putNext(Tuple tuple) throws IOException { int sqlPos = 1; try { int size = tuple.size(); for (int i = 0; i < size; i++) { try { Object field = tuple.get(i); switch (DataType.findType(field)) { case DataType.NULL: ps.setNull(sqlPos, java.sql.Types.VARCHAR); sqlPos++; break; case DataType.BOOLEAN: ps.setBoolean(sqlPos, (Boolean) field); sqlPos++; break; case DataType.INTEGER: ps.setInt(sqlPos, (Integer) field); sqlPos++; break; case DataType.LONG: ps.setLong(sqlPos, (Long) field); sqlPos++; break; case DataType.FLOAT: ps.setFloat(sqlPos, (Float) field); sqlPos++; break; case DataType.DOUBLE: ps.setDouble(sqlPos, (Double) field); sqlPos++; break; case DataType.DATETIME: ps.setDate(sqlPos, new Date(((DateTime) field).getMillis())); sqlPos++; break; case DataType.BYTEARRAY: byte[] b = ((DataByteArray) field).get(); ps.setBytes(sqlPos, b); sqlPos++; break; case DataType.CHARARRAY: ps.setString(sqlPos, (String) field); sqlPos++; break; case DataType.BYTE: ps.setByte(sqlPos, (Byte) field); sqlPos++; break; case DataType.MAP: case DataType.TUPLE: case DataType.BAG: throw new RuntimeException("Cannot store a non-flat tuple " + "using DbStorage"); default: throw new RuntimeException("Unknown datatype " + DataType.findType(field)); } } catch (ExecException ee) { throw new RuntimeException(ee); } } ps.addBatch(); count++; if (count > batchSize) { count = 0; ps.executeBatch(); ps.clearBatch(); ps.clearParameters(); } } catch (SQLException e) { try { log .error("Unable to insert record:" + tuple.toDelimitedString("\t"), e); } catch (ExecException ee) { // do nothing } if (e.getErrorCode() == 1366) { // errors that come due to utf-8 character encoding // ignore these kind of errors TODO: Temporary fix - need to find a // better way of handling them in the argument statement itself } else { throw new RuntimeException("JDBC error", e); } } } class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> { @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { // IGNORE } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new OutputCommitter() { @Override public void abortTask(TaskAttemptContext context) throws IOException { try { if (ps != null) { ps.close(); } if (con != null) { con.rollback(); con.close(); } } catch (SQLException sqe) { throw new IOException(sqe); } } @Override public void commitTask(TaskAttemptContext context) throws IOException { if (ps != null) { try { ps.executeBatch(); con.commit(); ps.close(); con.close(); ps = null; con = null; } catch (SQLException e) { log.error("ps.close", e); throw new IOException("JDBC Error", e); } } } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { return true; } @Override public void cleanupJob(JobContext context) throws IOException { // IGNORE } @Override public void setupJob(JobContext context) throws IOException { // IGNORE } @Override public void setupTask(TaskAttemptContext context) throws IOException { // IGNORE } }; } @Override public RecordWriter<NullWritable, NullWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { // We don't use a record writer to write to database return new RecordWriter<NullWritable, NullWritable>() { @Override public void close(TaskAttemptContext context) { // Noop } @Override public void write(NullWritable k, NullWritable v) { // Noop } }; } } @SuppressWarnings("unchecked") @Override public OutputFormat getOutputFormat() throws IOException { return new MyDBOutputFormat(); } /** * Initialise the database connection and prepared statement here. */ @SuppressWarnings("unchecked") @Override public void prepareToWrite(RecordWriter writer) throws IOException { ps = null; con = null; if (insertQuery == null) { throw new IOException("SQL Insert command not specified"); } try { if (user == null || pass == null) { con = DriverManager.getConnection(jdbcURL); } else { con = DriverManager.getConnection(jdbcURL, user, pass); } con.setAutoCommit(false); ps = con.prepareStatement(insertQuery); } catch (SQLException e) { log.error("Unable to connect to JDBC @" + jdbcURL); throw new IOException("JDBC Error", e); } count = 0; } @Override public void setStoreLocation(String location, Job job) throws IOException { // IGNORE since we are writing records to DB. } }
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
发表评论
-
centos7安装mysql
2016-11-14 16:41 1200centos7的默认yum源已经 ... -
MySQL的InsertOrUpdate语法
2016-08-04 14:47 1646MySQL的插入语法提供了 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2427目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3754这两天,打算给现有的 ... -
unbutu+mysql的root密码重置方法
2016-01-11 14:16 1221MySQL密码重置策略: 1,停止mysql服务 s ... -
Apache Pig中如何使用Replace函数
2015-11-17 18:48 1479今天分享一个小案例, ... -
Apache Pig的UDF返回值问题
2015-11-11 16:34 1466今天写了关于Pig的EvalFunc UDF函数,结果一执行 ... -
使用shell分页读取600万+的MySQL数据脚本
2015-07-15 13:02 2672shell-mysql (1)脚本背景: 由于要在Linux ... -
Pig0.15集成Tez,让猪飞起来
2015-06-29 19:45 17851,Tez是什么? Tez是Hortonworks公司开源 ... -
CDH-Hadoop2.6+ Apache Pig0.15安装记录
2015-06-26 20:06 26891,使用CDH的hadoop里面有对应的组件Pig,但版本较低 ... -
Pig配置vim高亮
2015-05-01 17:14 1567(1) 下载文末上传的压缩包,上到对应的linux机器上,并 ... -
Hadoop2.2如何集成Apache Pig0.12.1?
2015-05-01 16:48 908散仙假设你的Hadoop环境已经安装完毕 (1)到ht ... -
Apache Pig和Solr问题笔记(一)
2015-04-02 13:35 1999记录下最近两天散仙在工作中遇到的有关Pig0.12.0和Sol ... -
Pig使用问题总结
2015-03-29 18:39 10181,如果是a::tags#'pic'作为参数,传递给另一个函 ... -
玩转大数据系列之Apache Pig高级技能之函数编程(六)
2015-03-18 21:57 2099原创不易,转载请务必注明,原创地址,谢谢配合! http:/ ... -
Apache Pig字符串截取实战小例子
2015-03-13 17:23 2231记录一个Pig字符串截取的实战小例子: 需求如下,从下面的字 ... -
玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)
2015-03-12 21:06 1871GMV(一定时间内的成交 ... -
玩转大数据系列之如何给Apache Pig自定义存储形式(四)
2015-03-07 20:35 1121Pig里面内置大量的工具函数,也开放了大量的接口,来给我们开发 ... -
玩转大数据系列之Apache Pig如何与Apache Solr集成(二)
2015-03-06 21:52 1470散仙,在上篇文章中介绍了,如何使用Apache Pig与Luc ... -
玩转大数据系列之Apache Pig如何与Apache Lucene集成(一)
2015-03-05 21:54 2848在文章开始之前,我们 ...
相关推荐
大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例...
3-3.新一代大数据调度+-Apache+DolphinScheduler架构演进+&+Roadmap.pdf
大数据--Apache Spark 入门知识
Learn to use Apache Pig to develop lightweight big data applications easily and quickly. This book shows you many optimization techniques and covers every context where Pig is used in big data ...
php+apache+mysql集成安装程序
大数据--Apache Spark编程详解
Beginning Apache Pig: Big Data Processing Made Easy English | 29 Dec. 2016 | ISBN: 1484223365 | 300 Pages | PDF | 4.9 MB Learn to use Apache Pig to develop lightweight big data applications easily ...
apache pig 基础及应用,urldecode row_number web日志分析 根据 用户行为 做出 简易的 相似度 判断。
大数据之路 Apache Pulsar 调研(全面).docx
大数据--Apache Spark实用详解
福建师范大学精品大数据导论课程系列 (6.7.1)--5.2 ApacheSpark之三.pdf 福建师范大学精品大数据导论课程系列 (7.1.1)--6.1 《数据描述性分析》课件PPT.pdf 福建师范大学精品大数据导论课程系列 (7.2.1)--6.2 《回归...
大数据-- Apache Spark Semi-Structured data
与Apache一起使用MySQL与Apache一起使用MySQL与Apache一起使用MySQL
Windos Apache Mysql PHP集成安装环境,简单易学
wamp window apache mysql php集成包,wampserver2.2d-x64这个版本支持window 10
Apache+PHP+MySQL集成包+V1.5.6+绿色
TutorialsPoint Apache Pig 介绍.epub
apache日志hadoop大数据 hive与hbase是如何整合使用的
福建师范大学精品大数据导论课程系列 (6.7.1)--5.2 ApacheSpark之三.pdf 福建师范大学精品大数据导论课程系列 (7.1.1)--6.1 《数据描述性分析》课件PPT.pdf 福建师范大学精品大数据导论课程系列 (7.2.1)--6.2 《回归...
xampp-win32-5.6.21-0-VC11(apache,tomcat,php,mysql统一集成) 一个整合apache,tomcat,php,mysql的软件,维护起服务器很方便