`
m635674608
  • 浏览: 4929477 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

 
阅读更多
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

[html] view plain copy
 
  1. <dependency>  
  2.     <groupId>org.apache.spark</groupId>  
  3.     <artifactId>spark-streaming-kafka_2.10</artifactId>  
  4.     <version>1.4.1</version>  
  5. </dependency>  


2、scala代码

[java] view plain copy
 
  1. val kafkaStream = {  
  2.   val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"  
  3.   val kafkaParams = Map(  
  4.     "zookeeper.connect" -> "zookeeper1:2181",  
  5.     "group.id" -> "spark-streaming-test",  
  6.     "zookeeper.connection.timeout.ms" -> "1000")  
  7.   val inputTopic = "input-topic"  
  8.   val numPartitionsOfInputTopic = 5  
  9.   val streams = (1 to numPartitionsOfInputTopic) map { _ =>  
  10.     KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)  
  11.   }  
  12.   val unifiedStream = ssc.union(streams)  
  13.   val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.  
  14.   unifiedStream.repartition(sparkProcessingParallelism)  
  15. }  


需要注意的要点

1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

scala连接代码

[java] view plain copy
 
  1. val topics = Set("teststreaming")    
  2. val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092"    
  3. val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")    
  4. // Create a direct stream    
  5. val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)     
  6. val events = kafkaStream.flatMap(line => {    
  7. Some(line.toString())    
  8. })  


三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。
参考之前的文章,Direct连接kafka的实例:
http://blog.csdn.net/kwu_ganymede/article/details/50160793
 
http://blog.csdn.net/wisgood/article/details/51815845
分享到:
评论

相关推荐

    基于NFV的虚拟化BRAS组网方案.docx

    5G通信行业、网络优化、通信工程建设资料。

    299-煤炭大数据智能分析解决方案.pptx

    299-煤炭大数据智能分析解决方案.pptx

    工资汇总打印税务计算系统-(Excel函数版)

    使用说明: 1、各月工资表,已用公式设置完毕,请在AI1单元格填入月份本表自动显示数据,您再按实际情况稍加修正,工资就完成了! 2、使用时,请把一月份工资表中公式的数据,按你的实际情况修改,之后把一月份工资表复制到2至12月就行了。以后再用时参阅第一条说明。 3、养老保险、失业保险、医疗保险、住房公积金 自动生成,但各单位的比例不同,请自行修改公式中的参数。 4、AK 列至 BD 列是报税资料,自动生成。 5、“四联工资单”只须输入员工编号与选择月份,便可自动取数;请根据需要任选。 6、“工资条”全部自动生成;有单行与双行两种,请任选使用。使用工资条时,请在《个税报告》表的V9单元格选择月份。 7、《扣缴个人所得税报告表》自动生成,请在V9单元格选择月份。请不要随意改动。 8、加班工资、考勤应扣,按每月30天计算;养、失、医、房 项目提取基数与比例亦应按单位规定进行修改。 9、各表均设了保护,但未设密码,您尽可撤消,做您想作的事。 10、打印工资表时,可将不需用的列

    考试资料+7、互联网与物联网.docx

    5G通信行业、网络优化、通信工程建设资料

    景区4G网络覆盖提升解决案例.docx

    5G通信、网络优化与通信建设

    基于Springboot+Vue的机动车号牌管理系统-毕业源码案例设计.zip

    网络技术和计算机技术发展至今,已经拥有了深厚的理论基础,并在现实中进行了充分运用,尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代,所以对于信息的宣传和管理就很关键。系统化是必要的,设计网上系统不仅会节约人力和管理成本,还会安全保存庞大的数据量,对于信息的维护和检索也不需要花费很多时间,非常的便利。 网上系统是在MySQL中建立数据表保存信息,运用SpringBoot框架和Java语言编写。并按照软件设计开发流程进行设计实现。系统具备友好性且功能完善。 网上系统在让售信息规范化的同时,也能及时通过数据输入的有效性规则检测出错误数据,让数据的录入达到准确性的目的,进而提升数据的可靠性,让系统数据的错误率降至最低。 关键词:vue;MySQL;SpringBoot框架 【引流】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    199-袁骏毅:新形势下医院数据安全治理应对实践.pdf

    199-袁骏毅:新形势下医院数据安全治理应对实践.pdf

    毕业设计:基于SSM的mysql-信息类课程教学知识管理系统(源码 + 数据库)

    毕业设计:基于SSM的mysql_信息类课程教学知识管理系统(源码 + 数据库) ssm信息类课程教学知识管理系统开发 采用 ssm框架技术,java语言,mysql数据库 前台+后台的模式开发 前台界面:W1 后台界面:CC 内容页面:P4 前台: 用户注册(手机号,用户名,姓名,登录用户名,密码,备注) 用户登录,找回密码,设置新密码 网站公告查看 课程查看(注册用户登录后,可以查看非公开课,用户不登录,可以查看公开课),点击查看课程详情,课程名称,授课专业,课程简介等,可以下载课程,以word或者PDF形式 知识卡片查看:点击课程后,可以查看该课程里的知识卡片,知识卡片分为文本,图片,视频3种类型的知识卡片。登录后可以收藏知识卡片 后台: 管理员 管理员管理 教师信息管理(姓名,学校,职级,绑定邮箱,电话,用户名,密码等) 注册用户审核 网站公告管理 课程信息管理(可以上传课程,word或者pdf形式) 知识卡片管理 系统管理 教师 个人资料修改 创建课程 创建知识卡片 注册用户 个人资料修改 我收藏的知识卡片

    基于SpringBoot+Vue的常规应急物资管理系统-毕业源码案例设计.zip

    网络技术和计算机技术发展至今,已经拥有了深厚的理论基础,并在现实中进行了充分运用,尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代,所以对于信息的宣传和管理就很关键。系统化是必要的,设计网上系统不仅会节约人力和管理成本,还会安全保存庞大的数据量,对于信息的维护和检索也不需要花费很多时间,非常的便利。 网上系统是在MySQL中建立数据表保存信息,运用SpringBoot框架和Java语言编写。并按照软件设计开发流程进行设计实现。系统具备友好性且功能完善。 网上系统在让售信息规范化的同时,也能及时通过数据输入的有效性规则检测出错误数据,让数据的录入达到准确性的目的,进而提升数据的可靠性,让系统数据的错误率降至最低。 关键词:vue;MySQL;SpringBoot框架 【引流】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    计算机二级攻略.docx

    计算机二级备考资源丰富多样,可通过权威机构官网、在线教育平台、专业培训机构网站等获取学习资料。结合多种资源学习,备考更高效。

    夭月.zi删除p

    夭月.zi删除p

    基于c语言的ktv歌曲系统.zip

    基于c语言的ktv歌曲系统.zip

    6.docx

    6.docx

    基于Springboot+Vue网上点餐系统毕业源码案例设计.zip

    网络技术和计算机技术发展至今,已经拥有了深厚的理论基础,并在现实中进行了充分运用,尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代,所以对于信息的宣传和管理就很关键。系统化是必要的,设计网上系统不仅会节约人力和管理成本,还会安全保存庞大的数据量,对于信息的维护和检索也不需要花费很多时间,非常的便利。 网上系统是在MySQL中建立数据表保存信息,运用SpringBoot框架和Java语言编写。并按照软件设计开发流程进行设计实现。系统具备友好性且功能完善。 网上系统在让售信息规范化的同时,也能及时通过数据输入的有效性规则检测出错误数据,让数据的录入达到准确性的目的,进而提升数据的可靠性,让系统数据的错误率降至最低。 关键词:vue;MySQL;SpringBoot框架 【引流】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    工厂工资明细表Excel模版

    基于提供的字段介绍,我们可以设计一个基础的工厂工资明细表Excel模板结构如下: | 序号 | 姓名 | 工种 | 工作天数 | 工时费/天 | 小计(正常工资) | 加班时间 | 加班费率/小时 | 小计(加班工资) | 预借 | 合计(实发工资) | 签字 | 备注 | | ---- | ---- | ---- | -------- | ---------- | -------------- | -------- | -------------- | --------------- | ---- | -------------- | ---- | ---- | | 1 | | | | | | | | | | | | | | 2 | | | | | =D2*C2

    推荐智慧工业园区大数据云平台解决方案.docx

    当前园区的主要工作是如何在资源 缺的背景下,创造更多的价值,实现园区经济转型、社会和谐。利用电子信息技术的深度应用与融合,提升园区政府的管理、服务、引导能力,提升企业的研发设计、生产制造、经营管理的效率,提升园区运行的智能、顺畅程度,提升园区居民生活的便利水平,是智慧园区建设的主要任务。 智慧园区由基础设施、信息资源、业务应用、运行保障四个层次组成 通过构建新一代信息基础设施,为园区内的组织和个人提供安全、高速、便捷的网络环境,实现园区内的部件、人员随时随地接入网络,奠定了泛在感知的网络基础。新一代的信息基础设主要包括两个层面,全面覆盖的感知层和泛在的传输网络层。 智慧应用体系是智慧园区建设重要的部分,也是可以直接提升园区管理服务能力、生产生活环境的重要构成,智慧应用体系分析、整合园区运行核心系统的各项关键信息,从而对于包括民生、环保、公共安全、城市服务、工商业活动在内的各种需求做出智能响应,使园区运行更加智慧顺畅,为人类创造更美好的城市生活。智慧应用体系包括一个支撑平台和四个应用体系。

    计费模式配置有误导致低速率.docx

    5G通信行业、网络优化、通信工程建设资料

    ODX2.2.0 C#解析代码

    由ASAM组织提出的诊断数据交互格式,全称为Open Diagnostic Data Exchange。即ODX规范ISO-22901,主要用于描述整车以及ECU的诊断数据,方便供应商与OEM、产品开发与售后间的数据交互。ODX使用统一建模语言(UML)图描述,数据交互格式使用可扩展标记语言(XML)存储记录数据。便于承载从设计、开发、测试、生产及售后维护的全流程工作。

    CSF405 EN 2001673 P0802183-01, Rev B CHAINSAW (TOP HANDLE)

    CSF405 EN 2001673 P0802183-01, Rev B CHAINSAW (TOP HANDLE) OPERATOR MANUAL

    C#Gif动画录制软件是一款方便好用的小软件源码.zip

    Gif动画录制软件是一款方便好用的小软件,使用此工具,您可以记录屏幕的选定区域,网络摄像头的实时提要或草图板上的实时图形。之后,您可以编辑动画并将其另存为gif,apng,视频,psd或png图像。

Global site tag (gtag.js) - Google Analytics