`
jybbh
  • 浏览: 15570 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

spark 连kafka_2.10-0.10.0.0

阅读更多
包:
chill_2.11-0.8.0.jar
commons-collections-3.2.2.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-logging-1.1.3.jar
guava-14.0.1.jar
hadoop-auth-2.6.4.jar
hadoop-common-2.6.4.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-databind-2.6.5.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jersey-container-servlet-core-2.22.2.jar
jersey-server-2.22.2.jar
json4s-ast_2.11-3.2.11.jar
kafka-clients-0.10.2.0.jar
kryo-shaded-3.0.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-3.1.2.jar
metrics-json-3.1.2.jar
netty-all-4.0.42.Final.jar
paranamer-2.3.jar
scala-library-2.11.8.jar
scala-library.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
spark-core_2.11-2.1.0.jar
spark-launcher_2.11-2.1.0.jar
spark-network-common_2.11-2.1.0.jar
spark-network-shuffle_2.11-2.1.0.jar
spark-streaming-kafka-0-10_2.10-2.1.0.jar
spark-streaming_2.11-2.1.0.jar
spark-unsafe_2.11-2.1.0.jar
xbean-asm5-shaded-4.4.jar

PS:下面这个写法支持JDK1.7,即非lambda表达式写法
-----------------------------------------------------------
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;

import org.apache.spark.streaming.kafka010.LocationStrategies;


public class PrintDirectMsgDirect {
  public static void main(String[] args) {
    try {
      SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
      final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

      String brokers = "localhost:9092";
      String topics = "test";
      Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
      Map<String, Object> kafkaP = new HashMap<>();
      kafkaP.put("metadata.broker.list", brokers);
      kafkaP.put("bootstrap.servers", brokers);
      kafkaP.put("group.id", "group1");
      kafkaP.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      kafkaP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      kafkaP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
              jssc,
              LocationStrategies.PreferConsistent(),
              ConsumerStrategies.Subscribe(topicsSet, kafkaP)
      );

      lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<Object, Object>>>(){
        @Override
        public void call(JavaRDD<ConsumerRecord<Object, Object>> consumerRecordJavaRDD) throws Exception {
          consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<Object, Object>>(){
            @Override
            public void call(ConsumerRecord<Object, Object> objectObjectConsumerRecord) throws Exception {
              System.out.println(">>>>>>>>>>objectObjectConsumerRecord:"+objectObjectConsumerRecord.value()+"]");
            }
          });
        }
      });
//      lines.foreachRDD(rdd -> {
//        rdd.foreach(x -> {
//          System.out.println(">>>>>>>>>>>>>>>>x:" + x + "]");
//          System.out.println(">>>>>>>>>>>>>>>>x.value:" + x.value() + "]");
//        });
//      });
      jssc.start();
      jssc.awaitTermination();
      jssc.close();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics