[Spark Word Count in Java]

1. Word Count Source Code

The demo below reads a local text file, splits each line into words with flatMap, turns each word into a (word, 1) pair with mapToPair, and sums the pairs with reduceByKey, all in local mode inside the IDE.

package cn.com.sparkdemo.myspark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Demo {

    /**
     * Suggested JVM options: -Xms256m -Xmx1024m
     * @param args
     */
    public static void main(String[] args) {
        // If needed, cap the memory used by each Executor serving this app:
        // System.setProperty("spark.executor.memory", "512m");
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        // Work around Spark 1.6's startup memory check on small heaps (see Troubleshooting below)
        conf.set("spark.testing.memory", "2147480000");

        // On Windows without winutils.exe you may see:
        // "Could not locate executable null\bin\winutils.exe" (see Troubleshooting below)
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile("F:/BigData_workspace/myspark/src/main/resources/1.txt");

        // flatMap: split each line into words
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // Count the words: map each one to a (word, 1) pair, then sum by key
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Uncomment to pause here so http://192.168.1.101:4040 stays reachable (see the Web UI section):
        // try {
        //     Thread.sleep(1000 * 4000);
        // } catch (InterruptedException e) {
        //     e.printStackTrace();
        // }

        List<Tuple2<String, Integer>> output = wordCounts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }
}
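On input this small, collect() is fine, but it pulls the whole result to the driver. For anything larger you would normally persist the counts instead; a minimal sketch (the output path is hypothetical, and the directory must not already exist):

// Hedged variant: sort counts alphabetically and write them out rather than collect().
wordCounts.sortByKey()
          .saveAsTextFile("F:/BigData_workspace/myspark/output");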

 

2. Console Log Analysis

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/17 20:07:30 INFO SparkContext: Running Spark version 1.6.0
17/02/17 20:07:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/17 20:07:31 INFO SecurityManager: Changing view acls to: Administrator
17/02/17 20:07:31 INFO SecurityManager: Changing modify acls to: Administrator
17/02/17 20:07:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); users with modify permissions: Set(Administrator)
17/02/17 20:07:31 INFO Utils: Successfully started service 'sparkDriver' on port 2955.
17/02/17 20:07:32 INFO Slf4jLogger: Slf4jLogger started
17/02/17 20:07:32 INFO Remoting: Starting remoting
17/02/17 20:07:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.101:2968]
17/02/17 20:07:32 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 2968.
17/02/17 20:07:32 INFO SparkEnv: Registering MapOutputTracker
17/02/17 20:07:32 INFO SparkEnv: Registering BlockManagerMaster
17/02/17 20:07:32 INFO DiskBlockManager: Created local directory at C:\Documents and Settings\Administrator\Local Settings\Temp\blockmgr-8687bffc-358c-45de-9f90-bcf5f0279dd6
17/02/17 20:07:32 INFO MemoryStore: MemoryStore started with capacity 1311.0 MB
17/02/17 20:07:32 INFO SparkEnv: Registering OutputCommitCoordinator
17/02/17 20:07:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/02/17 20:07:32 INFO SparkUI: Started SparkUI at http://192.168.1.101:4040
17/02/17 20:07:32 INFO Executor: Starting executor ID driver on host localhost
17/02/17 20:07:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 2975.
17/02/17 20:07:32 INFO NettyBlockTransferService: Server created on 2975
17/02/17 20:07:32 INFO BlockManagerMaster: Trying to register BlockManager
17/02/17 20:07:32 INFO BlockManagerMasterEndpoint: Registering block manager localhost:2975 with 1311.0 MB RAM, BlockManagerId(driver, localhost, 2975)
17/02/17 20:07:32 INFO BlockManagerMaster: Registered BlockManager
17/02/17 20:07:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 98.8 KB, free 98.8 KB)
17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 108.6 KB)
17/02/17 20:07:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:2975 (size: 9.8 KB, free: 1311.0 MB)
17/02/17 20:07:33 INFO SparkContext: Created broadcast 0 from textFile at Demo.java:34
17/02/17 20:07:35 WARN : Your hostname, MS-201609261921 resolves to a loopback/non-reachable address: 192.168.56.1, but we couldn't find any external IP address!
17/02/17 20:07:35 INFO FileInputFormat: Total input paths to process : 1
17/02/17 20:07:35 INFO SparkContext: Starting job: collect at Demo.java:54
17/02/17 20:07:35 INFO DAGScheduler: Registering RDD 3 (mapToPair at Demo.java:43)
17/02/17 20:07:35 INFO DAGScheduler: Got job 0 (collect at Demo.java:54) with 1 output partitions
17/02/17 20:07:35 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Demo.java:54)
17/02/17 20:07:35 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/02/17 20:07:35 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/02/17 20:07:35 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43), which has no missing parents
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 113.4 KB)
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 116.0 KB)
17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:2975 (size: 2.7 KB, free: 1311.0 MB)
17/02/17 20:07:35 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2150 bytes)
17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/17 20:07:35 INFO HadoopRDD: Input split: file:/F:/BigData_workspace/myspark/src/main/resources/1.txt:0+169
17/02/17 20:07:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/17 20:07:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/17 20:07:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/17 20:07:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/17 20:07:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (1/1)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/02/17 20:07:35 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at Demo.java:43) finished in 0.172 s
17/02/17 20:07:35 INFO DAGScheduler: looking for newly runnable stages
17/02/17 20:07:35 INFO DAGScheduler: running: Set()
17/02/17 20:07:35 INFO DAGScheduler: waiting: Set(ResultStage 1)
17/02/17 20:07:35 INFO DAGScheduler: failed: Set()
17/02/17 20:07:35 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48), which has no missing parents
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 118.9 KB)
17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1748.0 B, free 120.6 KB)
17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:2975 (size: 1748.0 B, free: 1311.0 MB)
17/02/17 20:07:35 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)
17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 16 ms
17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1739 bytes result sent to driver
17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 31 ms on localhost (1/1)
17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/02/17 20:07:35 INFO DAGScheduler: ResultStage 1 (collect at Demo.java:54) finished in 0.031 s
17/02/17 20:07:35 INFO DAGScheduler: Job 0 finished: collect at Demo.java:54, took 0.401950 s
this: 1
Spark: 1
spark: 1
is: 3
LZ4: 1
general: 1
a: 3
fast: 2
Apache: 1
processing: 1
data: 1
large-scale: 1
very: 1
hello: 1
compression: 1
for: 1
and: 2
engine: 1
decompression: 1
algorithm.: 1
test: 1
world: 1
17/02/17 20:07:35 INFO SparkContext: Invoking stop() from shutdown hook
17/02/17 20:07:35 INFO SparkUI: Stopped Spark web UI at http://192.168.1.101:4040
17/02/17 20:07:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/02/17 20:07:35 INFO MemoryStore: MemoryStore cleared
17/02/17 20:07:35 INFO BlockManager: BlockManager stopped
17/02/17 20:07:35 INFO BlockManagerMaster: BlockManagerMaster stopped
17/02/17 20:07:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/02/17 20:07:35 INFO SparkContext: Successfully stopped SparkContext
17/02/17 20:07:35 INFO ShutdownHookManager: Shutdown hook called
17/02/17 20:07:35 INFO ShutdownHookManager: Deleting directory C:\Documents and Settings\Administrator\Local Settings\Temp\spark-11aa6dd4-c479-45ed-9929-c2ed7053d137
17/02/17 20:07:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
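A few of these lines are worth decoding. The MemoryStore capacity of 1311.0 MB follows from the spark.testing.memory setting: 2147480000 bytes ≈ 2048 MB, and Spark 1.6's unified memory manager reserves 300 MB and then grants 75% of the remainder, so (2048 − 300) × 0.75 ≈ 1311 MB. The two WARN lines (native-hadoop library not loaded; hostname resolving to the non-reachable address 192.168.56.1) are harmless for a local-mode run. Finally, "Invoking stop() from shutdown hook" confirms the program never calls sc.stop() itself; the JVM shutdown hook tears the context down.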

 

3. Troubleshooting

1) Memory issue
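(The screenshot originally shown here is missing.) With a small driver heap, Spark 1.6 can abort at startup with an IllegalArgumentException along the lines of "System memory ... must be at least ... Please use a larger heap size". The demo sidesteps that check by setting spark.testing.memory (in bytes) before creating the context; a sketch of the same fix used in Demo.java:

// Claim ~2 GB so Spark 1.6's startup memory check passes (matches Demo.java above).
SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
conf.set("spark.testing.memory", "2147480000");

Raising the JVM heap itself (e.g. the -Xms256m -Xmx1024m options noted in the Javadoc) is the cleaner fix; spark.testing.memory is a testing-oriented knob.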



 

2) Missing winutils.exe for Hadoop on Windows
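(Screenshot missing here as well.) On Windows, Hadoop's shell utilities look for %HADOOP_HOME%\bin\winutils.exe; when neither HADOOP_HOME nor the hadoop.home.dir property is set, you get the "Could not locate executable null\bin\winutils.exe" error quoted in the source above. A common workaround, assuming winutils.exe has been placed under a hypothetical C:/hadoop/bin, is to set the property before creating the JavaSparkContext:

// Hypothetical path; C:/hadoop/bin must actually contain winutils.exe.
System.setProperty("hadoop.home.dir", "C:/hadoop");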



 

4. Verifying the Result
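The verification screenshot is gone, but the counts printed by the job pin the input down almost completely. A plausible reconstruction of 1.txt (the multiset of words is fixed by the output; the grouping into lines is my guess):

hello world
this is a spark test
Apache Spark is a fast and general engine for large-scale data processing
LZ4 is a very fast compression and decompression algorithm.

Spot-checking against the console output: "is" appears three times, "a" three times, "fast" and "and" twice each, and "Spark"/"spark" are counted separately because the split is case-sensitive. Note also that "algorithm." keeps its trailing period, since the code splits on single spaces only.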



5. Spark Web UI
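The UI screenshots are missing, but the log shows where it lived: "Started SparkUI at http://192.168.1.101:4040". A local job this small finishes in well under a second, so the UI vanishes almost immediately; that is exactly what the commented-out block in Demo.java is for. Re-enabled, it keeps the driver, and therefore the UI, alive for inspection:

// Keep the driver alive so the web UI on port 4040 stays reachable (~66 minutes).
try {
    Thread.sleep(1000 * 4000);
} catch (InterruptedException e) {
    e.printStackTrace();
}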



 

 
