包:
chill_2.11-0.8.0.jar
commons-collections-3.2.2.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-logging-1.1.3.jar
guava-14.0.1.jar
hadoop-auth-2.6.4.jar
hadoop-common-2.6.4.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-databind-2.6.5.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jersey-container-servlet-core-2.22.2.jar
jersey-server-2.22.2.jar
json4s-ast_2.11-3.2.11.jar
kafka-clients-0.10.2.0.jar
kryo-shaded-3.0.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-3.1.2.jar
metrics-json-3.1.2.jar
netty-all-4.0.42.Final.jar
paranamer-2.3.jar
scala-library-2.11.8.jar
scala-library.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
spark-core_2.11-2.1.0.jar
spark-launcher_2.11-2.1.0.jar
spark-network-common_2.11-2.1.0.jar
spark-network-shuffle_2.11-2.1.0.jar
spark-streaming-kafka-0-10_2.11-2.1.0.jar
spark-streaming_2.11-2.1.0.jar
spark-unsafe_2.11-2.1.0.jar
xbean-asm5-shaded-4.4.jar
PS:下面这个写法支持JDK1.7,即非lambda表达式写法
-----------------------------------------------------------
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
/**
 * Minimal Spark Streaming (direct-stream) consumer that subscribes to one or
 * more Kafka topics and prints every record's value to stdout.
 *
 * <p>Written in pre-lambda (JDK 1.7 compatible) anonymous-class style, per the
 * note above. Uses the Kafka 0.10 direct API ({@code spark-streaming-kafka-0-10}).
 */
public class PrintDirectMsgDirect {
    public static void main(String[] args) {
        try {
            // local[2]: one core for the receiver-less direct stream, one for processing.
            SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
            // 10-second micro-batches.
            final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

            String brokers = "localhost:9092";
            String topics = "test"; // comma-separated list of topics to subscribe to
            Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

            // Consumer configuration for the new (0.10+) Kafka consumer.
            // NOTE: consumers take DEserializers only — the old "key.serializer"
            // entry and the legacy 0.8 "metadata.broker.list" entry were removed:
            // both are unknown to the 0.10 consumer and at best log warnings.
            Map<String, Object> kafkaP = new HashMap<>();
            kafkaP.put("bootstrap.servers", brokers);
            kafkaP.put("group.id", "group1");
            kafkaP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Both deserializers produce String, so type the stream as <String, String>
            // instead of <Object, Object>.
            JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaP)
            );

            // Print each record's value. Anonymous classes (not lambdas) keep this
            // source compatible with JDK 1.7.
            lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
                @Override
                public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
                    rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                        @Override
                        public void call(ConsumerRecord<String, String> record) throws Exception {
                            System.out.println(">>>>>>>>>>objectObjectConsumerRecord:" + record.value() + "]");
                        }
                    });
                }
            });

            jssc.start();
            jssc.awaitTermination(); // blocks until the job is stopped or interrupted
            jssc.close();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/executors can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
分享到:
相关推荐
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
kafka_2.11-0.10.1.0.tgzKafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素...
kafka_2.11-0.10.2.0 版解压使用 kafka kafka kafka kafka kafka
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API...
KAFKA-3.0.0-1.3.0.0.p0.40-el7.parcel KAFKA-3.0.0-1.3.0.0.p0.40-el7.parcel.sha1 manifest.json
kafka_2.10-0.10.0.1.tgz
kafka_2.10-0.8.2.1.tgz安装包,如果需要请下载
kafka_2.9.2-0.8.2.1.tgz 安装 liunx环境 ... 2) 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录删除zookeeper "/brokers/topics/"目录下相关topic节点
spark-streaming-kafka-0-8_2.11-2.4.0.jar
kafka-clients-0.10.2.1.jar
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
1、kafka_2.111.0.0-1_arm64.deb UOS+鲲鹏平台 2、软件目录/opt/kafka_2.111.0.0 3、自带服务启动
centos kafka_exporter kafka_exporter-1.6.0.linux-amd64.tar
kafka_2.10-0.10.0.0.tgz,可以在JDK1.7中运行。
kafka_2.11-2.0.0.tgz
flink-connector-kafka_2.12-1.13.2.jar
kafka_2.10-0.10.2.0.tgz
kafka_2.10-0.10.1.0.tgz
kafka_2.10-0.10.2.0.tar