val sc = new SparkContext("local", "xxx")
val inputData = sc.textFile("hdfs://master:8020/data/spark/user-history-data")
val lines = inputData.map(line => (line, line.length))
val result = lines.mapPartitions { valueIterator =>
if (valueIterator.isEmpty) {
Iterator[String]()
} else {
val transformedItem = new ListBuffer[String]() //setup ListBuffer
val fs: FileSystem = FileSystem.get(new Configuration()) //setup FileSystem
valueIterator.map { item =>
transformedItem += item._1 +":"+item._2
val outputFile = fs.create(new Path("/home/xxx/opt/data/spark/" + item._1.substring(0,item._1.indexOf("\t")) + ".txt"))
outputFile.write((item._1 +":"+item._2).getBytes())
if (!valueIterator.hasNext) {
transformedItem.clear() //cleanup transformedItem
outputFile.close() //cleanup outputFile
fs.close() //cleanup fs
}
transformedItem
}
}
}
result.foreach(println(_))
sc.stop()
将hdfs数据:
zhangsan 1 2015-07-30 20:01:01 127.0.0.1
zhangsan 2 2015-07-30 20:01:01 127.0.0.1
zhangsan 3 2015-07-30 20:01:01 127.0.0.1
zhangsan 4 2015-07-31 20:01:01 127.0.0.1
zhangsan 5 2015-07-31 20:21:01 127.0.0.1
lisi 1 2015-07-30 21:01:01 127.0.0.1
lisi 2 2015-07-30 22:01:01 127.0.0.1
lisi 3 2015-07-31 23:31:01 127.0.0.1
lisi 4 2015-07-31 22:21:01 127.0.0.1
lisi 5 2015-07-31 23:11:01 127.0.0.1
wangwu 1 2015-07-30 21:01:01 127.0.0.1
wangwu 2 2015-07-30 22:01:01 127.0.0.1
wangwu 3 2015-07-31 23:31:01 127.0.0.1
wangwu 4 2015-07-31 22:21:01 127.0.0.1
wangwu 5 2015-07-31 23:11:01 127.0.0.1
读取到spark中,并统计每行长度,再将数据写到本地的文件中(文件名称以每行第一个单词)
最终实现hadoop中setup, cleanup
强烈阅读如下链接:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dRNAg@mail.gmail.com%3E
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983
http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
相关推荐
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。Storm是一个分布式的、容错的实时计算系统。两者整合,优势互补。
Python+Spark 2.0+Hadoop机器学习与大数据实战完整高清带书签的PDF压缩包
Big Data Analytics with Spark and Hadoop(Spark与Hadoop大数据分析)代码code
Big Data Analytics with Spark and Hadoop 英文无水印pdf pdf使用FoxitReader和PDF-XChangeViewer测试可以打开
对于想学习 Spark 的人而言,如何构建 Spark 集群是其最大的难点之一, 为了解决大家构建 Spark 集群的一切困难,Spark 集群的构建分为了五个步骤,从 零起步,不需要任何前置知识,涵盖操作的每一个细节,构建完整...
Spark 是加州大学伯克利分校 AMP(Algorithms,Machines,People)实验室开发的通用内存并行计算...Spark 与 Hadoop Hadoop 已经成了大数据技术的事实标准,Hadoop MapReduce 也非常适合于对大规模数据集合进行批处理操
HADOOP SPARK 开发依赖包,对于第一个SPARK开发的例子,可以导入这个包去进行开发
spark+hadoop+zookeeper 大数据平台搭建脚本,亲测通过,适用于大数据初学者在虚拟机玩
为了把spark2.4.X和hadoop2.7.3升级为spark3.1.1和hadoop3.2.2找了半天资源,最后还是自己手动编写了一个。已经在集群上测试可用
Spark安装包:spark-3.1.3-bin-without-hadoop.tgz
hadoop etc/hadoop/slaves data-node-sa data-node-sb data-node-sc Python3安装 安装pyspark cd /opt/spark-2.3.2-bin-hadoop2.7/python && python36 setup.py install 安装numpy pip3 install numpy -i ...
在ubuntu下spark+Hadoop环境搭建。详细的步骤,亲自实验。
hadoop与spark分布式安装,内容详细,亲自搭建成功。助于新手
本地开发Spark/Hadoop报错“ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.” ...
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......
apache-hive-3.1.3-bin.tar.gz apache-zookeeper-3.5.10-bin.tar.gz hadoop-3.3.3.tar.gz spark-3.2.1-bin-hadoop3.2.tgz mysql-8.0.29-1.el8.x86_64.rpm-bundle
dr-elephant spark 1.6.0 hadoop 2.4.1 编译好的 dr elephant文件 需要part2 中的jar包放入 part1 lib目录下
Developer Training for Apache Spark and Hadoop
Big Data Analytics with Spark and Hadoop 英文mobi 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
spark2.4.7版本兼容hadoop2.7版本,官网上下载的话会很慢。