`
sunasheng
  • 浏览: 119796 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)

阅读更多

package cn.m15.ipj.job.usergroup;

Mapreduce处理结果向输出至mysql

1.写入mysql
	<1>job中输出的配置:
		DBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_OPEN_FIRST_FIELDS);
		(DBOutputFormat为hadoop自带API,将输入插入数据库)
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_OPEN_FIRST_FIELDS = {"app_id","version","imei","first_open","date"};
		 
	<2>reduce中写入:
		private FixOpenAppFirstRecord record = new FixOpenAppFirstRecord();
		 
		record.setApp_id(Integer.parseInt(app_id));
		record.setImei(imei);
		record.setVersion(version);
		record.setFirst_open(exactDate);
		record.setDate(date);
		context.write(record, NULL);
		 
	<3>FixOpenAppFirstRecord中字段的顺序配置(只列出一条):
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, app_id);
			statement.setString(2, version);
			statement.setString(3, imei);
			statement.setString(4, first_open);
			statement.setString(5, date);
		}
	<4>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_OPEN_FIRST_FIELDS的顺序一定要和类FixOpenAppFirstRecord中字段的配置顺序一致
2.更新mysql(改变值)
	<1>job中输出的配置:
		FixDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_IS_TAOBAO_FIELDS);
		(FixDBOutputFormat为自定义Format类,用于更新mysql)
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_IS_WEIBO_FIELDS = {"is_weibo","app_id","version","imei"};
	<2>reduce中写入:
		private FixIsMallUserRecord record = new FixIsMallUserRecord();
		record.setApp_id(Integer.parseInt(app_id));
		record.setVersion(version);
		record.setImei(imei);
		record.setIs_taobao(is_taobao);
		context.write(record, NULL);
	<3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):
		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.is_taobao = resultSet.getInt(1);
			this.app_id = resultSet.getInt(2);
			this.version = resultSet.getString(3);
			this.imei = resultSet.getString(4);
		}
	<4>FixDBOutputFormat中关键的拼接sql代码:
		public String constructQuery(String table, String[] fieldNames) {
		if (fieldNames == null) {
			throw new IllegalArgumentException(
			"Field names may not be null");
		}
		 
		StringBuilder query = new StringBuilder();
		query.append("UPDATE ").append(table);
		if (fieldNames.length > 0 && fieldNames[0] != null
		&& fieldNames[1] != null&& fieldNames[2] != null
		&& fieldNames[3] != null) {
			query.append(" SET ");
			query.append(fieldNames[0] + " =?");
			query.append(" WHERE ");
			query.append(fieldNames[1] + " =?");
			query.append(" AND ");
			query.append(fieldNames[2] + " =?");
			query.append(" AND ");
			query.append(fieldNames[3] + " =?");
			return query.toString();
			}
			return null;
		}
	<5>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_IS_WEIBO_FIELDS的顺序一定要和类
		3.2中的顺序也一定要和FixDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值,第二三四个参数分别为条件参数)
3.更新mysql(值的追加)
	<1>job中输出的配置:
		FixAppendDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_MALL_LOGIN_FIELDS);
		(FixAppendDBOutputFormat自定义Format,用户更新mysql[追加])
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_MALL_LOGIN_FIELDS = {"login_taobao_count","app_id","version","imei"};
	<2>reduce中写入:
		private FixMallTotalLoginRecord record = new FixMallTotalLoginRecord();
		record.setApp_id(Integer.parseInt(app_id));
		record.setVersion(version);
		record.setImei(imei);
		record.setLogin_taobao_count(num);
		context.write(record, NULL);
	<3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, login_taobao_count);
			statement.setInt(2, app_id);
			statement.setString(3, version);
			statement.setString(4, imei);
		}
	<4>FixAppendDBOutputFormat中关键的拼接sql代码
		public String constructQuery(String table, String[] fieldNames) {
			if (fieldNames == null) {
				throw new IllegalArgumentException
				("Field names may not be null");
			}
			StringBuilder query = new StringBuilder();
			query.append("UPDATE ").append(table);
			if ( fieldNames.length > 0 &&
			fieldNames[0] != null &&
			fieldNames[1] != null &&
			fieldNames[2] != null &&
			fieldNames[3] != null) {
				query.append(" SET ");
				query.append(fieldNames[0] +
				" = "+fieldNames[0]+"+?");
				query.append(" WHERE ");
				query.append(fieldNames[1] + " =?");
				query.append(" AND ");
				query.append(fieldNames[2] + " =?");
				query.append(" AND ");
				query.append(fieldNames[3] + " =?");
				return query.toString();
				}
			return null;
		}
	<5>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_MALL_LOGIN_FIELDS的顺序一定要和类
		3.2中的顺序也一定要和FixAppendDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值[在原有基础上增加],第二三四个参数分别为条件参数)

 

分享到:
评论

相关推荐

    MapReduce多路径输入输出

    这是 MapReduce 的多路径输入输出示例代码。有关大数据的相关文章可以阅读我的专栏:《大数据之Hadoop》 http://blog.csdn.net/column/details/bumblebee-hadoop.html

    MapReduce详解Shuffle过程

    在map端,map task将输出结果存储在内存缓冲区中,当缓冲区快满的时候将缓冲区的数据以一个临时文件的方式存放到磁盘,然后对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件。 在reduce端,...

    实验项目 MapReduce 编程

    1. 启 动 全 分 布 模 式 Hadoop 集 群 , 守护进程 包 括 NameNode 、 DataNode 、 SecondaryNameNode、ResourceManager、NodeManager 和 JobHistoryServer。 2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 ...

    论文研究-迭代式MapReduce研究进展.pdf

    近几年,研究者扩展和改进原始MapReduce,已开发了若干迭代式MapReduce以更好地为大数据处理而支持迭代计算。对迭代式MapReduce编程框架进行综合评述,较详细地阐述了这些研究成果,给出了它们各自的基本思想,并...

    mapreduce mapreduce mapreduce

    mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...

    用mapreduce进行文本处理

    用mapreduce进行文本处理,发表在SIGIR2009

    MapReduce:超大机群上的简单数据处理

    计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce. 用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有...

    基于Java和mapreduce实现的贝叶斯文本分类器设计.zip

    资源包含文件:课程论文报告word和PDF两...输出每个测试文档的分类结果; 3:利用测试文档的真实类别,计算分类模型的Precision,Recall和F1值。 详细介绍参考:https://blog.csdn.net/newlw/article/details/124984567

    MapReduce输出至hbase共16页.pdf.zip

    MapReduce输出至hbase共16页.pdf.zip

    Hadoop的MapReduce中多文件输出.pdf

    Hadoop的MapReduce中多文件输出.pdf

    MapReduce海量数据处理

    为了更加有效和简洁的处理此类问题,Google 提出了 MapReduce 编程模型,它可以隐藏并行化、容错、数据分布、负载均衡等细节,把这些公共的细节抽象到一个库中,由一个运行时系统来负责。而将对数据的操作抽象为 map...

    Mapreduce实验报告.doc

    Mapreduce实验报告 前言和简介 MapReduce是Google提出的一种编程模型,在这个模型的支持下可以实现大规模并行化计 算。在Mapreduce框架下一个计算机群通过统一的任务调度将一个巨型任务分成许多部分 ,分别解决然后...

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

    云计算中大数据的MapReduce处理方法简析.pdf

    云计算中大数据的MapReduce处理方法简析.pdf

    【MapReduce篇07】MapReduce之数据清洗ETL1

    【MapReduce篇07】MapReduce之数据清洗ETL1

    Hadoop原理与技术MapReduce实验

    (2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...

    MapReduce简介

    大规模数据处理时,MapReduce在三个层面上的基本构思 如何对付大数据处理:分而治之 对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略 上升到抽象模型:Mapper与Reducer MPI等...

    hadoop/mapreduce-矩阵乘法 mapreduce的实现(代码)

    最近在研究hadoop与mapReduce,网上教程只有个wordcount程序示范,太简单,故写了个相对复杂点的涉及到多个文件之间同时运算的矩阵乘法的代码用于实验与测试,上传供大家学习与参考。 调用方法: 执行:hadoop jar ...

    MapReduce设计模式介绍.ppt

    MapReduce 设计模式是大数据处理的核心组件,负责将大规模数据处理和分析任务分解为可并行处理的任务。MapReduce 设计模式主要由两个阶段组成:Map 阶段和 Reduce 阶段。 Map 阶段 Map 阶段主要负责数据的载入、...

Global site tag (gtag.js) - Google Analytics