1. Word Count Source Code
package cn.com.sparkdemo.myspark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

public class Demo {
    /**
     * -Xms256m -Xmx1024m
     * @param args
     */
    public static void main(String[] args) {
        // Cap the memory used by each Executor serving this app
        //System.setProperty("spark.executor.memory", "512m");
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        // Without winutils you may see: Could not locate executable null\bin\winutils.exe
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> input = sc.textFile("F:/BigData_workspace/myspark/src/main/resources/1.txt");

        // flatMap: split each line into words
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // Count the words: map each word to (word, 1), then sum by key
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        //try {
        //    // Pause here so http://192.168.1.101:4040/ can be visited
        //    Thread.sleep(1000 * 4000);
        //} catch (InterruptedException e) {
        //    e.printStackTrace();
        //}

        List<Tuple2<String, Integer>> output = wordCounts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }
}
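For readers without a Spark environment, the same flatMap → mapToPair → reduceByKey pipeline can be sketched with plain Java 8 streams (no Spark dependency; the input string below is a made-up stand-in for the contents of 1.txt):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Mirrors the Spark pipeline: split into words (flatMap), then group
    // each word and count occurrences (mapToPair + reduceByKey).
    public static Map<String, Long> count(String text) {
        return Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same "word: count" output format as the Spark demo
        count("hello world hello spark").forEach((w, c) -> System.out.println(w + ": " + c));
    }
}
```

The stream version runs in one JVM; Spark's value is that the identical logical pipeline scales out across partitions and executors.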
2. Console Log Analysis
The log below shows a single job (job 0) split into two stages: ShuffleMapStage 0 (the mapToPair step) and ResultStage 1 (reduceByKey/collect), followed by the printed word counts.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/17 20:07:30 INFO SparkContext: Running Spark version 1.6.0
17/02/17 20:07:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/17 20:07:31 INFO SecurityManager: Changing view acls to: Administrator
17/02/17 20:07:31 INFO SecurityManager: Changing modify acls to: Administrator
17/02/17 20:07:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); users with modify permissions: Set(Administrator)
17/02/17 20:07:31 INFO Utils: Successfully started service 'sparkDriver' on port 2955.
17/02/17 20:07:32 INFO Slf4jLogger: Slf4jLogger started
17/02/17 20:07:32 INFO Remoting: Starting remoting
17/02/17 20:07:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.101:2968]
17/02/17 20:07:32 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 2968.
17/02/17 20:07:32 INFO SparkEnv: Registering MapOutputTracker
17/02/17 20:07:32 INFO SparkEnv: Registering BlockManagerMaster
17/02/17 20:07:32 INFO DiskBlockManager: Created local directory at C:\Documents and Settings\Administrator\Local Settings\Temp\blockmgr-8687bffc-358c-45de-9f90-bcf5f0279dd6
17/02/17 20:07:32 INFO MemoryStore: MemoryStore started with capacity 1311.0 MB
17/02/17 20:07:32 INFO SparkEnv: Registering OutputCommitCoordinator
17/02/17 20:07:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/02/17 20:07:32 INFO SparkUI: Started SparkUI at http://192.168.1.101:4040
17/02/17 20:07:32 INFO Executor: Starting executor ID driver on host localhost
17/02/17 20:07:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 2975.
17/02/17 20:07:32 INFO NettyBlockTransferService: Server created on 2975
17/02/17 20:07:32 INFO BlockManagerMaster: Trying to register BlockManager
17/02/17 20:07:32 INFO BlockManagerMasterEndpoint: Registering block manager localhost:2975 with 1311.0 MB RAM, BlockManagerId(driver, localhost, 2975)
17/02/17 20:07:32 INFO BlockManagerMaster: Registered BlockManager
17/02/17 20:07:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 98.8 KB, free 98.8 KB)
17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 108.6 KB)
17/02/17 20:07:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:2975 (size: 9.8 KB, free: 1311.0 MB)
17/02/17 20:07:33 INFO SparkContext: Created broadcast 0 from textFile at Demo.java:34
17/02/17 20:07:35 WARN : Your hostname, MS-201609261921 resolves to a loopback/non-reachable address: 192.168.56.1, but we couldn't find any external IP address!
17/02/17 20:07:35 INFO FileInputFormat: Total input paths to process : 1
17/02/17 20:07:35 INFO SparkContext: Starting job: collect at Demo.java:54
17/02/17 20:07:35 INFO DAGScheduler: Registering RDD 3 (mapToPair at Demo.java:43)
17/02/17 20:07:35 INFO DAGScheduler: Got job 0 (collect at Demo.java:54) with 1 output partitions
17/02/17 20:07:35 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Demo.java:54)
17/02/17 20:07:35 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/02/17 20:07:35 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/02/17 20:07:35 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43), which has no missing parents
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 113.4 KB)
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 116.0 KB)
17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:2975 (size: 2.7 KB, free: 1311.0 MB)
17/02/17 20:07:35 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2150 bytes)
17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/17 20:07:35 INFO HadoopRDD: Input split: file:/F:/BigData_workspace/myspark/src/main/resources/1.txt:0+169
17/02/17 20:07:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/17 20:07:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/17 20:07:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/17 20:07:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/17 20:07:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (1/1)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/02/17 20:07:35 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at Demo.java:43) finished in 0.172 s
17/02/17 20:07:35 INFO DAGScheduler: looking for newly runnable stages
17/02/17 20:07:35 INFO DAGScheduler: running: Set()
17/02/17 20:07:35 INFO DAGScheduler: waiting: Set(ResultStage 1)
17/02/17 20:07:35 INFO DAGScheduler: failed: Set()
17/02/17 20:07:35 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48), which has no missing parents
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 118.9 KB)
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1748.0 B, free 120.6 KB)
17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:2975 (size: 1748.0 B, free: 1311.0 MB)
17/02/17 20:07:35 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)
17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 16 ms
17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1739 bytes result sent to driver
17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 31 ms on localhost (1/1)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/02/17 20:07:35 INFO DAGScheduler: ResultStage 1 (collect at Demo.java:54) finished in 0.031 s
17/02/17 20:07:35 INFO DAGScheduler: Job 0 finished: collect at Demo.java:54, took 0.401950 s
this: 1
Spark: 1
spark: 1
is: 3
LZ4: 1
general: 1
a: 3
fast: 2
Apache: 1
processing: 1
data: 1
large-scale: 1
very: 1
hello: 1
compression: 1
for: 1
and: 2
engine: 1
decompression: 1
algorithm.: 1
test: 1
world: 1
17/02/17 20:07:35 INFO SparkContext: Invoking stop() from shutdown hook
17/02/17 20:07:35 INFO SparkUI: Stopped Spark web UI at http://192.168.1.101:4040
17/02/17 20:07:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/02/17 20:07:35 INFO MemoryStore: MemoryStore cleared
17/02/17 20:07:35 INFO BlockManager: BlockManager stopped
17/02/17 20:07:35 INFO BlockManagerMaster: BlockManagerMaster stopped
17/02/17 20:07:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/02/17 20:07:35 INFO SparkContext: Successfully stopped SparkContext
17/02/17 20:07:35 INFO ShutdownHookManager: Shutdown hook called
17/02/17 20:07:35 INFO ShutdownHookManager: Deleting directory C:\Documents and Settings\Administrator\Local Settings\Temp\spark-11aa6dd4-c479-45ed-9929-c2ed7053d137
17/02/17 20:07:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
3. Error Solutions
1) Insufficient memory: addressed by setting spark.testing.memory in the SparkConf (see the source above).
2) Missing winutils.exe executable for Hadoop on Windows ("Could not locate executable null\bin\winutils.exe").
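A minimal sketch of both fixes, applied before the JavaSparkContext is created. The path C:\hadoop is a placeholder, not a value from the original environment; its bin\ subfolder must contain winutils.exe:

```java
public class WindowsSparkFixes {
    public static void apply() {
        // Fix 1 (memory): same key the demo sets through SparkConf; raises
        // the memory floor Spark checks when running in local mode.
        System.setProperty("spark.testing.memory", "2147480000");
        // Fix 2 (winutils): hadoop.home.dir tells Hadoop where to look for
        // bin\winutils.exe on Windows. "C:\\hadoop" is a placeholder path.
        System.setProperty("hadoop.home.dir", "C:\\hadoop");
    }

    public static void main(String[] args) {
        apply();
        System.out.println(System.getProperty("hadoop.home.dir"));
    }
}
```

Alternatively, HADOOP_HOME can be set as an environment variable instead of the hadoop.home.dir system property.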
4. Result Verification
The counts printed before shutdown (hello: 1, world: 1, is: 3, a: 3, ...) correspond to the words in 1.txt, one line per distinct word.
5. The Spark Web UI
While the driver is running, the web UI is available at http://192.168.1.101:4040 (see the "Started SparkUI" log line); it disappears once the SparkContext stops, which is why the demo includes a commented-out Thread.sleep.