
A simple Kafka + Spark + HDFS example

1. Setting up a Spark standalone cluster is straightforward; the relevant configuration files are listed below.
vi spark-env.sh
#!/usr/bin/env bash
export SCALA_HOME=/opt/scala-2.10.3
export JAVA_HOME=/opt/jdk1.7.0_79
export SPARK_MASTER_IP=192.168.1.16
export SPARK_WORKER_INSTANCES=3
export SPARK_MASTER_PORT=7776
export SPARK_MASTER_WEBUI_PORT=7777
export SPARK_WORKER_PORT=7778
export SPARK_WORKER_MEMORY=5000m
export SPARK_JAVA_OPTS="-Dspark.cores.max=4"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.16:2181,192.168.1.17:2181,192.168.1.18:2181 -Dspark.deploy.zookeeper.dir=/spark"
vi slaves
shaobao17
shaobao18
shaobao19
Copy the configured Spark directory to the corresponding machines with scp and start the cluster (see the sketch below); visiting http://shaobao16:7777/ should then show the Spark master web UI.
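A minimal start-up sketch, assuming Spark 1.3.1 is installed under /opt/spark-1.3.1-bin-hadoop2.6 on every node and passwordless SSH to the workers is configured:
cd /opt/spark-1.3.1-bin-hadoop2.6
sbin/start-all.sh   # starts the master on this host plus a worker on every host listed in conf/slaves
The master URL is then spark://192.168.1.16:7776, matching SPARK_MASTER_PORT above.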
2. Kafka configuration
vi server.properties
broker.id=16
port=9092
log.dirs=/tmp/kafka-logs16
zookeeper.connect=shaobao16:2181,shaobao17:2181,shaobao18:2181
broker.id, port, and log.dirs must be different on each machine; adjust them accordingly.
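For illustration (these exact values are not from the original post, but follow its pattern and the broker list used later in spark-submit), the overrides on shaobao17 would look like:
broker.id=17
port=9093
log.dirs=/tmp/kafka-logs17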
3. Create the topic mykafka (the creation procedure is described in detail on the Kafka website).
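For reference, the topic can be created with the script shipped with Kafka; the replication factor and partition count below are illustrative choices, not values from the original post:
bin/kafka-topics.sh --create --zookeeper shaobao16:2181,shaobao17:2181,shaobao18:2181 --replication-factor 1 --partitions 2 --topic mykafka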
Start a console producer for the topic:
bin/kafka-console-producer.sh --broker-list shaobao16:9092 --topic mykafka
The following message appears:
[2015-05-07 18:30:57,761] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
The producer now blocks waiting for input; type some test messages:
sfsfsf
gdgdg
dgdgd
sfsf
aadfsfsfsdfsf
sffsfsfsfsf
sfsdfsfsf
111111111111111111111111111111111111111111111111111
sfsfsfs
sfsfsfsdfsdfsfsfsf5555555555555555555555555555
hello world a is big city
he is a good man
she is a big man
he is a big city
oh my god ^H
ik ok thank your
he is a big man
ok thankyour and your
he i a name
he is a big man he
he is a storm is he a storm ok tsfsf
he is a big man
he is a big man ok ,l kike he
sdfsfsdf  id   id s fs
he is a big man ok he is a bi city
he is bifs sdfsf id he is
he si big a the is sfs
he is a big man hei is a big city
he is abig man ,o k 123 234 123 234
aaaaaaaaa aaaaa bbbbbbb
he is a big man ok , 11 22 1 21 1 2
he is a sfs sfsfsf sdf sfsd sfsfdsd sf sd fsd fs f fsdfsdf sd fs ff fsdf ds f  fsfsdf sf 1 1  1 1 1 1 1 1 1 1 1 1  1 1 1 1 1 1 1 1 1 1 1 1 1   1 1 1  1 1 1 1 1 
Write the jar that processes these messages:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: DirectKafkaWordCount <brokers> <topics>
*   <brokers> is a list of one or more Kafka brokers
*   <topics> is a list of one or more kafka topics to consume from
*
* Example:
*    $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
*/

public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
  System.out.println("---------------aaaaaaaa-------------");
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    //StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    System.out.println("----------------bbbbbbbbbbb------------------");
    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
    System.out.println("-----------cccccccccccc-------------");
    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    System.out.println("-----------dddddddddd-------------");
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        String[] arr = SPACE.split(x);
        return Arrays.asList(arr);
        // return Lists.newArrayList(SPACE.split(x));
      }
    });
    System.out.println("------------eeeeeeeeeeeeee-----------");
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    System.out.println("------------ffffffffffffff--------------");
    // Note: count() returns a JavaDStream<Long>, so this prints the DStream's
    // object reference rather than the number of records in the batch.
    System.out.println(wordCounts.count() + "           is ok");
    wordCounts.print();
   // wordCounts.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass);
    wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        // Write each batch's word counts to HDFS as an object file
        rdd.saveAsObjectFile("hdfs://192.168.1.16:9000/sort/price");
        return null;
      }
    });
   // wordCounts.saveAsHadoopFiles("one", "one");
    System.out.println("------------hhhhhhhhhhh--------------");
    // Start the computation
    jssc.start();
    System.out.println("------------jjjjjjjjjjjjjjjjjjjj-------------");
    jssc.awaitTermination();
    System.out.println("------------iiiiiiiiiiiiiii-------------");
  }
}
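A note on building kafkawc.jar (the build itself is not shown in the original post): the code relies on the Kafka integration for Spark Streaming, so the project would typically need org.apache.spark:spark-streaming-kafka_2.10:1.3.1 on the compile classpath, in addition to spark-core_2.10 and spark-streaming_2.10 (usually marked provided, since the cluster supplies them at runtime).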
Launch the job with spark-submit (note that multiple jars go into a single comma-separated --jars list):
./bin/spark-submit --master spark://192.168.1.16:7776 --class org.apache.spark.examples.streaming.JavaDirectKafkaWordCount --name kafka --executor-memory 400M --driver-memory 512M --jars /opt/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar,/opt/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar /opt/resource/kafkawc.jar shaobao16:9092,shaobao17:9093,shaobao18:9094 mykafka
Spark Streaming now runs continuously, submitting a new job every two seconds:
-------------------------------------------
Time: 1431146040000 ms
-------------------------------------------

15/05/08 21:34:00 INFO JobScheduler: Finished job streaming job 1431146040000 ms.0 from job set of time 1431146040000 ms
15/05/08 21:34:00 INFO JobScheduler: Starting job streaming job 1431146040000 ms.1 from job set of time 1431146040000 ms
15/05/08 21:34:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
15/05/08 21:34:00 INFO SparkContext: Starting job: foreachRDD at JavaDirectKafkaWordCount.java:121
15/05/08 21:34:00 INFO DAGScheduler: Got job 9116 (foreachRDD at JavaDirectKafkaWordCount.java:121) with 2 output partitions (allowLocal=false)
15/05/08 21:34:00 INFO DAGScheduler: Final stage: Stage 18233(foreachRDD at JavaDirectKafkaWordCount.java:121)
15/05/08 21:34:00 INFO DAGScheduler: Parents of final stage: List(Stage 18232)
15/05/08 21:34:00 INFO DAGScheduler: Missing parents: List()
15/05/08 21:34:00 INFO DAGScheduler: Submitting Stage 18233 (MapPartitionsRDD[21272] at foreachRDD at JavaDirectKafkaWordCount.java:121), which has no missing parents
15/05/08 21:34:00 INFO MemoryStore: ensureFreeSpace(127608) called with curMem=3297084, maxMem=278302556
15/05/08 21:34:00 INFO MemoryStore: Block broadcast_12155 stored as values in memory (estimated size 124.6 KB, free 262.1 MB)
15/05/08 21:34:00 INFO MemoryStore: ensureFreeSpace(76638) called with curMem=3424692, maxMem=278302556
15/05/08 21:34:00 INFO MemoryStore: Block broadcast_12155_piece0 stored as bytes in memory (estimated size 74.8 KB, free 262.1 MB)
15/05/08 21:34:00 INFO BlockManagerInfo: Added broadcast_12155_piece0 in memory on shaobao16:52385 (size: 74.8 KB, free: 264.1 MB)
15/05/08 21:34:00 INFO BlockManagerMaster: Updated info of block broadcast_12155_piece0
15/05/08 21:34:00 INFO SparkContext: Created broadcast 12155 from broadcast at DAGScheduler.scala:839
15/05/08 21:34:00 INFO DAGScheduler: Submitting 2 missing tasks from Stage 18233 (MapPartitionsRDD[21272] at foreachRDD at JavaDirectKafkaWordCount.java:121)
15/05/08 21:34:00 INFO TaskSchedulerImpl: Adding task set 18233.0 with 2 tasks
15/05/08 21:34:00 INFO TaskSetManager: Starting task 0.0 in stage 18233.0 (TID 15193, shaobao19, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:00 INFO TaskSetManager: Starting task 1.0 in stage 18233.0 (TID 15194, shaobao17, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:00 INFO BlockManagerInfo: Added broadcast_12155_piece0 in memory on shaobao19:38410 (size: 74.8 KB, free: 207.0 MB)
15/05/08 21:34:00 INFO BlockManagerInfo: Added broadcast_12155_piece0 in memory on shaobao17:41922 (size: 74.8 KB, free: 206.7 MB)
15/05/08 21:34:00 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3038 to sparkExecutor@shaobao19:45578
15/05/08 21:34:00 INFO TaskSetManager: Finished task 1.0 in stage 18233.0 (TID 15194) in 82 ms on shaobao17 (1/2)
15/05/08 21:34:00 INFO TaskSetManager: Finished task 0.0 in stage 18233.0 (TID 15193) in 87 ms on shaobao19 (2/2)
15/05/08 21:34:00 INFO TaskSchedulerImpl: Removed TaskSet 18233.0, whose tasks have all completed, from pool
15/05/08 21:34:00 INFO DAGScheduler: Stage 18233 (foreachRDD at JavaDirectKafkaWordCount.java:121) finished in 0.088 s
15/05/08 21:34:00 INFO DAGScheduler: Job 9116 finished: foreachRDD at JavaDirectKafkaWordCount.java:121, took 0.109517 s
15/05/08 21:34:00 INFO JobScheduler: Finished job streaming job 1431146040000 ms.1 from job set of time 1431146040000 ms
15/05/08 21:34:00 INFO JobScheduler: Total delay: 0.235 s for time 1431146040000 ms (execution: 0.225 s)
15/05/08 21:34:00 INFO ShuffledRDD: Removing RDD 21263 from persistence list
15/05/08 21:34:00 INFO BlockManager: Removing RDD 21263
15/05/08 21:34:00 INFO MapPartitionsRDD: Removing RDD 21262 from persistence list
15/05/08 21:34:00 INFO BlockManager: Removing RDD 21262
15/05/08 21:34:00 INFO MapPartitionsRDD: Removing RDD 21261 from persistence list
15/05/08 21:34:00 INFO BlockManager: Removing RDD 21261
15/05/08 21:34:00 INFO MapPartitionsRDD: Removing RDD 21260 from persistence list
15/05/08 21:34:00 INFO BlockManager: Removing RDD 21260
15/05/08 21:34:00 INFO KafkaRDD: Removing RDD 21259 from persistence list
15/05/08 21:34:00 INFO BlockManager: Removing RDD 21259
15/05/08 21:34:00 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/05/08 21:34:02 INFO JobScheduler: Added jobs for time 1431146042000 ms
15/05/08 21:34:02 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:120
15/05/08 21:34:02 INFO JobScheduler: Starting job streaming job 1431146042000 ms.0 from job set of time 1431146042000 ms
15/05/08 21:34:02 INFO DAGScheduler: Registering RDD 21276 (mapToPair at JavaDirectKafkaWordCount.java:105)
15/05/08 21:34:02 INFO DAGScheduler: Got job 9117 (print at JavaDirectKafkaWordCount.java:120) with 1 output partitions (allowLocal=true)
15/05/08 21:34:02 INFO DAGScheduler: Final stage: Stage 18235(print at JavaDirectKafkaWordCount.java:120)
15/05/08 21:34:02 INFO DAGScheduler: Parents of final stage: List(Stage 18234)
15/05/08 21:34:02 INFO DAGScheduler: Missing parents: List(Stage 18234)
15/05/08 21:34:02 INFO DAGScheduler: Submitting Stage 18234 (MapPartitionsRDD[21276] at mapToPair at JavaDirectKafkaWordCount.java:105), which has no missing parents
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(4640) called with curMem=3501330, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12156 stored as values in memory (estimated size 4.5 KB, free 262.1 MB)
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(3235) called with curMem=3505970, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12156_piece0 stored as bytes in memory (estimated size 3.2 KB, free 262.1 MB)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12156_piece0 in memory on shaobao16:52385 (size: 3.2 KB, free: 264.1 MB)
15/05/08 21:34:02 INFO BlockManagerMaster: Updated info of block broadcast_12156_piece0
15/05/08 21:34:02 INFO SparkContext: Created broadcast 12156 from broadcast at DAGScheduler.scala:839
15/05/08 21:34:02 INFO DAGScheduler: Submitting 1 missing tasks from Stage 18234 (MapPartitionsRDD[21276] at mapToPair at JavaDirectKafkaWordCount.java:105)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Adding task set 18234.0 with 1 tasks
15/05/08 21:34:02 INFO TaskSetManager: Starting task 0.0 in stage 18234.0 (TID 15195, shaobao17, ANY, 1291 bytes)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12156_piece0 in memory on shaobao17:55250 (size: 3.2 KB, free: 207.0 MB)
15/05/08 21:34:02 INFO TaskSetManager: Finished task 0.0 in stage 18234.0 (TID 15195) in 15 ms on shaobao17 (1/1)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Removed TaskSet 18234.0, whose tasks have all completed, from pool
15/05/08 21:34:02 INFO DAGScheduler: Stage 18234 (mapToPair at JavaDirectKafkaWordCount.java:105) finished in 0.016 s
15/05/08 21:34:02 INFO DAGScheduler: looking for newly runnable stages
15/05/08 21:34:02 INFO DAGScheduler: running: Set()
15/05/08 21:34:02 INFO DAGScheduler: waiting: Set(Stage 18235)
15/05/08 21:34:02 INFO DAGScheduler: failed: Set()
15/05/08 21:34:02 INFO DAGScheduler: Missing parents for Stage 18235: List()
15/05/08 21:34:02 INFO DAGScheduler: Submitting Stage 18235 (ShuffledRDD[21277] at reduceByKey at JavaDirectKafkaWordCount.java:111), which is now runnable
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(2296) called with curMem=3509205, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12157 stored as values in memory (estimated size 2.2 KB, free 262.1 MB)
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(1702) called with curMem=3511501, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12157_piece0 stored as bytes in memory (estimated size 1702.0 B, free 262.1 MB)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12157_piece0 in memory on shaobao16:52385 (size: 1702.0 B, free: 264.1 MB)
15/05/08 21:34:02 INFO BlockManagerMaster: Updated info of block broadcast_12157_piece0
15/05/08 21:34:02 INFO SparkContext: Created broadcast 12157 from broadcast at DAGScheduler.scala:839
15/05/08 21:34:02 INFO DAGScheduler: Submitting 1 missing tasks from Stage 18235 (ShuffledRDD[21277] at reduceByKey at JavaDirectKafkaWordCount.java:111)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Adding task set 18235.0 with 1 tasks
15/05/08 21:34:02 INFO TaskSetManager: Starting task 0.0 in stage 18235.0 (TID 15196, shaobao17, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12157_piece0 in memory on shaobao17:55250 (size: 1702.0 B, free: 207.0 MB)
15/05/08 21:34:02 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3039 to sparkExecutor@shaobao17:35431
15/05/08 21:34:02 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 3039 is 138 bytes
15/05/08 21:34:02 INFO TaskSetManager: Finished task 0.0 in stage 18235.0 (TID 15196) in 17 ms on shaobao17 (1/1)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Removed TaskSet 18235.0, whose tasks have all completed, from pool
15/05/08 21:34:02 INFO DAGScheduler: Stage 18235 (print at JavaDirectKafkaWordCount.java:120) finished in 0.017 s
15/05/08 21:34:02 INFO DAGScheduler: Job 9117 finished: print at JavaDirectKafkaWordCount.java:120, took 0.043544 s
15/05/08 21:34:02 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:120
15/05/08 21:34:02 INFO DAGScheduler: Got job 9118 (print at JavaDirectKafkaWordCount.java:120) with 1 output partitions (allowLocal=true)
15/05/08 21:34:02 INFO DAGScheduler: Final stage: Stage 18237(print at JavaDirectKafkaWordCount.java:120)
15/05/08 21:34:02 INFO DAGScheduler: Parents of final stage: List(Stage 18236)
15/05/08 21:34:02 INFO DAGScheduler: Missing parents: List()
15/05/08 21:34:02 INFO DAGScheduler: Submitting Stage 18237 (ShuffledRDD[21277] at reduceByKey at JavaDirectKafkaWordCount.java:111), which has no missing parents
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(2296) called with curMem=3513203, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12158 stored as values in memory (estimated size 2.2 KB, free 262.1 MB)
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(1702) called with curMem=3515499, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12158_piece0 stored as bytes in memory (estimated size 1702.0 B, free 262.1 MB)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12158_piece0 in memory on shaobao16:52385 (size: 1702.0 B, free: 264.1 MB)
15/05/08 21:34:02 INFO BlockManagerMaster: Updated info of block broadcast_12158_piece0
15/05/08 21:34:02 INFO SparkContext: Created broadcast 12158 from broadcast at DAGScheduler.scala:839
15/05/08 21:34:02 INFO DAGScheduler: Submitting 1 missing tasks from Stage 18237 (ShuffledRDD[21277] at reduceByKey at JavaDirectKafkaWordCount.java:111)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Adding task set 18237.0 with 1 tasks
15/05/08 21:34:02 INFO TaskSetManager: Starting task 0.0 in stage 18237.0 (TID 15197, shaobao17, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12158_piece0 in memory on shaobao17:41922 (size: 1702.0 B, free: 206.7 MB)
15/05/08 21:34:02 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3039 to sparkExecutor@shaobao17:41112
15/05/08 21:34:02 INFO TaskSetManager: Finished task 0.0 in stage 18237.0 (TID 15197) in 16 ms on shaobao17 (1/1)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Removed TaskSet 18237.0, whose tasks have all completed, from pool
15/05/08 21:34:02 INFO DAGScheduler: Stage 18237 (print at JavaDirectKafkaWordCount.java:120) finished in 0.017 s
15/05/08 21:34:02 INFO DAGScheduler: Job 9118 finished: print at JavaDirectKafkaWordCount.java:120, took 0.021178 s
-------------------------------------------
Time: 1431146042000 ms
-------------------------------------------

15/05/08 21:34:02 INFO JobScheduler: Finished job streaming job 1431146042000 ms.0 from job set of time 1431146042000 ms
15/05/08 21:34:02 INFO JobScheduler: Starting job streaming job 1431146042000 ms.1 from job set of time 1431146042000 ms
15/05/08 21:34:02 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
15/05/08 21:34:02 INFO SparkContext: Starting job: foreachRDD at JavaDirectKafkaWordCount.java:121
15/05/08 21:34:02 INFO DAGScheduler: Got job 9119 (foreachRDD at JavaDirectKafkaWordCount.java:121) with 2 output partitions (allowLocal=false)
15/05/08 21:34:02 INFO DAGScheduler: Final stage: Stage 18239(foreachRDD at JavaDirectKafkaWordCount.java:121)
15/05/08 21:34:02 INFO DAGScheduler: Parents of final stage: List(Stage 18238)
15/05/08 21:34:02 INFO DAGScheduler: Missing parents: List()
15/05/08 21:34:02 INFO DAGScheduler: Submitting Stage 18239 (MapPartitionsRDD[21279] at foreachRDD at JavaDirectKafkaWordCount.java:121), which has no missing parents
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(127608) called with curMem=3517201, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12159 stored as values in memory (estimated size 124.6 KB, free 261.9 MB)
15/05/08 21:34:02 INFO MemoryStore: ensureFreeSpace(76638) called with curMem=3644809, maxMem=278302556
15/05/08 21:34:02 INFO MemoryStore: Block broadcast_12159_piece0 stored as bytes in memory (estimated size 74.8 KB, free 261.9 MB)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12159_piece0 in memory on shaobao16:52385 (size: 74.8 KB, free: 264.0 MB)
15/05/08 21:34:02 INFO BlockManagerMaster: Updated info of block broadcast_12159_piece0
15/05/08 21:34:02 INFO SparkContext: Created broadcast 12159 from broadcast at DAGScheduler.scala:839
15/05/08 21:34:02 INFO DAGScheduler: Submitting 2 missing tasks from Stage 18239 (MapPartitionsRDD[21279] at foreachRDD at JavaDirectKafkaWordCount.java:121)
15/05/08 21:34:02 INFO TaskSchedulerImpl: Adding task set 18239.0 with 2 tasks
15/05/08 21:34:02 INFO TaskSetManager: Starting task 0.0 in stage 18239.0 (TID 15198, shaobao19, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:02 INFO TaskSetManager: Starting task 1.0 in stage 18239.0 (TID 15199, shaobao19, PROCESS_LOCAL, 1186 bytes)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12159_piece0 in memory on shaobao19:48399 (size: 74.8 KB, free: 206.7 MB)
15/05/08 21:34:02 INFO BlockManagerInfo: Added broadcast_12159_piece0 in memory on shaobao19:38410 (size: 74.8 KB, free: 206.9 MB)
15/05/08 21:34:02 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3039 to sparkExecutor@shaobao19:52686
15/05/08 21:34:02 INFO MapOutputTrackerMasterActor: Asked to send map output locations
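After a few batches have completed, the output written by foreachRDD can be checked on HDFS; a quick sanity check, assuming the Hadoop client is available on the driver machine:
bin/hdfs dfs -ls hdfs://192.168.1.16:9000/sort/price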