常用命令:
val rdd1 = sc.parallelize(List(('a',1),('a',2))) val rdd = sc.textFile(“/usr/local/spark/tmp/char.data") rdd.count rdd.cache val word_count = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) word_count.saveAsTextFile("/usr/local/spark/tmp/result") val word_count = rdd.flatMap(_.split(" ")).map((_,1)).groupByKey() rdd1.lookup('a') val rdd2 = sc.parallelize(List(1,2,3,4,5)) rdd2.reduce(_+_) word_count.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._1,x._2)).collcet val rdd = sc.textFile(“/usr/local/spark/tmp/SogouQ1.txt") rdd.map(_.split("\\t")(0)).filter(_< "20111230010101").count //parallelize演示 val num=sc.parallelize(1 to 10) //并行化 val doublenum = num.map(_*2) val threenum = doublenum.filter(_ % 3 == 0) threenum.collect threenum.toDebugString val num1=sc.parallelize(1 to 10,6) val doublenum1 = num1.map(_*2) val threenum1 = doublenum1.filter(_ % 3 == 0) threenum1.collect threenum1.toDebugString threenum.cache() val fournum = threenum.map(x=>x*x) fournum.collect fournum.toDebugString threenum.unpersist() num.reduce (_ + _) num.take(5) num.first num.count num.take(5).foreach(println) //K-V演示 val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) kv1.sortByKey().collect //注意sortByKey的小括号不能省 kv1.groupByKey().collect kv1.reduceByKey(_+_).collect val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5))) kv2.distinct.collect kv1.union(kv2).collect val kv3=sc.parallelize(List(("A",10),("B",20),("D",30))) kv1.join(kv3).collect kv1.cogroup(kv3).collect val kv4=sc.parallelize(List(List(1,2),List(3,4))) kv4.flatMap(x=>x.map(_+1)).collect //文件读取演示 val rdd1 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/") rdd1.toDebugString val words=rdd1.flatMap(_.split(" ")) val wordscount=words.map(x=>(x,1)).reduceByKey(_+_) wordscount.collect wordscount.toDebugString val rdd2 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/*.txt") rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect //gzip压缩的文件 val rdd3 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/test.txt.gz") rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect //日志处理演示 //http://download.labs.sogou.com/dl/q.html 完整版(2GB):gz格式 //访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL //SogouQ1.txt、SogouQ2.txt、SogouQ3.txt分别是用head -n 或者tail -n 从SogouQ数据日志文件中截取 //搜索结果排名第1,但是点击次序排在第2的数据有多少? val rdd1 = sc.textFile("hdfs://hadoop1:8000/dataguru/data/SogouQ1.txt") val rdd2=rdd1.map(_.split("\t")).filter(_.length==6) //非6列数据 rdd2.count() val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2) rdd3.count() rdd3.toDebugString //session查询次数排行榜 val rdd4=rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)) //(x._2,x._1)表示互换 rdd4.toDebugString rdd4.saveAsTextFile("hdfs://hadoop1:8000/dataguru/week2/output1") //cache()演示 //检查block命令:bin/hdfs fsck /dataguru/data/SogouQ3.txt -files -blocks -locations val rdd5 = sc.textFile("hdfs://hadoop1:8000/dataguru/data/SogouQ3.txt") rdd5.cache() rdd5.count() rdd5.count() //比较时间 //join演示 val format = new java.text.SimpleDateFormat("yyyy-MM-dd") case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float) case class Click (d: java.util.Date, uuid: String, landing_page: Int) val reg = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/join/reg.tsv").map(_.split("\t")).map(r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))) val clk = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/join/clk.tsv").map(_.split("\t")).map(c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))) reg.join(clk).take(2)
相关推荐
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka ...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
offsetExplore2 实际上是 Kafka 的一个工具,用于管理和监控 Apache Kafka 中的偏移量(offset)。在 Kafka 中,偏移量是用来标识消费者在一个特定分区中的位置的标识符,它可以用来记录消费者消费消息的进度。 ...
赠送jar包:kafka_2.11-0.10.0.1.jar; 赠送原API文档:kafka_2.11-0.10.0.1-javadoc.jar; 赠送源代码:kafka_2.11-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka_2.11-0.10.0.1.pom; 包含翻译后的API文档...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
kafka
1、Kafka如何防止数据丢失 1)消费端弄丢数据 消费者在消费完消息之后需要执行消费位移的提交,该消费位移表示下一条需要拉取的消息的位置。Kafka默认位移提交方式是自动提交,但它不是在你每消费一次数据之后就...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
使用Maven整合Kafka 包括生产者,消费者 Kafka各种配置 //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.68.232.188:9092,81.68.232.188:9093,81...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, kafka_2.11-2.3.0.tgz, kafka_2.11-2.3.1.tgz, ...
本人在北美刚刚毕业,目前面试的几家大厂包括小公司在面试中都频繁的问道kafka这个技术,作为大数据开发或者java全栈的开发者来说,2020年很有必要系统的学习一下kafka. 1.[全面][Kafka2.11][jdk1.8][ZooKeeper3.4.6...
消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢分享 原贴地址:...
kafka连接工具
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar 包含翻译后的API文档:flink-...
本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...