`
liyonghui160com
  • 浏览: 761134 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

flume-ng+Kafka+Storm+HDFS 实时系统组合

阅读更多

 

flume-ng+Kafka+Storm+HDFS 实时系统组合

 

1).数据采集
负责从各节点上实时采集数据,选用cloudera的flume来实现
2).数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka
3).流式计算
对采集到的数据进行实时分析,选用apache的storm
4).数据输出
对分析后的结果持久化,暂定用mysql
另一方面是模块化之后,假如当Storm挂掉了之后,数据采集和数据接入还是继续在跑着,数据不会丢失,storm起来之后可以继续进行流式计算;

 

那么接下来我们来看下整体的架构图

 

详细介绍各个组件及安装配置:
操作系统:ubuntu

 

Flume
Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
下图为flume典型的体系结构:
Flume数据源以及输出方式:
Flume 提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前 使用exec方式进行日志采集。

Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。

Flume下载及文档:
http://flume.apache.org/
Flume安装:
  1. $tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local
复制代码
Flume启动命令:
  1. $bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
复制代码
Kafka

 

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。
kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数 据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机 制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
kafka分布式订阅架构如下图:--取自Kafka官网
罗宝兄弟文章上的架构图是这样的
其实两者没有太大区别,官网的架构图只是把Kafka简洁的表示成一个Kafka Cluster,而上面架构图就相对详细一些;

 

Kafka版本:0.8.0
Kafka下载及文档:http://kafka.apache.org/
Kafka安装:
  1. > tar xzf kafka-<VERSION>.tgz
  2. > cd kafka-<VERSION>
  3. > ./sbt update
  4. > ./sbt package
  5. > ./sbt assembly-package-dependency
这里可能有很多童鞋执行sbt的时候会报找不到这个命令
[plain] view plaincopy
No command 'sbt' found, did you mean: 
 Command 'skt' from package 'latex-sanskrit' (main) 
 Command 'sb2' from package 'scratchbox2' (universe) 
 Command 'sbd' from package 'cluster-glue' (main) 
 Command 'mbt' from package 'mbt' (universe) 
 Command 'sbmt' from package 'atfs' (universe) 
 Command 'lbt' from package 'lbt' (universe) 
 Command 'st' from package 'suckless-tools' (universe) 
 Command 'sb' from package 'lrzsz' (universe) 
sbt: command not found 
这个是需要自己安装的,安装包可以到sbt官网下载。我这边用的ubuntu系统,所以我下载了个deb包,官网地址:http://www.scala-sbt.org/
deb包地址:http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.deb
rpm包地址:http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm
启动及测试命令:
(1) start server

 

  1. > bin/zookeeper-server-start.shconfig/zookeeper.properties
  2. > bin/kafka-server-start.shconfig/server.properties
复制代码
这里是官网上的教程,kafka本身有内置zookeeper,但是我自己在实际部署中是使用单独的zookeeper集群,所以第一行命令我就没执行,这里只是些出来给大家看下。

 

配置独立的zookeeper集群需要配置server.properties文件,讲zookeeper.connect修改为独立集群的IP和端口

 

  1. zookeeper.connect=nutch1:2181
复制代码
(2)Create a topic

 

  1. > bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
  2. > bin/kafka-list-topic.sh --zookeeperlocalhost:2181
复制代码
(3)Send some messages

 

  1. > bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test
复制代码
(4)Start a consumer

 

  1. > bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning
复制代码
kafka-console-producer.sh和kafka-console-cousumer.sh只是系统提供的命令行工具。这里启动是为了测试是否能正常生产消费;验证流程正确性
在实际开发中还是要自行开发自己的生产者与消费者;
Storm
Twitter 将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循  Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure写的。

 

Storm的主要特点如下:
  • 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
  • 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
  • 容错性。Storm会管理工作进程和节点的故障。
  • 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
  • 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
  • 快速。系统的设计保证了消息能得到快速的处理,使用&Oslash;MQ作为其底层消息队列。(0.9.0.1版本支持&Oslash;MQ和netty两种模式)
  • 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
由于篇幅问题,具体的安装步骤可以参考:Storm-0.9.0.1安装部署 指导
接下来重头戏开始拉!那就是框架之间的整合啦

 

flume和kafka整合
2.提取插件中的flume-conf.properties文件
修改该文件:#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c
修改所有topic的值改为test
将改后的配置文件放进flume/conf目录下
在该项目中提取以下jar包放入环境中flume的lib下:
注:这里的flumeng-kafka-plugin.jar这个包,后面在github项目中已经移动到package目录了。找不到的童鞋可以到package目录获取。

 

完成上面的步骤之后,我们来测试下flume+kafka这个流程有没有走通;
我们先启动flume,然后再启动kafka,启动步骤按之前的步骤执行;接下来我们使用kafka的kafka-console-consumer.sh脚本查看是否有flume有没有往Kafka传输数据;
以上这个是我的test.log文件通过flume抓取传到kafka的数据;说明我们的flume和kafka流程走通了;
大家还记得刚开始我们的流程图么,其中有一步是通过flume到kafka,还有一步是到hdfs的;而我们这边还没有提到如何存入kafka且同时存如hdfs;
flume是支持数据同步复制,同步复制流程图如下,取自于flume官网,官网用户指南地址:http://flume.apache.org/FlumeUserGuide.html
怎么设置同步复制呢,看下面的配置:

 

  1. #2个channel和2个sink的配置文件  这里我们可以设置两个sink,一个是kafka的,一个是hdfs的;
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1 c2
复制代码
具体配置大伙根据自己的需求去设置,这里就不具体举例了

 

kafka和storm的整合

 

2.使用maven package进行编译,得到storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包   --有转载的童鞋注意下,这里的包名之前写错了,现在改正确了!不好意思!
3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar、scala-library-2.9.2.jar (这三个jar包在kafka项目中能找到)
备注:如果开发的项目需要其他jar,记得也要放进storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下
那么接下来我们把storm也重启下;
完成以上步骤之后,我们还有一件事情要做,就是使用kafka-storm0.8插件,写一个自己的Storm程序;
这里我给大伙附上一个我弄的storm程序,代码:
    public class KafkaSpout implements IRichSpout {  
       
          private static final Log logger = LogFactory.getLog(KafkaSpout.class);  
          /** 
           * 
           */  
          private static final long serialVersionUID = -5569857211173547938L;  
          SpoutOutputCollector collector;  
          private ConsumerConnectorconsumer;  
          private Stringtopic;  
       
          public KafkaSpout(String topic) {  
               this.topic = topic;  
          }  
       
          @Override  
          public void open(Map conf, TopologyContext context,  
                     SpoutOutputCollector collector) {  
               this.collector = collector;  
                
          }  
       
          private static ConsumerConfig createConsumerConfig() {  
               Properties props = newProperties();  
               props.put("zookeeper.connect","xx.xx.xx.xx:2181");  
               props.put("group.id","0");  
               props.put("zookeeper.session.timeout.ms","10000");  
               //props.put("zookeeper.sync.time.ms", "200");  
               //props.put("auto.commit.interval.ms", "1000");  
       
               return new ConsumerConfig(props);  
          }  
       
          @Override  
          public void close() {  
               // TODOAuto-generated method stub  
       
          }  
       
          @Override  
          public void activate() {  
               this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());  
               Map<String, Integer> topickMap = newHashMap<String, Integer>();  
               topickMap.put(topic,new Integer(1));  
               Map<String, List<KafkaStream<byte[],byte[]>>>streamMap =consumer.createMessageStreams(topickMap);  
               KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  
               ConsumerIterator<byte[],byte[]> it =stream.iterator();  
               while (it.hasNext()) {  
                     String value = newString(it.next().message());  
                     System.out.println("(consumer)-->" + value);  
                     collector.emit(new Values(value), value);  
               }  
       
          }  
       
          @Override  
          public void deactivate() {  
               // TODOAuto-generated method stub  
       
          }  
       
          private boolean isComplete;  
       
          @Override  
          public void nextTuple() {  
       
          }  
       
          @Override  
          public void ack(Object msgId) {  
               // TODOAuto-generated method stub  
       
          }  
       
          @Override  
          public void fail(Object msgId) {  
               // TODOAuto-generated method stub  
       
          }  
       
          @Override  
          public void declareOutputFields(OutputFieldsDeclarer declarer) {  
               declarer.declare(new Fields("KafkaSpout"));  
       
          }  
       
          @Override  
          public Map<String, Object> getComponentConfiguration() {  
               // TODOAuto-generated method stub  
               return null;  
          }  
       
    }  
       
    public class FileBlots implementsIRichBolt{  
           
          OutputCollector collector;  
           
          public void prepare(Map stormConf, TopologyContext context,  
                     OutputCollector collector) {  
               this.collector = collector;  
                
          }  
       
          public void execute(Tuple input) {  
               String line = input.getString(0);  
               for(String str : line.split("\\s+")){  
               List a = newArrayList();  
               a.add(input);   
               this.collector.emit(a,newValues(str));  
               }  
               this.collector.ack(input);  
          }  
       
          public void cleanup() {  
                
          }  
       
          public void declareOutputFields(OutputFieldsDeclarer declarer) {  
               declarer.declare(new Fields("words"));  
                
          }  
       
          public Map<String, Object> getComponentConfiguration() {  
               // TODOAuto-generated method stub  
               return null;  
          }  
       
    }  
    public class WordsCounterBlots implementsIRichBolt{  
           
          OutputCollector collector;  
          Map<String, Integer> counter;  
           
          public void prepare(Map stormConf, TopologyContext context,  
                     OutputCollector collector) {  
               this.collector = collector;  
               this.counter =new HashMap<String, Integer>();  
                
          }  
       
          public void execute(Tuple input) {  
               String word = input.getString(0);  
               Integer integer = this.counter.get(word);  
               if(integer !=null){  
                     integer +=1;  
                     this.counter.put(word, integer);  
               }else{  
                     this.counter.put(word, 1);  
               }  
               System.out.println("execute");  
               Jedis jedis = JedisUtils.getJedis();  
               jedis.incrBy(word, 1);  
               System.out.println("=============================================");  
               this.collector.ack(input);  
          }  
       
          public void cleanup() {  
               for(Entry<String, Integer> entry :this.counter.entrySet()){  
                          System.out.println("------:"+entry.getKey()+"=="+entry.getValue());  
               }  
                
          }  
       
          public void declareOutputFields(OutputFieldsDeclarer declarer) {  
                
                
          }  
       
          public Map<String, Object> getComponentConfiguration() {  
               // TODOAuto-generated method stub  
               return null;  
          }  
       
    }  
 
先稍微看下程序的创建Topology代码
    public class KafkaTopology {  
       
          public static void main(String[] args) {  
               try {  
                     JedisUtils.initialPool("xx.xx.xx.xx", 6379);  
               } catch (Exception e) {  
                     e.printStackTrace();  
               }  
                
               TopologyBuilder builder = newTopologyBuilder();           builder.setSpout("kafka",new KafkaSpout("kafka"));  
               builder.setBolt("file-blots",new FileBlots()).shuffleGrouping("kafka");  
               builder.setBolt("words-counter",new WordsCounterBlots(),2).fieldsGrouping("file-blots",new Fields("words"));  
               Config config = new Config();  
               config.setDebug(true);  
                     LocalCluster local = newLocalCluster();  
                     local.submitTopology("counter", config, builder.createTopology());  
          }  
    }  
 

 

  1. storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.KafkaTopology
先看下日志,这里打印出来了往数据库里面插入数据了
然后我们查看下数据库;插入成功了!

到这里我们的整个整合就完成了!
但是这里还有一个问题,不知道大伙有没有发现。
由于我们使用storm进行分布式流式计算,那么分布式最需要注意的是数据一致性以及避免脏数据的产生;所以我提供的测试项目只能用于测试,正式开发不能这样处理
 
 
分享到:
评论

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    lnmp(linux+nginx+mysql+php)安装配置及分布式系统大数据处理hadoop集群中的flume+Kafka+Storm+HDFS等实时系统搭分享

    flume-ng+Kafka+Storm+HDFS实时系统组合

    对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目。下面给大家可以参考。1.一个好的项目架构应该具备什么...

    大数据架构:flume-ng+Kafka+Storm+HDFS实时系统组合

    直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边...

    emsite后台全自动快速开发框架.rar

    - emsite采用dubbo作为服务层框架,后台将集成单点登录、oauth2.0、storm+kafka消息处理系统、kafka+ flume+storm+hdfs+hadoop作为日志分析系统、配置中心、分布式任务调度系统、服务器实时监控系统、搜索引擎系统...

    emsite后台全自动快速开发框架 v2.1.0

    采用dubbo作为服务层框架,后台将集成单点登录、Auth2.0、storm+kafka消息处理系统、kafka+ flume+storm+hdfs+hadoop作为日志分析系统、配置中心、分布式任务调度系统、服务器实时监控系统、搜索引擎系统(elastic...

    大数据流处理框架介绍.pdf

    实时流处理的的流程与技术选型 : ⼀、⽇志收集 由于业务系统⼀般是游离与流处理集群如SparkStreaming、Storm之外的,所以我们需要对业务系统的数据进⾏实时收集。这就⽤到了 ⽇志收集框架,⽇志收集框架主要需要...

    syslog-storm-sample:HDP上的syslog和Storm的快速示例

    Syslog-&gt; Flume-&gt; Kafka-&gt; Storm-&gt; Hive 这是一个简单的演示项目,显示了从kafka主题收集风暴日志并将其推入HDFS的过程。 它还每n分钟将文件旋转到一个配置单元分区中。 入门 使用HDP 2.2.4,或在pom中相应地调整...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    storm读书笔记---storm运行流程

    一、storm是一个用于实时流式计算的分布式计算引擎,弥补了Hadoop在实时计算方面的不足(Hadoop在本质上是一个批处理系统)。二、storm在实际应用场景中的位置一般如下:其中的编号1~5说明如下:1、Flume用于收集...

    LogAnalyzer:分析大数据组件的客户日志,例如HDFS,Hive,HBase,Yarn,MapReduce,Storm,Spark,Spark 2,Knox,Ambari Metrics,Nifi,Accumulo,Kafka,Flume,Oozie,Falcon,Atlas和Zookeeper

    日志分析器-分析大数据组件的客户日志,例如HDFS,Hive,HBase,Yarn,MapReduce,Storm,Spark,Spark 2,Knox,Ambari Metrics,Nifi,Accumulo,Kafka,Flume,Oozie,Falcon,Atlas和Zookeeper。 内部架构 分析...

    大数据学习路线 大数据技术栈思维导图 大数据常用软件安装指南

    包括Hadoop、Hive、Spark、Storm、Flink、HBase、Kafka、Zookeeper、Flume、Sqoop等技术的学习 Hadoop 分布式文件存储系统 —— HDFS 分布式计算框架 —— MapReduce 集群资源管理器 —— YARN Hadoop 单机伪集群...

    Spark Streaming 流式处理项目代码.rar

    而流处理则是直接对运动中的数据的处理,在接收数据时直接计算数据。 大多数数据都是连续的流:传感器事件,网站...+ Spark Streaming 可以从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,也支持自定义数据源。

    百度云盘 pdf《大数据架构和算法实现之路:电商系统的技术实战》百度云盘-带标签目录

    11.4 自行设计之 Flume、 Kafka 和 Storm 的整合……………··…… 386 11.4.1 实时性数据分析之 Kafka 简介 ……….......……....... 386 11.4.2 实时性数据分析之 Storm 简介………………·388 11.4.3 Flume 、...

    第七章-《大数据导论》大数据处理平台.pdf

    数据源 … 引擎 数 据 安 全 与 隐 私 保 护 … 基于开源系统的大数据处理平台 SparkS QL Spark Streaming MLib GraphX MapReduce Hive Storm Giraph Spark HDFS Swift Kafka Sqoop Flume Scrapy 数据采集系统: ...

    大数据工程师学习计划.pdf

    Linux 基本操作 Hadoop(HDFS+MapReduce+Yarn ) HBase(JavaAPI操作+Phoenix ) Hive(Hql基本操作和原理理解) Kafka Storm/JStorm Scala Python Spark (Core+sparksql+Spark streaming ) 辅助⼩⼯具(Sqoop/Flume/...

    八斗大虚据第九期完整版.docx

    第二阶段:这一阶段会学习FLume、Kafka、Spark Streaming、Flink/Storm、Zookeeper、HBase等计算框架的开发技术,以及大数据体系内的数据采集和数据仓库理论思想和技术实现。通过项目实践,你能快速掌握这些技术,...

    大数据技术体系.pdf

    ⼤数据技术体系 ⽂件存储:Hadoop HDFS、Tachyon、KFS 离线计算:Hadoop MapReduce、Spark 流式、实时计算:Storm、Spark Streaming、S4、Heron K-V、NOSQL数据库:HBase、Redis、MongoDB 资源管理:YARN、Mesos ⽇...

    大数据的基础知识.pdf

    2 Flume Flume是Cloudera提供的⼀个⾼可⽤的,⾼可靠的,分布式的海量⽇志采集、聚合和传输的系统,Flume⽀持在⽇志系统中定制各类数 据发送⽅,⽤于收集数据;同时,Flume提供对数据进⾏简单处理,并写到各种数据...

    Solr权威指南-上卷

    Solr权威指南-上卷 本书作者是国内较早接触Solr的技术专家之一,多年一直在Solr的研究、实践和布道的路上...然后讲解了Solr与MapReduce、HDFS、Hbase、Kafka、Flume、Storm、Spark等大数据技术的结合使用的集成方法。

Global site tag (gtag.js) - Google Analytics