1 代码和业务:
package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Groups tab-separated rows by their first two columns and joins the third
 * column with commas (like SQL's GROUP_CONCAT / CONCAT_WS).
 *
 * <pre>
 * Input:          Expected output:
 *   a  b  1         a  b  1,2,3
 *   a  b  2         c  d  4,5,6
 *   a  b  3
 *   c  d  4
 *   c  d  5
 *   c  d  6
 * </pre>
 *
 * @author zm
 */
public class ConcatWSMapReduce {

    /** Emits a composite (col1, col2) key and col3 as the value, once per input line. */
    public static class ConcatWSMapper extends Mapper<LongWritable, Text, ConcatWS, Text> {

        /**
         * Called once per input line.
         *
         * @param key   byte offset of this line within the source file
         * @param value the line text, expected form: col1 \t col2 \t col3
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            System.out.println("splited length is: " + splited.length);
            // Guard against malformed lines: fewer than 3 columns previously threw
            // ArrayIndexOutOfBoundsException and failed the whole task.
            if (splited.length < 3) {
                return;
            }
            String col1 = splited[0];
            String col2 = splited[1];
            String col3 = splited[2];
            System.out.println("col1: " + col1 + "col2: " + col2 + "col3: " + col3);
            context.write(new ConcatWS(col1, col2), new Text(col3));
        }
    }

    /**
     * Shuffle groups all values sharing an equal ConcatWS key; reduce() is invoked
     * once per group and joins that group's col3 values with commas.
     */
    public static class ConcatWSReducer extends Reducer<ConcatWS, Text, Text, Text> {

        /**
         * Called once per key group.
         *
         * @param ws  composite (col1, col2) key of this group
         * @param v2s all col3 values that share this key
         */
        @Override
        protected void reduce(ConcatWS ws, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text col3 : v2s) {
                // Separator only BETWEEN values: the original appended "," after every
                // element, emitting "1,2,3," instead of the expected "1,2,3".
                if (sb.length() > 0) {
                    sb.append(',');
                }
                sb.append(col3.toString());
            }
            System.out.println("reduce key content: " + ws.toString());
            System.out.println("reduce val content: " + sb.toString());
            context.write(new Text(ws.toString()), new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // Job.getInstance(conf, name) replaces the deprecated new Job(conf) +
        // setJobName(...) combination.
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, ConcatWSMapReduce.class.getSimpleName());
        job.setJarByClass(ConcatWSMapReduce.class);

        // Custom mapper and reducer.
        job.setMapperClass(ConcatWSMapper.class);
        job.setReducerClass(ConcatWSReducer.class);

        // Map output key differs from the final output key, so both pairs must be declared.
        job.setMapOutputKeyClass(ConcatWS.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Data source and sink. Note: addInputPaths (plural) accepts a comma-separated list.
        FileInputFormat.addInputPaths(job, "/zmdata/zm.txt");
        Path outputpath = new Path("/zmdata/zmout");
        FileSystem fs = FileSystem.get(new URI("/"), conf);
        if (fs.exists(outputpath)) {
            // MapReduce refuses to run if the output directory already exists.
            fs.delete(outputpath, true);
        }
        FileOutputFormat.setOutputPath(job, outputpath);

        // Propagate job success/failure through the process exit code
        // (the original discarded waitForCompletion's boolean result).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Composite map-output key (col1, col2).
     *
     * Must be a public static nested (or top-level) class with a no-arg constructor,
     * otherwise the MR framework cannot instantiate it via reflection.
     *
     * equals/hashCode are required in addition to compareTo: the default
     * HashPartitioner routes keys by hashCode(), so with Object's identity hash
     * two equal keys built in different JVMs could land in different reduce
     * partitions and never be merged.
     */
    public static class ConcatWS implements WritableComparable<ConcatWS> {
        private String col1 = "";
        private String col2 = "";

        /** No-arg constructor required by Hadoop's Writable deserialization. */
        public ConcatWS() {
        }

        public ConcatWS(String col1, String col2) {
            this.col1 = col1;
            this.col2 = col2;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(col1);
            out.writeUTF(col2);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.col1 = in.readUTF();
            this.col2 = in.readUTF();
        }

        /** Orders by col1, then col2 — consistent with equals(). */
        @Override
        public int compareTo(ConcatWS ws) {
            int result = this.col1.compareTo(ws.col1);
            if (result == 0) {
                result = this.col2.compareTo(ws.col2);
            }
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ConcatWS)) {
                return false;
            }
            ConcatWS other = (ConcatWS) obj;
            return col1.equals(other.col1) && col2.equals(other.col2);
        }

        @Override
        public int hashCode() {
            return Objects.hash(col1, col2);
        }

        /** Tab-joined form used as the final output key. */
        @Override
        public String toString() {
            return col1 + "\t" + col2;
        }
    }
}
2 本机使用ant脚本提交后执行,ant脚本如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- This build file lives at the same level as the src and lib folders. -->
<project name="hadoop2测试项目" basedir="." default="sshexec">
    <!-- Property settings -->
    <property environment="env" />
    <property file="build.properties" />
    <property name="src.dir" value="${basedir}/src" />
    <property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
    <property name="classes.dir" value="${basedir}/classes" />
    <property name="dist.dir" value="${basedir}/dist" />
    <property name="project.lib.dir" value="${basedir}/lib" />
    <property name="localpath.dir" value="${basedir}" />
    <property name="remote.home" value="~"/>
    <!-- Editable: hostname or IP of the Hadoop cluster -->
    <property name="remote.hostname" value="hadoop3"/>
    <!-- Editable: Linux user name for logging in to the Hadoop cluster host -->
    <property name="remote.username" value="root"/>
    <!-- Editable: Linux password for logging in to the Hadoop cluster host -->
    <property name="remote.password" value="123456"/>
    <!-- Editable: main class to run each time; expanded into "hadoop jar xxx.jar MainClass" -->
    <property name="main.class" value="mapreduce.ConcatWSMapReduce"/>
    <!-- Editable: Hadoop installation path on the cluster's Linux host -->
    <property name="hadoop.path" value="/opt/hadoop-2.5.2"/>
    <!-- Base compile classpath -->
    <path id="compile.classpath">
        <fileset dir="${java.lib.dir}">
            <include name="tools.jar" />
        </fileset>
        <fileset dir="${project.lib.dir}">
            <include name="*.jar" />
        </fileset>
    </path>
    <!-- Run classpath -->
    <path id="run.classpath">
        <path refid="compile.classpath" />
        <pathelement location="${classes.dir}" />
    </path>
    <!-- Clean: delete temporary directories -->
    <target name="clean" description="清理,删除临时目录">
        <!--delete dir="${build.dir}" /-->
        <delete dir="${dist.dir}" />
        <delete dir="${classes.dir}" />
        <echo level="info">清理完毕</echo>
    </target>
    <!-- Init: create directories, copy files -->
    <target name="init" depends="clean" description="初始化,建立目录,复制文件">
        <mkdir dir="${classes.dir}" />
        <mkdir dir="${dist.dir}" />
    </target>
    <!-- Compile the sources -->
    <target name="compile" depends="init" description="编译源文件">
        <javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7" includeAntRuntime="false">
            <classpath refid="compile.classpath" />
            <compilerarg line="-encoding UTF-8 "/>
        </javac>
    </target>
    <!-- Package the class files into a jar -->
    <target name="jar" depends="compile" description="打包类文件">
        <jar jarfile="${dist.dir}/jar.jar">
            <fileset dir="${classes.dir}" includes="**/*.*" />
        </jar>
    </target>
    <!-- Upload to the server.
         NOTE: copy lib/jsch-0.1.51 into $ANT_HOME/lib first. When using the Ant
         bundled with Eclipse, add jsch-0.1.51 under
         Window->Preferences->Ant->Runtime->Classpath instead. -->
    <target name="ssh" depends="jar">
        <scp file="${dist.dir}/jar.jar" todir="${remote.username}@${remote.hostname}:${remote.home}" password="${remote.password}" trust="true"/>
    </target>
    <!-- Run the jar on the remote host via "hadoop jar" -->
    <target name="sshexec" depends="ssh">
        <sshexec host="${remote.hostname}" username="${remote.username}" password="${remote.password}" trust="true" command="${hadoop.path}/bin/hadoop jar ${remote.home}/jar.jar ${main.class}"/>
    </target>
</project>
3 执行的mr代码中写上了 System.out 那么这些数据在哪里呢? 比如上面的输出:
上图所示,找到你执行mr任务的编号,进去后看如下图:
相关推荐
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
赠送jar包:hadoop-yarn-common-2.6.5.jar 赠送原API文档:hadoop-yarn-common-2.6.5-javadoc.jar 赠送源代码:hadoop-yarn-common-2.6.5-sources.jar 包含翻译后的API文档:hadoop-yarn-common-2.6.5-javadoc-...
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
赠送jar包:hadoop-yarn-server-resourcemanager-2.6.0.jar; 赠送原API文档:hadoop-yarn-server-resourcemanager-2.6.0-javadoc.jar; 赠送源代码:hadoop-yarn-server-resourcemanager-2.6.0-sources.jar; 赠送...
flink1.14.0与hadoop3.x的兼容包,放在flink的lib目录下
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
hudi-hadoop-mr-bundle-0.11.0.jar 配合文档
hadoop-mapreduce-examples-2.7.1.jar
flink-shaded-hadoop-2-uber-2.7.2-11.0.jar 是flink1.11集成hadoop2.7.2的jar依赖。
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
hadoop-eclipse-plugin-0.20.3-SNAPSHOT.jarhadoop-eclipse-plugin-0.20.3-SNAPSHOT.jarhadoop-eclipse-plugin-0.20.3-SNAPSHOT.jarhadoop-eclipse-plugin-0.20.3-SNAPSHOT.jar
赠送jar包:hadoop-mapreduce-client-core-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-core-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-core-2.5.1-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:hadoop-yarn-api-2.5.1.jar; 赠送原API文档:hadoop-yarn-api-2.5.1-javadoc.jar; 赠送源代码:hadoop-yarn-api-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-api-2.5.1.pom; 包含翻译后...
ambari-2.7.5 编译过程中四个大包下载很慢,所以需要提前下载,包含:hbase-2.0.2.3.1.4.0-315-bin.tar.gz ,hadoop-3.1.1.3.1.4.0-315.tar.gz , grafana-6.4.2.linux-amd64.tar.gz ,phoenix-5.0.0.3.1.4.0-315....
hadoop2.6(支持2.x)windows本地开发测试类库(包含winutils.exe),直接解压目录配置到环境变量就可以使用。完美解决hadoop binary path问题。
# 解压命令 tar -zxvf flink-shaded-hadoop-2-uber-3.0.0-cdh6.2.0-7.0.jar.tar.gz # 介绍 用于CDH部署 Flink所依赖的jar包
Flink-1.11.2与Hadoop3集成JAR包,放到flink安装包的lib目录下,可以避免Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the ...这个报错,实现Flink与Hadoop的集成