
Transactional Topologies

 
Read more:

https://storm.apache.org/documentation/Transactional-topologies.html

 

NOTE: Transactional topologies have been deprecated -- use the Trident framework instead.

 

Storm guarantees data processing with an at-least-once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"

Storm 0.7.0 introduces transactional topologies, which enable you to get exactly-once messaging semantics for pretty much any computation. So you can do things like counting in a fully accurate, scalable, and fault-tolerant way.

Like Distributed RPC, transactional topologies aren't so much a feature of Storm as they are a higher level abstraction built on top of Storm's primitives of streams, spouts, bolts, and topologies.

This page explains the transactional topology abstraction, how to use the API, and provides details as to its implementation.

Design 1

The core idea behind transactional topologies is to provide a strong ordering on the processing of data. The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.

Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id 1, the second id 2, and so on.

The strong ordering of tuples gives you the capability to achieve exactly-once semantics even in the case of tuple replay. Let's look at an example of how you would do this.

Suppose you want to do a global count of the tuples in the stream. Instead of storing just the count in the database, you store the count and the latest transaction id together as one value in the database. When your code updates the count in the db, it should update the count only if the transaction id in the database differs from the transaction id for the tuple currently being processed. Consider the two cases (a sketch of this guarded update follows the list):

  1. The transaction id in the database differs from the current transaction id: Because of the strong ordering of transactions, we know for sure that the current tuple isn't represented in that count. So we can safely increment the count and update the transaction id.
  2. The transaction id is the same as the current transaction id: Then we know that this tuple is already incorporated into the count and can skip the update. The tuple must have failed after updating the database but before reporting success back to Storm.
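
Here is a minimal sketch of that guarded update in Java. The KeyValueStore interface, CountState class, and GuardedCounter helper are hypothetical stand-ins for whatever database and data model you actually use; only the guard logic mirrors the two cases above.

    import java.math.BigInteger;

    // Hypothetical value type: the count and the txid that last updated it, stored together.
    class CountState {
        final BigInteger txid;
        final long count;

        CountState(BigInteger txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    // Hypothetical database interface; substitute your real store.
    interface KeyValueStore {
        CountState get(String key);
        void put(String key, CountState state);
    }

    class GuardedCounter {
        // Increment the stored count only if this transaction id has not been applied yet.
        static void applyTuple(KeyValueStore db, String key, BigInteger txid) {
            CountState current = db.get(key);
            if (current == null) {
                db.put(key, new CountState(txid, 1));                   // first tuple ever seen
            } else if (!current.txid.equals(txid)) {
                db.put(key, new CountState(txid, current.count + 1));   // case 1: not yet counted
            }
            // case 2: txid matches -- this tuple was already counted on a previous attempt; skip
        }
    }

On a replay, the same transaction id arrives again, falls into the second case, and the database is left untouched.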

This logic and the strong ordering of transactions ensure that the count in the database will be accurate even if tuples are replayed. Credit for this trick of storing a transaction id in the database along with the value goes to the Kafka developers and their design documentation.

Furthermore, notice that the topology can safely update many sources of state in the same transaction and achieve exactly-once semantics. If there's a failure, any updates that already succeeded will be skipped on the retry, and any updates that failed will properly retry. For example, if you were processing a stream of tweeted urls, you could update a database that stores a tweet count for each url as well as a database that stores a tweet count for each domain, as sketched below.
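
A sketch of the same guard applied to two state sources, reusing the hypothetical GuardedCounter and KeyValueStore from the sketch above: each store keeps its own transaction id next to its value, so on a replay only the stores that have not yet seen the transaction actually change.

    class MultiStoreUpdate {
        // Apply the guarded update to each state source independently. If the first
        // write succeeded before a failure, the retry turns it into a no-op while the
        // second write still goes through.
        static void processTweetedUrl(KeyValueStore urlCounts, KeyValueStore domainCounts,
                                      String url, String domain, java.math.BigInteger txid) {
            GuardedCounter.applyTuple(urlCounts, url, txid);
            GuardedCounter.applyTuple(domainCounts, domain, txid);
        }
    }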

There is a significant problem, though, with this design of processing one tuple at a time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge number of database calls (at least one per tuple), and this design makes very little use of Storm's parallelization capabilities. So it isn't very scalable.

Design 2

Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction. So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed. Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered. Here's a diagram of this design:

[Diagram of this design: see the original page linked above.]

So if you're processing 1000 tuples per batch, your application will do 1000x fewer database operations than Design 1. Additionally, it takes advantage of Storm's parallelization capabilities, as the computation for each batch can be parallelized.
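
A sketch of the batch-level version of the guarded update, again using the hypothetical KeyValueStore and CountState types from the Design 1 sketch: the partial count for the batch is accumulated in memory, and the database is touched once per batch, guarded by the batch's transaction id.

    class BatchedGuardedCounter {
        // One guarded write per batch instead of one per tuple.
        static void applyBatch(KeyValueStore db, String key,
                               java.math.BigInteger batchTxid, long batchCount) {
            CountState current = db.get(key);
            if (current != null && current.txid.equals(batchTxid)) {
                return;  // this exact batch already committed on a previous attempt
            }
            long base = (current == null) ? 0 : current.count;
            db.put(key, new CountState(batchTxid, base + batchCount));
        }
    }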

While this design is significantly better than Design 1, it's still not as resource-efficient as it could be. The workers in the topology spend a lot of time sitting idle, waiting for the other portions of the computation to finish. For example, in a topology like this:

[Diagram of an example topology: see the original page linked above.]

After bolt 1 finishes its portion of the processing, it will be idle until the rest of the bolts finish and the next batch can be emitted from the spout.

Design 3 (Storm's design)

A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there are two parts to the computation:

  1. Computing the partial count for the batch
  2. Updating the global count in the database with the partial count

The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.

Storm accomplishes this distinction by breaking the computation of a batch into two phases:

  1. The processing phase: this is the phase that can be done in parallel for many batches
  2. The commit phase: the commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful.

The two phases together are called a "transaction". Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase. If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).
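
To make the two phases concrete, here is a sketch of the global count as two transactional bolts: one computes a partial count during the processing phase, and the other implements ICommitter so that its finishBatch call runs in the strongly ordered commit phase. The class and package names follow the (now deprecated) backtype.storm transactional API; treat the exact signatures as indicative rather than authoritative.

    import java.util.Map;

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseTransactionalBolt;
    import backtype.storm.transactional.ICommitter;
    import backtype.storm.transactional.TransactionAttempt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Processing phase: runs in parallel for many batches and only accumulates a partial count.
    class BatchCount extends BaseTransactionalBolt {
        private TransactionAttempt attempt;
        private BatchOutputCollector collector;
        private int count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context,
                            BatchOutputCollector collector, TransactionAttempt attempt) {
            this.collector = collector;
            this.attempt = attempt;
        }

        @Override
        public void execute(Tuple tuple) {
            count++;  // count one tuple of the current batch
        }

        @Override
        public void finishBatch() {
            collector.emit(new Values(attempt, count));  // emit the batch's partial count
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }
    }

    // Commit phase: implementing ICommitter makes finishBatch run in strict transaction order.
    class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
        private TransactionAttempt attempt;
        private BatchOutputCollector collector;
        private long sum = 0;

        @Override
        public void prepare(Map conf, TopologyContext context,
                            BatchOutputCollector collector, TransactionAttempt attempt) {
            this.collector = collector;
            this.attempt = attempt;
        }

        @Override
        public void execute(Tuple tuple) {
            sum += tuple.getInteger(1);  // add up partial counts from BatchCount tasks
        }

        @Override
        public void finishBatch() {
            // A txid-guarded database update (as in the Design 1 sketch) would go here,
            // keyed by attempt.getTransactionId(), so a replayed transaction cannot double-count.
            collector.emit(new Values(attempt, sum));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "sum"));
        }
    }

If anything fails in either phase, Storm replays the whole transaction, and the transaction-id guard in the commit step keeps the replay from double-counting.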

Design details

When using transactional topologies, Storm does the following for you:

  1. Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies. This includes the current transaction id as well as the metadata defining the parameters for each batch.
  2. Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point.
  3. Fault detection: Storm leverages the acking framework to efficiently determine when a batch has been successfully processed, successfully committed, or has failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
  4. First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).

Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka contains a transactional spout implementation for Kafka.
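
Putting the pieces together, here is a sketch of how such a topology might be wired up. It reuses the BatchCount and UpdateGlobalCount bolts sketched earlier and uses MemoryTransactionalSpout from Storm's testing utilities in place of a real Kafka-backed transactional spout; the builder, spout, and cluster classes named here come from the deprecated backtype.storm API and should be treated as assumptions to adapt to your version.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.testing.MemoryTransactionalSpout;
    import backtype.storm.transactional.TransactionalTopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class GlobalCountTopology {
        public static void main(String[] args) throws Exception {
            // In-memory partitions standing in for a replayable queue; in production you
            // would use a transactional spout backed by Kafka instead.
            Map<Integer, List<List<Object>>> data = new HashMap<Integer, List<List<Object>>>();
            data.put(0, Arrays.asList(
                    Arrays.<Object>asList("apple"),
                    Arrays.<Object>asList("banana"),
                    Arrays.<Object>asList("cherry")));

            MemoryTransactionalSpout spout =
                    new MemoryTransactionalSpout(data, new Fields("word"), 2 /* tuples per batch per partition */);

            // "global-count" names the transaction state kept in Zookeeper, "spout" is the spout's id.
            TransactionalTopologyBuilder builder =
                    new TransactionalTopologyBuilder("global-count", "spout", spout, 2);
            builder.setBolt("partial-count", new BatchCount(), 3).noneGrouping("spout");
            builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("global-count-topology", new Config(), builder.buildTopology());
        }
    }

The first argument to TransactionalTopologyBuilder names the transaction state Storm keeps in Zookeeper, which is how the current transaction id and batch metadata survive failures.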

 

 

 

 
