
Architecture Design of Kafka, a Distributed Publish-Subscribe Messaging System

Consumer state

In Kafka, the consumers are responsible for maintaining state information (the offset) on what has been consumed. Typically, the Kafka consumer library writes its state data to ZooKeeper. However, it may be beneficial for consumers to write state data into the same datastore where they are writing the results of their processing. For example, the consumer may simply be entering some aggregate value into a centralized transactional OLTP database. In this case the consumer can store the state of what is consumed in the same transaction as the database modification. This solves a distributed consensus problem, by removing the distributed part! A similar trick works for some non-transactional systems as well. A search system can store its consumer state with its index segments. Though this may provide no durability guarantees, it means that the index is always in sync with the consumer state: if an unflushed index segment is lost in a crash, the index can always resume consumption from the latest checkpointed offset. Likewise, our Hadoop load job, which does parallel loads from Kafka, uses a similar trick. Individual mappers write the offset of the last consumed message to HDFS at the end of the map task. If a job fails and gets restarted, each mapper simply restarts from the offsets stored in HDFS.
There is a side benefit of this decision. A consumer can deliberately rewind to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if a bug in the consumer code is discovered after some messages have been consumed, the consumer can re-consume those messages once the bug is fixed.
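As an illustration of the transactional variant described above, here is a minimal sketch in Java, assuming a hypothetical profile_visits results table and a consumer_offsets table (both invented for this example) living in the same OLTP database:

import java.sql.Connection;
import java.sql.PreparedStatement;

// Sketch: commit the consumed offset in the same transaction as the result.
// The schema (profile_visits, consumer_offsets) is invented for this example.
public class TransactionalConsumer {
  public void apply(Connection db, String topic, int partition,
                    long nextOffset, String memberId) throws Exception {
    db.setAutoCommit(false);
    try (PreparedStatement upd = db.prepareStatement(
           "UPDATE profile_visits SET visits = visits + 1 WHERE member_id = ?");
         PreparedStatement off = db.prepareStatement(
           "UPDATE consumer_offsets SET next_offset = ? WHERE topic = ? AND partition = ?")) {
      upd.setString(1, memberId);
      upd.executeUpdate();
      off.setLong(1, nextOffset);
      off.setString(2, topic);
      off.setInt(3, partition);
      off.executeUpdate();
      db.commit();   // result and offset become visible atomically
    } catch (Exception e) {
      db.rollback(); // neither the result nor the offset moves
      throw e;
    }
  }
}

Because the offset only advances when the commit succeeds, a crash between processing and commit simply replays from the old offset.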
Push vs. pull

A related question is whether consumers should pull data from brokers or brokers should push data to the subscriber. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some recent systems, such as Scribe and Flume, focusing on log aggregation, follow a very different push-based path where each node acts as a broker and data is pushed downstream. There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers since the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial-of-service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. The push problem can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.
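The flow-control property of pull can be made concrete with a schematic consumer loop; fetch(), process(), and the Message type below are placeholders rather than Kafka's actual API:

import java.util.List;

// Schematic pull loop: the consumer, not the broker, sets the pace.
abstract class PullConsumer {
  interface Message { long nextOffset(); }

  abstract List<Message> fetch(String topic, int partition, long offset, int maxBytes);
  abstract void process(Message m);

  void run(String topic, int partition, long startOffset) {
    long offset = startOffset;
    while (true) {
      // The fetch itself is the flow control: more data is requested only
      // once the previous batch has been fully processed.
      List<Message> batch = fetch(topic, partition, offset, 1024 * 1024);
      for (Message m : batch) {
        process(m); // however slow this is, the consumer is never overrun
        offset = m.nextOffset();
      }
    }
  }
}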
Distribution

Kafka is built to be run across a cluster of machines as the common case. There is no central "master" node. Brokers are peers to each other and can be added and removed at any time without any manual configuration changes. Similarly, producers and consumers can be started dynamically at any time. Each broker registers some metadata (e.g., available topics) in ZooKeeper. Producers and consumers can use ZooKeeper to discover topics and to coordinate production and consumption. The details of producers and consumers are described below.
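As a sketch of what discovery looks like, the snippet below lists broker and topic registrations with the plain ZooKeeper client; the /brokers/ids and /brokers/topics paths follow Kafka's registry layout, but treat the details as illustrative rather than a fixed contract:

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Sketch: discover live brokers and topics from the ZooKeeper registry.
public class ClusterDiscovery {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000,
        event -> { /* react to membership changes here */ });
    List<String> brokerIds = zk.getChildren("/brokers/ids", true);    // watch broker joins/leaves
    List<String> topics    = zk.getChildren("/brokers/topics", true); // watch new topics
    System.out.println("live brokers: " + brokerIds);
    System.out.println("topics: " + topics);
    zk.close();
  }
}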
Producer

Automatic producer load balancing

Kafka supports client-side load balancing for message producers, or the use of a dedicated load balancer to balance TCP connections. A dedicated layer-4 load balancer works by balancing TCP connections over Kafka brokers. In this configuration all messages from a given producer go to a single broker. The advantage of using a layer-4 load balancer is that each producer only needs a single TCP connection, and no connection to ZooKeeper is needed. The disadvantage is that the balancing is done at the TCP connection level, and hence may not be well balanced: if some producers produce many more messages than others, evenly dividing up the connections per broker may not evenly divide up the messages per broker.
Client-side zookeeper-based load balancing solves some of these problems. It allows the producer to dynamically discover new brokers, and balance load on a per-request basis. Likewise it allows the producer to partition data according to some key instead of randomly, which enables stickiness on the consumer (e.g. partitioning data consumption by user id). This feature is called "semantic partitioning", and is described in more detail below.
The ZooKeeper-based load balancing works as follows. Watchers are registered on the following events:
a new broker comes up
a broker goes down
a new topic is registered
a broker gets registered for an existing topic
Internally, the producer maintains an elastic pool of connections to the brokers, one per broker. This pool is kept updated, through the ZooKeeper watcher callbacks, to establish and maintain connections to all the live brokers. When a producer request for a particular topic comes in, a broker partition is picked by the partitioner (see the section on semantic partitioning). An available connection from the pool is then used to send the data to the selected broker partition.
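A minimal sketch of such an elastic pool, with the watcher callbacks reduced to onBrokerUp/onBrokerDown methods (BrokerConnection and the method names are invented for this example; the real producer's internals differ in detail):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a per-broker connection pool kept in sync by watcher callbacks.
public class BrokerPool {
  private final Map<Integer, BrokerConnection> connections = new ConcurrentHashMap<>();

  // Invoked from a ZooKeeper watcher callback when a broker comes up.
  public void onBrokerUp(int brokerId, String host, int port) {
    connections.computeIfAbsent(brokerId, id -> new BrokerConnection(host, port));
  }

  // Invoked from a watcher callback when a broker goes down.
  public void onBrokerDown(int brokerId) {
    BrokerConnection c = connections.remove(brokerId);
    if (c != null) c.close();
  }

  // Looked up per produce request, after the partitioner picks a broker partition.
  public BrokerConnection connectionFor(int brokerId) {
    return connections.get(brokerId);
  }

  static class BrokerConnection {
    BrokerConnection(String host, int port) { /* open the TCP connection */ }
    void close() { /* tear it down */ }
  }
}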
Asynchronous send

Asynchronous non-blocking operations are fundamental to scaling messaging systems. In Kafka, the producer provides an option to use asynchronous dispatch of produce requests (producer.type=async). This allows buffering of produce requests in an in-memory queue, with batch sends triggered by a time interval or a pre-configured batch size. Since data is typically published from a set of heterogeneous machines producing data at variable rates, this asynchronous buffering helps generate uniform traffic to the brokers, leading to better network utilization and higher throughput.
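An illustrative configuration for the asynchronous mode, using the parameters named above (the values are arbitrary, and the zk.connect endpoint is an assumption of this example):

import java.util.Properties;

// Sketch: enable async dispatch with time- and size-based batch triggers.
public class AsyncProducerConfigExample {
  public static Properties asyncConfig() {
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181"); // broker discovery via ZooKeeper
    props.put("producer.type", "async");       // buffer requests in an in-memory queue
    props.put("queue.time", "5000");           // flush after 5 s even if the batch is not full
    props.put("batch.size", "200");            // ...or as soon as 200 messages accumulate
    return props;
  }
}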
Semantic partitioning

Consider an application that would like to maintain an aggregation of the number of profile visitors for each member. It would like to send all profile visit events for a member to a particular partition and, hence, have all updates for a member appear in the same stream for the same consumer thread. The producer has the capability to semantically map messages to the available Kafka nodes and partitions. This allows partitioning the stream of messages with some semantic partition function based on some key in the message, to spread them over broker machines. The partitioning function can be customized by providing an implementation of the kafka.producer.Partitioner interface, the default being the random partitioner. For the example above, the key would be member_id and the partitioning function would be hash(member_id)%num_partitions.
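A minimal sketch of such a partitioner, assuming member ids arrive as strings and using the Partitioner interface shown in the API section below:

// Sketch: route all events for a member to the same partition.
public class MemberIdPartitioner implements kafka.producer.Partitioner<String> {
  public int partition(String memberId, int numPartitions) {
    // Math.abs guards against negative hash codes.
    return Math.abs(memberId.hashCode()) % numPartitions;
  }
}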
Support for Hadoop and other batch data load

Scalable persistence makes it possible to support batch data loads that periodically snapshot data into an offline system for batch processing. We make use of this for loading data into our data warehouse and Hadoop clusters.
Batch processing happens in stages beginning with the data load stage and proceeding in an acyclic graph of processing and output stages (e.g. as supported here). An essential feature of support for this model is the ability to re-run the data load from a point in time (in case anything goes wrong).
In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data.
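The checkpointing idea can be sketched as a small HDFS helper; the path layout and the OffsetCheckpoint class are invented for illustration and are not part of the actual load job:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: each mapper records the last consumed offset at the end of its task.
public class OffsetCheckpoint {
  public static void write(FileSystem fs, String node, String topic,
                           int partition, long lastOffset) throws IOException {
    Path p = new Path(String.format("/kafka-offsets/%s/%s/%d", node, topic, partition));
    try (FSDataOutputStream out = fs.create(p, true)) { // overwrite the previous checkpoint
      out.writeLong(lastOffset);
    }
    // A restarted job reads this value back and resumes consumption from it.
  }
}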
Implementation Details

The following gives a brief description of some relevant lower-level implementation details for parts of the system described in the sections above.
API Design

Producer APIs

The Producer API wraps the two low-level producers, kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer.
class Producer {

  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List<kafka.javaapi.producer.ProducerData> producerData);

  /* Closes the producer and cleans up */
  public void close();

}
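An illustrative use of this API; the ProducerConfig and ProducerData shapes follow the 0.7-era javaapi and should be checked against the javadoc of the version you run:

import java.util.Collections;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

// Sketch: construct a producer and send one keyed message.
public class ProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("serializer.class", "kafka.serializer.StringEncoder");

    Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(props));
    // The key ("member-42") is handed to the partitioner; the list is the payload.
    producer.send(new ProducerData<String, String>(
        "profile-visits", "member-42", Collections.singletonList("visit-event")));
    producer.close();
  }
}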
The goal is to expose all the producer functionality through a single API to the client. The new producer:

can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -

kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async) before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter the queue, they are buffered until either queue.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting the callback.handler config parameter to that class.
handles the serialization of data through a user-specified Encoder -
interface Encoder<T> {
  public Message toMessage(T data);
}
The default is the no-op kafka.serializer.DefaultEncoder.
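A possible custom Encoder that serializes strings as UTF-8 payloads (the byte-array constructor of Message is assumed from the 0.7-era API):

import java.nio.charset.StandardCharsets;
import kafka.message.Message;
import kafka.serializer.Encoder;

// Sketch: a string encoder producing UTF-8 message payloads.
public class Utf8StringEncoder implements Encoder<String> {
  public Message toMessage(String data) {
    return new Message(data.getBytes(StandardCharsets.UTF_8));
  }
}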
provides ZooKeeper-based automatic broker discovery -

The ZooKeeper-based broker discovery and load balancing can be used by specifying the ZooKeeper connection URL through the zk.connect config parameter. For some applications, however, the dependence on ZooKeeper is inappropriate. In that case, the producer can take in a static list of brokers through the broker.list config parameter. Each produce request gets routed to a random broker partition in this case. If that broker is down, the produce request fails.
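An illustrative static configuration; the comma-separated id:host:port format for broker.list follows the 0.7-era convention and should be verified for your version:

import java.util.Properties;

// Sketch: bypass ZooKeeper with a static broker list.
public class StaticBrokerConfigExample {
  public static Properties staticConfig() {
    Properties props = new Properties();
    props.put("broker.list", "0:broker1:9092,1:broker2:9092");
    return props;
  }
}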
provides software load balancing through an optionally user-specified Partitioner -

The routing decision is influenced by the kafka.producer.Partitioner.
interface Partitioner<T> {
   int partition(T key, int numPartitions);
}
The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is hash(key)%numPartitions. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the partitioner.class config parameter.