`
mshijie
  • 浏览: 94645 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

使用Berkeley DB构建持久化队列

    博客分类:
  • Java
阅读更多

Berkeley DB简介

Berkeley DB(以下简称Bdb)是一个嵌入式的键值数据库。Bdb目前有两个版本,一个是使用c++构建的版本,还有一个java版本。c++版本支持在众多的语言中使用,Berkeley DB Java Edition(以下简称JE)完全用java语言编写。JE执行在应用程序中,完全不需要Client/Server的通信。JE更容易部署和嵌入到java程序中,所以我选择了使用Berkeley DB Java Edition(sleep cat)。

 

Berkeley DB编程接口  

在Java程序中使用JE,首先需要初始化一个数据库环境。

File home = new File(envHome);
EnvironmentConfig environmentConfig = new EnvironmentConfig();
environmentConfig.setTransactional(true);
environmentConfig.setAllowCreate(true);
environmentConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
Environment environment = new Environment(home, environmentConfig);


home是一个存在的目录,JE将把自己所有的数据库文件存储到这个目录下。setTransactional设置数据库支持事务。setAllowCreate设置数据库不存在时,是否创建数据库。setDurability设置Bdb的写数据的方式,这个很重要,对性能和数据安全的影响和很大。Bdb数据为了性能考虑,写入的数据不会立即写回到磁盘中,必须手动同步和关闭数据库的时候才回写回数据。可以通过设置Durability修改默认行为。Durability预设值了以下几种策略

COMMIT_SYNC  
事务提交时,同步数据到磁盘,并等待磁盘完成,这是最安全的策略,也是最耗时的策略。    
COMMIT_WRITE_NO_SYNC  
事务提交时,写数据到磁盘,不等待磁盘完成,安全和速度折中的策略。
COMMIT_NO_SYNC 
COMMIT_WRITE_NO_SYNC
READ_ONLY_TXN

 
COMMIT_SYNC   能保证应用或者系统挂掉而不丢失数据,保证了事务的完整性。而COMMIT_WRITE_NO_SYNC   只能保证应用挂掉不丢失数据,不能保证系统挂掉时的事务完整性。

JE提供了两个层次的编程接口,高层的Direct Persistence Layer(DPL),DPL适合直接保存和读取一个Java对象的场景。DPL读取使用annotation配置的元信息保存和读取数据。

 

 

Direct Persistence Layer  

待存取的用户类

@Entity
public class User {

    @PrimaryKey(sequence = "SEQ_USER_ID")
    private Integer userId;

    @SecondaryKey(relate = MANY_TO_ONE)
    private String nick;

    //constructor, getter and setter
}


PrimaeyKey配置userId为主键,sequence配置了一个自增序列,@SecondaryKey配置了可查询的索引。MANY_TO_ONE表示,nick的值在数据库中可重复,多个user可使用同一个nick,按nick查询会返回一个列表。

    private static EntityStore createStore(Environment envionment) throws DatabaseException {
        StoreConfig storeConfig = new StoreConfig();
        storeConfig.setAllowCreate(true);
        storeConfig.setTransactional(true);
        return new EntityStore(envionment, "store1", storeConfig);
    }


创建一个EntityStore

PrimaryIndex<Integer, User> primaryKey = entityStore.getPrimaryIndex(Integer.class, User.class);

primaryKey.put(new User("user" + i));
primaryKey.get(1);


通过EntityStore获得对应的PrimaryIndex,即可保存,读取,查询Java对象了。

 

 

Base API  

同样,首先创建一个Database。

 DatabaseConfig myDbConfig = new DatabaseConfig();
 myDbConfig.setAllowCreate(true);
 myDbConfig.setTransactional(true);
 myDbConfig.setSortedDuplicates(true);

 Database myDb = myEnv.openDatabase(null,     // txn handle
           dbName,   // Database file name
           myDbConfig);


使用Database保存数据时,提供的Key和Value都需要序列化为DatabaseEntry。

 

 

Bdb JE Java Collections  

JE提供了一组高层的Collection API,通过java的Map,List等API封装了底层存取操作。比较适合用来构建持久化的数据结构。

 

 

构建持久化队列


public class BdbMessageQueue<T> {

    private static Logger log = Logger.getLogger(BdbMessageQueue.class);

    private static final String MESSAGE_STORE = "message_store";

    private Database messageDb;
    private StoredSortedMap<Long, T> messageMap;

    private TransactionRunner transactionRunner;

    //EnqueueWorker和DequeueWorker的同步对象
    private Object syncObject = new Object();

    public BdbMessageQueue(BdbEnvironment bdbEnvironment, String queueName)
            throws DatabaseException {
        try {
            // Set the Berkeley DB config for opening all stores.
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);


            // Open the Berkeley DB database for the part, supplier and shipment
            // stores.  The stores are opened with no duplicate keys allowed.
            messageDb = bdbEnvironment.getEnvironment().openDatabase(null, MESSAGE_STORE + queueName, dbConfig);

            EntryBinding messageKeyBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Long.class);
            EntryBinding messageValueBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Object.class);

            messageMap = new StoredSortedMap(messageDb, messageKeyBinding, messageValueBinding, true);

            // Create transactionRunner for the transactional operation
            transactionRunner = new TransactionRunner(bdbEnvironment.getEnvironment());

        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }


    /**
     * 安全关闭berkeley 数据库
     */
    public void close() {
        try {
            if (messageDb != null) messageDb.close();
        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }

    /**
     * 出队
     *
     * @return
     */
    public void dequeue(TaskCallback task) {
        try {
            DequeueWorker worker = new DequeueWorker(task);
            transactionRunner.run(worker);
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * 入队
     *
     * @param message
     */
    public void enqueue(T message) {
        try {
            EnqueueWorker worker = new EnqueueWorker(message);
            transactionRunner.run(worker);
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * 出队列事务Worker,内部类
     */
    private class DequeueWorker implements TransactionWorker {

        private TaskCallback task;

        private DequeueWorker(TaskCallback task) {
            this.task = task;
        }

        public void doWork() throws Exception {

            Long firstKey;
            T message;

            synchronized (syncObject) {
                //没有获得消息就不起床
                while ((firstKey = messageMap.firstKey()) == null ||
                        (message = messageMap.get(firstKey)) == null) {
                    syncObject.wait();
                }
            }
            //如果执行任务的时候,抛出了RuntimeException,消息不会从队列中删除。BDB事务也会回滚。
            task.handelMessage(message);

            messageMap.remove(firstKey);

            if (log.isDebugEnabled()) {
                log.debug(String.format("DequeueWorker dequeue %1$s. ", message));
            }
        }

    }

    /**
     * 入队列事务Worker,内部类
     */
    private class EnqueueWorker implements TransactionWorker {
        private T message;

        private EnqueueWorker(T message) {
            this.message = message;
        }

        public void doWork() throws Exception {
            synchronized (syncObject) {
                Long lastKey = messageMap.lastKey();
                lastKey = (lastKey == null) ? 1L : lastKey + 1;
                messageMap.put(lastKey, message);

                syncObject.notify();
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("EnqueueWorker enqueue %1$s. ", message));
            }
        }
    }
}


代码很长,代码中使用的JE提供的StoredMap高层次的API。

文本只是一个code show,如果你对Berkeley db感兴趣,请阅读Oracle的官方文档。

Getting Started Guide

Writing Transactional Applications

Java Collections Tutorial

Getting Started with BDB JE High Availability

 

5
2
分享到:
评论

相关推荐

    Berkeley DB 读取样例

    嵌入式数据库Berkeley DB Java Edition Berkeley DB的使用 使用Berkeley DB的一般步骤 创建、打开、操作、关闭数据库环境Environment

    Berkeley DB4.8以上各版本

    Berkeley DB4.8以上各版本,已经亲测过哪些版本可与redhat6.5兼容,见附件名称备注。

    berkeley db使用手册

    berkeley db 使用手册

    Berkeley DB Java 版 4.0.92

    Oracle Berkeley DB Java 版是一个开源的、可嵌入的事务存储引擎,是完全用 Java 编写的。与 Oracle Berkeley DB 类似,Oracle Berkeley DB Java 版在应用程序的地址空间中执行,没有客户端/服务器通信的开销,从而...

    Berkeley DB数据库 6.2.32 64位

    Berkeley DB 6.2.32_64.msi Windows 64-bit binary installer Berkeley DB是一个嵌入式数据库,为应用程序提供可伸缩的、高性能的、有事务保护功能的数据管理服务。 主要特点: 嵌入式:直接链接到应用程序中,与...

    Berkeley db使用方法简介(c接口)

    Berkeley db使用方法简介(c接口) 非SQL的高速内存数据库的使用方法,简单明了..

    Berkeley DB

    Berkeley DB(BDB)是OpenLDAP后台数据库的默认配置,因此在安装OpenLDAP之前应先安装BDB。

    BerkeleyDB测试程序

    BerkeleyDB测试程序 包含散列文件入库,和读取的速度的测试

    Berkeley DB 5.3.21.tar

    Berkeley DB 5.3.21.tar,你也可以去http://www.oracle.com/technetwork/products/berkeleydb/downloads/index.html下载最新版

    Berkeley DB文章集合

    Berkeley DB文章集合

    BerkeleyDB

    BerkeleyDB的java应用jar包

    Berkeley DB C++编程入门教

    介绍DB API的设置与使用的快速入门手册,目标是提供一个快速有效地机制,能让你进入Berkeley DB研发的世界。在本文中侧重于C++语言的研发人员,以及研究进城内数据管理解决方案的资深架构师。

    Berkeley DB数据库最新版

    Berkeley DB6.0.20 Berkeley DB BDB Berkeley DB数据库

    Berkeley DB数据库支持事物的C++语言入门教程

    本文描述了如何在Berkeley DB中使用事务(Transaction)。它简要介绍了事务是如何保护你的应用的数据的...本书假设你已经了解BerkeleyDB的基本架构知识(这些知识在&lt;Getting Started with Berkeley DB Guide&gt;一书中。)

    BerkeleyDB-JE je-6.0.11

    Oracle BerkeleyDB-JE je-6.0.11

    sqlite PK Berkeley DB

    sqlite 和Berkeley db各方面 的比较

    BerkeleyDB Java Edition用户手册

    Java版本的Berkeley DB用户手册,找了好久

    BerkeleyDB-0.26

    BerkeleyDB和Sqlite是当前最流行的嵌入式开源数据库。

    SQL 开发人员 Oracle Berkeley DB 指南

    不是所有的 SQL 应用程序都应该在 Oracle Berkeley DB 实施( Oracle Berkeley DB 是一个开放源的嵌入式数据库引擎,提供了快速、可靠、本地的持久性,无需管理),但如果您有一系列相对固定的查询且很关心性能,...

Global site tag (gtag.js) - Google Analytics