在DataJoin实现多数据源reduce端连接的过程中,连接在reduce阶段才会进行,因此一些无效的数据在reduce的时候才能去除掉,这样做占用了通信带宽,虽然该方法比较通用,但是效率不高。
当数据源中有数据量较小的或者经过处理后数据量较小的数据源的时候,我们考虑使用全局文件复制的方法来实现map端连接。这个过程需要Hadoop缓存系统的支持。Hadoop提供了DistributedCache类来操作缓存系统。
下面我们来说下map()端连接的思想:首先将数据量较小的数据源加载到缓存系统中,在MapReduce框架进行map之前,从缓存系统将文件全部加载到内存中,由于数据量小,因此可以实现。
对那个大的文件进行MapReduce处理,在map()方法中将有相同主键和外键记录进行连接即可。此过程不需要reducer。
当然,并不是所有的数据源都是小数据量的,但是有些情况可以转化为小数据量。比如仅仅需要查询辽宁地区的顾客订单信息,就可以先对customers文件进行过滤,得到一个中间文件,这个中间文件可能就是一个小数据量的数据源。另一个解决办法就是,假设有两个数据源S和R(相对较小),将大数据源R横向切分为多个小的数据源Ri,可以先使用全局文件复制的方法对每个Ri与S进行连接,最后将结果合并起来以、得到最终结果。
当然,数据源也不仅仅都是两个,当我们有多于2个的数据源时,我们可以两两进行连接,以得到最终结果。
代码见代码文档。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.*;
import java.io.*;
public class MapJoin {
public static class MapClass extends Mapper<Object,Text,Text,Text>{
private HashMap<String , String> customers =new HashMap<String,String>();
public void setup(Context context){
BufferedReader br=null;
try{
Path paths []=DistributedCache.getLocalCacheFiles(context.getConfiguration());
if(paths !=null&&paths.length>0)
{
br = new BufferedReader(new FileReader(paths[0].toString()));
String line=null;
while((line=br.readLine())!=null){
customers.put(line.split(",",2)[0],line.split(",",2)[1]);
}
}
}catch(Exception e){e.printStackTrace();}
finally{
try{
br.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
String [] strs=value.toString().split(",",2);
context.write(new Text(strs[0]), new Text(customers.get(strs[0])+","+strs[1]));
}
}
public static void main(String [] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf);
FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/user/xuyizhen/in/order.txt"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/user/xuyizhen/out"));
//addCacheFile的第二个参数必须是JobConf而不是Job类
DistributedCache.addCacheFile(new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt").toUri(),job.getConfiguration());
job.setJarByClass(MapJoin.class);
job.setMapperClass(MapClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);
}
}
分享到:
相关推荐
将全局预置拖到您的场景中并自定义外观。 这个新的主要版本改进了视觉效果并支持标准/内置和LWRP。所有的新特性和改进都将包含在这个版本中,而以前的版本只会收到修复。 完整的资产具有强大的可视化功能、...
GID:Windows 95全局索引文件(包括帮助状态) GIF:CompuServe位图文件 GL:动画格式 GRP:程序管理组 H HEX:Macintosh BinHex2.0文件 HLP:帮助文件;Date CAD Windows帮助文件 HPP:C++程序头文件 HQX:...
GeoServer是一个开源的用于共享地理空间数据的服务器,它支持使用开放标准对多数主要空间数据源进行发布。GeoServer实现了行业标准的OGC协议,如Web Feature Service(WFS)、Web Map Service(WMS)和Web Coverage ...
07、多数据源配置和使用 08、map+warpper介绍 09、日志系统 10、swagger讲解 11、3.0项目介绍与项目拆分 12、shiro与权限系统 13、全局异常拦截 14、代码生成器 15、数据范围使用和原理 16、缓存讲解 17、...
7、多数据源配置和使用;8、map+warpper介绍;9、日志系统;10、swagger讲解;11、3.0项目介绍与拆分;12、shiro与权限系统;13、全局异常拦截;14、代码生成器;15、数据范围使用和原理;16、;17、;18;。。。。...
7.10 综合案例分析—使用uploadify插件实现文件上传功能 /232 7.10.1 需求分析/232 7.10.2 效果界面/233 7.10.3 功能实现/234 7.10.4 代码分析/236 7.11 本章小结/241 第8章 jQuery UI插件/242 8.1 认识...
14.2.2 使用DataPager控件进行数据源分页 459 14.3 小结 461 第四部分 创建组件 第15章 创建组件 464 15.1 创建基本组件 464 15.1.1 组件和动态编译 466 15.1.2 App_Code文件夹中不同语言的组件 466 15.1.3 声明...
■利用已有数据源,保护商业数据的私密性 兼容微软Excel2000/xp/2003/2007版本数据,最大限度利用已有的企业数据资源。非联网环境操作,隔绝任何商业机密外泄的可能性。 ■针对您的商业逻辑量身定做 Officemap支持...
14.2.2 使用datapager控件进行数据源分页526 14.3 小结527 第15章 使用chart控件528 15.1 chart控件的基础知识528 15.1.1 使用chart控件显示数据528 15.1.2 排序和过滤数据532 15.1.3 统计公式534 15.2 定制图表的...
11.2 为其他数据源使用DAO模式 203 11.2.1 示例:为LDAP使用DAO 203 11.2.2 示例:为Web服务使用DAO 208 11.3 使用Spring DAO 209 11.3.1 编写代码 209 11.3.2 为什么使用Spring代替iBATIS 211 11.4 创建自己的DAO层...
命令ip nat inside source list 2 pool c2501 overload中的参数overload,将允许多个内部地址使用相同的全局地址(一个合法IP地址,它是由NIC或服务提供商所分配的地址)。命令ip nat pool c2501 202.96.38.1 202.96...
S Y N和A C K位的报文,并等待源端来的 A C K应答。如果请求的发出者不作响应,主机就会因为超时而结束连接。当主机在等待这个 事务完成时,这种 h a l f - o p e n的连接消耗了主机的资源。在等待三路握手时资源...
不过,针对 Java EE 开发的服务器端应用,可以通过集成 BlazeDS,充分利用 AMF 协议并能轻易与 Flex 前端交换数据,这种方式是 Java EE 应用程序集成 Flex 的首选。 BlazeDS 是 Adobe LifeCycle Data Services 的...
不过,针对 Java EE 开发的服务器端应用,可以通过集成 BlazeDS,充分利用 AMF 协议并能轻易与 Flex 前端交换数据,这种方式是 Java EE 应用程序集成 Flex 的首选。 BlazeDS 是 Adobe LifeCycle Data Services 的...
1. 单元测试:(掌握) 1. 写一个单元测试类,命名方式:XxxTest(测试类没有main方法) 2.... 选中项目,右键 => ... 解决了线程安全问题,通过线程隔离有安全问题的数据实现的,底层是通过map保存线程id和值的。
节点实用程序,用于从Open Street Map提取数据并将其转换为TopoJSON文件。 查看示例部分以获取生成上面地图的源代码 安装 npm install -g topojson-osm-fetch 注意:如果只想将其用作模块,则不需要全局安装 用作...
系统自带加密功能,对某些只想指定人看到的内容,可以通过加密来实现; 内容集合储存,方便全局管理,设有频道、无限级分类方便归类各内容; 已有开发频道产品、文章、商城、图片、多媒体(播放影音视频)、介绍、...
2. 采用工作单元来组织数据模型,提供级联查询,延时加载,N+1=>1+1,短事务,全局唯一ID和Identity Map等特性,能有效防止数据库死锁的发生。 3. ConnectionManager管理工作单元内的数据库链接,支持跨库提交,跨库...