
kafka use


Kafka is a high-throughput distributed publish-subscribe messaging system with the following characteristics:

  • Message persistence through an O(1) disk data structure that keeps performance stable over long periods, even with terabytes of stored messages.
  • High throughput: even on very ordinary hardware, Kafka can handle hundreds of thousands of messages per second.
  • Messages can be partitioned across Kafka brokers and consumed by clusters of consumer machines.
  • Supports parallel data loading into Hadoop.

Kafka's goal is to provide a publish-subscribe solution that can handle all the activity stream data of a consumer-scale website. This kind of activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of throughput requirements, such data has traditionally been handled with log processing and log aggregation. That works for log-like data feeding offline analysis systems such as Hadoop, but it falls short when real-time processing is also required. Kafka aims to unify online and offline message handling: it supports parallel loading into Hadoop while also serving real-time consumption across a cluster of machines.

Kafka has an explicitly distributed architecture: there can be multiple producers, brokers (Kafka servers), and consumers. Kafka acts like a cache, a buffer between active data and offline processing systems. A few basic concepts:
(1) A message is the basic unit of communication. Each producer publishes messages to a topic; if consumers subscribe to that topic, newly published messages are broadcast to them.
(2) Kafka is explicitly distributed: many producers, consumers, and brokers can run on one large cluster and serve clients as a single logical whole. Consumers can be organized into groups; within a group, each message is delivered to only one consumer (a minimal sketch of such a group consumer follows).
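
To make the producer/consumer/group concepts concrete, here is a minimal consumer sketch. It is an illustration only: it assumes the kafka_2.10-0.8.2 client jar on the classpath and uses the old high-level consumer API (kafka.javaapi.consumer.ConsumerConnector); the group id demo-group is an arbitrary example. Every consumer started with the same group.id shares the topic's partitions, so each message is processed by only one member of the group.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181");
        props.put("group.id", "demo-group");          // consumers sharing this id form one group
        props.put("auto.offset.reset", "smallest");   // start from the beginning on first run

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Ask for one stream (thread) for the "test" topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();
        while (it.hasNext()) {
            // Each message reaches exactly one consumer in the group.
            System.out.println(new String(it.next().message()));
        }
    }
}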

1. Kafka installation

http://kafka.apache.org/documentation.html#quickstart

wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

[root@itr-mastertest01 local]# tar -zxvf software/kafka_2.10-0.8.2-beta.tgz

# Kafka wire protocol reference (for TCP-level testing):
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

# vim config/server.properties
broker.id: must be unique for every server (broker); use a number
host.name: must also be unique; use the node's IP address or hostname
[root@itr-mastertest01 config]# egrep -v "^$|#" server.properties  
broker.id=1
port=9092
host.name=itr-mastertest01  # the IP/hostname of this node; if you don't want to configure it, just leave localhost everywhere

num.network.threads=3
 
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=65536
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
zookeeper.connection.timeout.ms=2000

[root@itr-mastertest01 config]# egrep -v "^$|#" zookeeper.properties 
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

[root@itr-mastertest01 local]# scp -rq kafka_2.10-0.8.2-beta itr-mastertest02:/usr/local/

2. Start Kafka

# ZooKeeper must be started before Kafka! For ZooKeeper, see [zookeeper cluster deploy](http://www.itweet.cn/2015/07/12/zk-cluster-deploy/)
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

-- It is better to start Kafka as shown below; otherwise it dies as soon as the terminal is closed
# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties > kafka/kafka-logs/kafka-server.log &

[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[root@itr-nodetest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[root@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &

[zk: localhost:2181(CONNECTED) 3] get /brokers/ids   
null
cZxid = 0x1d00000939
ctime = Tue May 12 23:52:57 CST 2015
mZxid = 0x1d00000939
mtime = Tue May 12 23:52:57 CST 2015
pZxid = 0x1d00000ca7
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0

3. Create a topic

[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 1 --topic test

4. List topics from the command line

[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
test

5. Send some messages

[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list itr-mastertest01:9092 --topic test         
This is a message
This is another message

[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test/partitions/3/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}
[zk: localhost:2181(CONNECTED) 12] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1431447378690","host":"itr-nodetest01","version":1,"port":9092}

6. Consume the messages

[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message

[root@itr-nodetest01 kafka_2.10-0.8.2-beta]#  bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg...

[root@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg.

7. View detailed topic information for the cluster

[root@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test
Topic:test      PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: test     Partition: 0    Leader: 4       Replicas: 4,2   Isr: 4,2
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,4   Isr: 2,4
        Topic: test     Partition: 3    Leader: 3       Replicas: 3,1   Isr: 3,1

Messages are written to the log directory:
[root@itr-mastertest02 bin]# ls /tmp/kafka-logs/test-2
00000000000000000000.index  00000000000000000000.log

8. Delete a topic

$ kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Set delete.topic.enable=true in server.properties and the topic can actually be deleted.

9. Kafka web UIs

  • Kafka monitoring tool 1: KafkaOffsetMonitor

    https://github.com/quantifind/KafkaOffsetMonitor
    
    wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
    
    Running It
    This is a small webapp; you can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling Kafka.
    java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb \
         --zk zk-server1,zk-server2 \
         --port 8080 \
         --refresh 10.seconds \
         --retain 2.days
    The arguments are:
    -   zk: the ZooKeeper hosts
    -   port: the port on which the app will be served
    -   refresh: how often the app should refresh and store a data point in the DB
    -   retain: how long data points should be kept in the DB
    -   dbName: where to store the history (default 'offsetapp')
    
    [root@itr-mastertest01 local]# mkdir kafka-offset-console
    [root@itr-mastertest01 local]# cd kafka-offset-console/
    [root@itr-mastertest01 kafka-offset-console]# cat mobile_start_en.sh 
    #!/bin/bash 
    cd /usr/local/kafka-offset-console
    java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp ./KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk itr-mastertest01,itr-mastertest02,itr-nodetest01/config/mobile/kafka-offset-console --port 9999 --refresh 10.seconds --retain 7.days 1>./stdout.log 2>./stderr.log &
    
    [root@itr-mastertest01 kafka-offset-console]# chmod +x mobile_start_en.sh  
    
    [root@itr-mastertest01 kafka-offset-console]# sh mobile_start_en.sh
    
    [root@itr-mastertest01 kafka-offset-console]# tail -f stdout.log 
    serving resources from: jar:file:/usr/local/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.1.jar!/offsetapp
  • Kafka monitoring tool 2: kafka-manager

    https://github.com/yahoo/kafka-manager
    [root@itr-mastertest01 kafka-manager]# echo $JAVA_HOME
    /usr/local/jdk1.7.0_45
    [root@itr-mastertest01 sbt]# sbt -version
    Getting org.scala-sbt sbt 0.13.8 ...
    
    https://github.com/yahoo/kafka-manager
    [root@itr-mastertest01 hsu]# yum install git -y
    [root@itr-mastertest01 hsu]# git clone https://github.com/yahoo/kafka-manager.git
    [root@itr-mastertest01 hsu]# cd kafka-manager/
    [root@itr-mastertest01 kafka-manager]# sbt clean dist # this builds the distribution zip
    
    Configuration
    $ unzip kafka-manager-1.0-SNAPSHOT.zip
    Configure the application.conf file under the conf directory of the unpacked kafka-manager package:
    kafka-manager.zkhosts="itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181"
    
    Starting the service
    $ bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8081  > manager-ui.log &

This tool has to be compiled by yourself; you can also get a pre-built package directly from me!

  • Kafka monitoring tool 3: DCMonitor
    https://github.com/shunfei/DCMonitor
    
    druid:
    https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatViewServlet%E9%85%8D%E7%BD%AE

10. Kafka vs. RocketMQ

https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka

http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
  • Performance comparison
    • Kafka single-machine write TPS is about 1,000,000 messages per second with 10-byte messages
    • RocketMQ single-instance write TPS is about 70,000 per second; with 3 brokers deployed on one machine it can reach up to about 120,000 per second, also with 10-byte messages
    Summary: Kafka reaches roughly a million writes per second on one machine mainly because the producer merges many small messages and sends them to the broker in batches (a hedged sketch of this follows).
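
A minimal sketch of what that batching looks like with the 0.8.x producer API bundled with the kafka_2.10-0.8.2-beta used in this post. The broker hosts reuse the cluster above; the batch sizes are arbitrary example values. Setting producer.type=async makes the client buffer small messages and flush them to the broker in batches, which is the main reason for the high single-machine write TPS.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "itr-mastertest01:9092,itr-mastertest02:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");          // buffer messages instead of sending one by one
        props.put("batch.num.messages", "500");       // flush once 500 messages are queued...
        props.put("queue.buffering.max.ms", "100");   // ...or after 100 ms, whichever comes first

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 100000; i++) {
            // Many tiny messages end up in a few large requests to the broker.
            producer.send(new KeyedMessage<String, String>("test", "msg-" + i));
        }
        producer.close();
    }
}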

11. Project analysis

Data source:
an Oracle order table

Write producer code that sends the order data to Kafka (a hedged sketch follows).
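
A hedged sketch of that producer code: it reads rows over JDBC from a hypothetical ORDERS table and sends each row as a tab-separated line to the order topic, matching the records consumed below. The JDBC URL, credentials, table and column names are placeholders, not the actual project schema; the Kafka side uses the 0.8.x producer API.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OrderProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "itr-mastertest01:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Placeholder connection details; the Oracle JDBC driver must be on the classpath.
        Class.forName("oracle.jdbc.OracleDriver");
        Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/orcl", "user", "password");
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
                "SELECT order_id, user_id, goods_id, num, status, create_time FROM orders");
        while (rs.next()) {
            // One tab-separated line per order row, like the records consumed below.
            String line = rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3)
                    + "\t" + rs.getString(4) + "\t" + rs.getString(5) + "\t" + rs.getString(6);
            producer.send(new KeyedMessage<String, String>("order", line));
        }
        rs.close();
        stmt.close();
        conn.close();
        producer.close();
    }
}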

Receive:
[root@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order --from-beginning
998801026       11118   731     44      0       2015-05-17 00:36:08
46300226        68178   556     14      0       2015-05-17 00:36:08
367575834       42812   820     21      0       2015-05-17 00:36:08
97829386        96289   583     67      1       2015-05-17 00:36:08

# The Kafka client jars need to be copied into Storm's lib directory

# Create ZooKeeper nodes
[zk: localhost:2181(CONNECTED) 11] create /order 1
Created /order
[zk: localhost:2181(CONNECTED) 12] create /order/id 1
Created /order/id
[zk: localhost:2181(CONNECTED) 13] ls /order
[id]

# Create the topic
kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic order
bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 4 --topic order
bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order

# Packaged jar test 1
storm-0.9.2/bin/storm jar storm-code-1.0-SNAPSHOT-jar-with-dependencies.jar com.kafka_storm.topology.CounterTopology

# Packaged jar test 2
$ storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology

# To make sure no data is lost, optimize the data structures by keeping intermediate state in a third-party in-memory store such as redis/memcached (a hedged sketch follows).
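
One possible reading of that note, sketched with the Jedis client (an assumption; the post does not say which Redis client or data structures were actually used): keep the running counters in Redis rather than only in a bolt's memory, so a restarted worker can pick up where it left off.

import redis.clients.jedis.Jedis;

public class RedisStateSketch {
    public static void main(String[] args) {
        // Host/port are placeholders for wherever Redis runs in this cluster.
        Jedis jedis = new Jedis("itr-mastertest01", 6379);
        // Keep the aggregate in Redis so a restarted worker can continue from it.
        jedis.hincrBy("order:count_by_status", "0", 1);
        jedis.hincrBy("order:amount_by_status", "0", 44);
        System.out.println(jedis.hgetAll("order:count_by_status"));
        jedis.disconnect();
    }
}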

# Use a ZooKeeper distributed lock so that multi-threaded processing stays consistent: with a third-party ZooKeeper client wrapper (Curator, linked below), only one thread at a time operates on MySQL, so the same record is never handled by several threads at once and the data does not get scrambled. This lets the topology process data with multiple threads! (A Curator-based sketch follows the link.)
http://repo1.maven.org/maven2/com/netflix/curator/
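
A sketch of such a lock built on the Netflix Curator recipes linked above (package names from the pre-Apache com.netflix.curator artifacts; the lock path /order/lock is an example, not necessarily the one used in the project). Only the thread holding the mutex touches MySQL; every other topology thread blocks in acquire().

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.locks.InterProcessMutex;
import com.netflix.curator.retry.ExponentialBackoffRetry;

public class ZkLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/order/lock");
        lock.acquire();                 // blocks until this thread/process owns the lock
        try {
            // Only one thread across the whole topology runs this section at a time,
            // so the same order row is never written to MySQL by two threads at once.
            System.out.println("updating MySQL under the ZooKeeper lock");
        } finally {
            lock.release();
        }
        client.close();
    }
}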

# Cluster-mode test, multi-threaded
# storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology  OrderTopology > order.log


This is an original article; when reposting, please credit: reposted from the sparkjvm blog.
