`
wbj0110
  • 浏览: 1549966 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

(HBase+Lucene)

阅读更多

1、核心工具类

复制代码
package junit;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.MultiFieldQueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.highlight.Formatter;
import org.apache.lucene.search.highlight.Fragmenter;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.SimpleFragmenter;
import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.junit.Test;
import org.wltea.analyzer.lucene.IKAnalyzer;

import com.ljq.entity.Person;
import com.ljq.utils.HBaseAPI;
import com.ljq.utils.LuceneUtil;
import com.ljq.utils.XMLPropertyConfig;

/**   
 *  模拟功能:假如从互联网上采集数据,采集到的数据实时存储到HBase数据库上,<br/>
 *  现需把这些数据从HBase读取出来,并在本地创建lucene索引,<br/>方便前端通过索引进行查询,把查询取得的结果在界面上展现出来。<br/><br/>
 *  
 *  要求该功能要7*24不间断执行,实时性要求非常高,故采用Strom实时计算框架为载体,实现上面的模拟功能。
 *
 * @author 林计钦
 * @version 1.0 2013-6-5 下午05:36:09   
 */
public class IndexCreationTest {
    private static StringBuffer ids=new StringBuffer();
    private static IndexWriter indexWriter = LuceneUtil.getIndexWriter();
    // 计算时间评估参数
    private static long _initStartTime = 0; // 获取CAS consumer被实例化时的系统时间
    private static long mTimeForCommit = 60; // 经过多久时间提交
    // 计数器
    private static int mIndexCommitBatchNum = 100; // 批量更新索引
    private static int mDocCount = 0;
    /**
     * 从HBase实时读取数据,并在本地创建lucene索引 
     */
    @Test
    public void createIndexFromHBase(){
        readTable("1370422328578");
    }
    
    /**
     * 查询索引
     * @throws IOException 
     */
    @Test
    public void searchIndex() throws IOException{
        for(int i=0;i<500;i++){
            search(new String[]{"id", "name", "age", "rowKey"}, i+"");
        }
//        search(new String[]{"id", "name", "age", "rowKey"}, "0");
//        search(new String[]{"id", "name", "age", "rowKey"}, "998");
        
        //数据写入txt文件
        BufferedWriter writer = new BufferedWriter(new FileWriter(new File("E:\\123.txt")));
        writer.write(ids.toString());
        writer.close();
    }
    
    /**
     * 从HBase实时读取数据,并在本地创建lucene索引
     * 
     * @param startRow 指定开始的行(时间戳)
     */
    public static void readTable(String startRow) {
        int i = 0;
        try {
            while (true) {
                // [{"timestamp":"1370422360734","id":"950","name":"lin950","age":"950","row":"1370422507578"}]
                System.out.println("startRow=" + startRow);
                List<Map<String, String>> datas = HBaseAPI.scan("tb_stu", null, String
                        .valueOf(startRow), 100);
                i += datas.size();
                System.out.println(i);
                if (datas != null && datas.size() > 0) {
                    for (Map<String, String> data : datas) {
                        String row = data.get(HBaseAPI.HBASE_ROW);
                        startRow = row;
                        // System.out.println(String.format("id:%s, name:%s,
                        // age:%s, rowKey:%s.",
                        // data.get("id"), data.get("name"), data.get("age"),
                        // row));
                        createIndex(data.get("id"), data.get("name"), data.get("age"), row);
                    }
                }
                
                if(isCommitTime()){
                    System.out.println("indexCommit");
                    try {
                        indexWriter.commit(); // 批量提交
                        indexWriter.forceMerge(1); // forceMerge代替optimize
                        System.out.println("indexWriter.commit();");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }  
                    System.out.println("indexCommit fin");
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    private static void createIndex(String id, String name, String age, String rowKey){
        if (StringUtils.isBlank(id) || StringUtils.isBlank(name) || StringUtils.isBlank(age)
                || StringUtils.isBlank(rowKey)) {
            System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s.", 
                id, name, age, rowKey));
            return;
        }
        try {
            Document doc = new Document();
            doc.add(new Field("id", id, Field.Store.YES, Field.Index.NOT_ANALYZED));
            doc.add(new Field("name", name, Field.Store.YES, Field.Index.ANALYZED));
            doc.add(new Field("age", age, Field.Store.YES, Field.Index.NOT_ANALYZED));
            doc.add(new Field("rowKey", rowKey, Field.Store.YES, Field.Index.NOT_ANALYZED));
            
            //更新索引
            if(LuceneUtil.existsIndex()){
                System.out.println("---update index---");
                indexWriter.updateDocument(new Term("id", id), doc);
            }else { //第一次创建索引
                System.out.println("---create index---");
                indexWriter.addDocument(doc);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 判断是否可以提交索引
     * @return
     */
    private static synchronized boolean isCommitTime(){
        //每隔1分钟提交一次
        if((System.currentTimeMillis() - _initStartTime) >= (mTimeForCommit*1000)){
            _initStartTime = System.currentTimeMillis();
            return true;
        }
        //累加到100条提交一次
        else if(mDocCount % mIndexCommitBatchNum == 0){
            _initStartTime = System.currentTimeMillis();
            return true ;
        }
        else
           return false;
    }
    
    /**
     * 搜索、高亮显示
     * 
     * @param fields
     * @param keyword
     */
    private void search(String[] fields, String keyword) {
        IndexSearcher indexSearcher = null;

         
        try {
            // 创建索引搜索器,且只读
            IndexReader indexReader = IndexReader.open(FSDirectory.open(new File(XMLPropertyConfig.getConfigXML()
                    .getString("index_path"))), true);
            
            indexSearcher = new IndexSearcher(indexReader);

            MultiFieldQueryParser queryParser = new MultiFieldQueryParser(Version.LUCENE_35,
                    fields, new IKAnalyzer());
            Query query = queryParser.parse(keyword);

            // 返回前number条记录
            TopDocs topDocs = indexSearcher.search(query, 1000);
            // 信息展示
            int totalCount = topDocs.totalHits;
            //System.out.println("共检索出 " + totalCount + " 条记录");

            // 高亮显示
            /*
             * 创建高亮器,使搜索的结果高亮显示 SimpleHTMLFormatter:用来控制你要加亮的关键字的高亮方式 此类有2个构造方法
             * :SimpleHTMLFormatter()默认的构造方法.加亮方式:<B>关键字</B>
             * :SimpleHTMLFormatter(String preTag, String
             * postTag).加亮方式:preTag关键字postTag
             */
            Formatter formatter = new SimpleHTMLFormatter("<font color='red'>", "</font>");
            /*
             * QueryScorer QueryScorer
             * 是内置的计分器。计分器的工作首先是将片段排序。QueryScorer使用的项是从用户输入的查询中得到的;
             * 它会从原始输入的单词、词组和布尔查询中提取项,并且基于相应的加权因子(boost factor)给它们加权。
             * 为了便于QueryScoere使用,还必须对查询的原始形式进行重写。 比如,带通配符查询、模糊查询、前缀查询以及范围查询
             * 等,都被重写为BoolenaQuery中所使用的项。
             * 在将Query实例传递到QueryScorer之前,可以调用Query.rewrite
             * (IndexReader)方法来重写Query对象
             */
            QueryScorer fragmentScorer = new QueryScorer(query);
            Highlighter highlighter = new Highlighter(formatter, fragmentScorer);
            Fragmenter fragmenter = new SimpleFragmenter(100);
            /*
             * Highlighter利用Fragmenter将原始文本分割成多个片段。
             * 内置的SimpleFragmenter将原始文本分割成相同大小的片段,片段默认的大小为100个字符。这个大小是可控制的。
             */
            highlighter.setTextFragmenter(fragmenter);

            ScoreDoc[] scoreDocs = topDocs.scoreDocs;

            for (ScoreDoc scDoc : scoreDocs) {
                Document document = indexSearcher.doc(scDoc.doc);
                String id = document.get("id");
                String name = document.get("name");
                String age = document.get("age");
                String rowKey = document.get("rowKey");
                float score = scDoc.score; //相似度

                //高亮显示
                String lighterName = highlighter.getBestFragment(new IKAnalyzer(), "name", name);
                if (null == lighterName) {
                    lighterName = name;
                }

                String lighterAge = highlighter.getBestFragment(new IKAnalyzer(), "age", age);
                if (null == lighterAge) {
                    lighterAge = age;
                }

                Person person = new Person();
                person.setId(NumberUtils.toLong(id));
                person.setName(lighterName);
                person.setAge(NumberUtils.toInt(age));
                
                ids.append(id).append("\n\r");
                System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s, 相似度:%s.", 
                        id, lighterName, lighterAge, rowKey, score));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                indexSearcher.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}
复制代码

2、LuceneUtil类->Lucene工具类

复制代码
/**
 * lucene工具类,采用IKAnalyzer中文分词器
 * 
 * @author 林计钦
 * @version 1.0 2013-6-3 下午03:51:29
 */
public class LuceneUtil {
    /** 索引库路径 */
    private static final String indexPath = XMLPropertyConfig.getConfigXML()
            .getString("index_path");
    private static final Logger log=Logger.getLogger(LuceneUtil.class);
    
    public static IndexWriter getIndexWriter(){
        try {
            //索引库路径不存在则新建一个
            File indexFile=new File(indexPath);
            if(!indexFile.exists()) indexFile.mkdir();
            
            Directory fsDirectory = FSDirectory.open(indexFile);
            IndexWriterConfig confIndex = new IndexWriterConfig(Version.LUCENE_35, new IKAnalyzer());
            confIndex.setOpenMode(OpenMode.CREATE_OR_APPEND);
            if (IndexWriter.isLocked(fsDirectory)) {
                IndexWriter.unlock(fsDirectory);
            }
            return new IndexWriter(fsDirectory, confIndex);
        } catch (Exception e) {
            e.printStackTrace();
        }  
        return null;
    }

    /**
     * 判断索引库是否已创建
     * 
     * @return true:存在,false:不存在
     * @throws Exception
     */
    public static boolean existsIndex() throws Exception {
        File file = new File(indexPath);
        if (!file.exists()) {
            file.mkdirs();
        }
        String indexSufix = "/segments.gen";
        // 根据索引文件segments.gen是否存在判断是否是第一次创建索引
        File indexFile = new File(indexPath + indexSufix);
        return indexFile.exists();
    }
     
}
复制代码

3、HBaseAPI类->HBase数据库封装类

复制代码
/**
 * HBase数据库封装类
 * 
 * @author 林计钦
 * @version 1.0 2013-6-4 上午11:02:17
 */
public class HBaseAPI { 
    /**主键*/
    public static String HBASE_ROW = "row"; 
    /**列镞*/
    public static String HBASE_FAMILY = "family"; 
    /**列名*/
    public static String HBASE_QUALIFIER = "qualifier";
    /**列值*/
    public static String HBASE_QUALIFIERVALUE = "qualifiervalue";
    /**时间戳*/
    public static String HBASE_TIMESTAMP = "timestamp"; 
    
    /** 访问HBase线程池大小 */
    public static int poolSize = 1000;

    public static Configuration conf;
    private static HTablePool tablePool = null;
    private static final Logger log=Logger.getLogger(HBaseAPI.class);

    
    static {
        //来自$HBase/conf/hbase-site.xml配置文件
        conf = new Configuration();
        conf.set("hbase.master.port", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_master_port"));
        conf.set("hbase.zookeeper.quorum", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_quorum"));
        conf.set("hbase.zookeeper.property.clientPort", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_property_clientPort"));
    }

    /**
     * HTablePool对HBase表进行CRUD操作,不推荐用HTable对HBase表进行CRUD操作。<br/><br/>
     * 
     * HTablePool可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象。
     * 
     * @return
     */
    public static HTablePool getHTablePool() {
        if (tablePool == null) {
            tablePool = new HTablePool(conf, poolSize);
        }
        return tablePool;
    }

    /**
     * 从startRow开始查询,查询maxCount条记录
     * 
     * @param tableName 表名
     * @param startRow 指定开始的行(时间戳)
     * @param maxCount 从startRow开始查询,查询maxCount条记录,最高阀值为10000
     * @return [{"timestamp":"1370412537880","id":"1","name":"zhangsan","age":"20","row":"quanzhou"}]
     */
    public static List<Map<String, String>> scan(String tableName, FilterList filterList, String startRow, int maxCount) {
        List<Map<String, String>> datas = new ArrayList<Map<String, String>>();

        ResultScanner rs = null;
        try {
            HTable table = (HTable) getHTablePool().getTable(tableName);
            Scan scan = new Scan();

            if(filterList!=null){
                scan.setFilter(filterList);
            }
            if (startRow != null && !"".equals(startRow.trim())) {
                scan.setStartRow(Bytes.toBytes(startRow));
            }
            if(maxCount<=0){
                maxCount = 10000;
            }
            if (maxCount > 10000) {
                maxCount = 10000;
            }
            scan.setCaching(maxCount + 1);
            rs = table.getScanner(scan);
            //Result类提供了raw()、list()、getValue(byte[] family, byte[] qualifier)三种方法来遍历数据
            for (Result r : rs) {
                HashMap<String, String> map = new HashMap<String, String>();
                long timestamp = 0;
                for (KeyValue kv : r.list()) {
                    timestamp = kv.getTimestamp();
                    String qualifier = Bytes.toString(kv.getQualifier()); //列名
                    String value = Bytes.toString(kv.getValue()); //列值
                    map.put(qualifier, value);

                }
                map.put(HBASE_ROW, Bytes.toString(r.getRow()));
                map.put(HBASE_TIMESTAMP, "" + timestamp);
                datas.add(map);
                // 假如到了指定条数就跳出
                if (datas.size() >= maxCount) {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(rs!=null){
                rs.close();
            }
            //table.close();
        }

        return datas;
    }
}
分享到:
评论

相关推荐

    Hadoop+HBase+Hive+lucene分布式搜索引擎分析系统

    Hadoop+HBase+Hive+lucene分布式搜索引擎分析系统

    利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎.zip

    利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 ## 基本介绍 - InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 - FetchDriver 负责抓取url对应的网页内容 - ParserUrlDriver 解析所抓取...

    人工智能-项目实践-搜索引擎-利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎

    利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 基本介绍 InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 FetchDriver 负责抓取url对应的网页内容 ParserUrlDriver 解析所抓取网页内容...

    集成Lucene和HBase

    Lucene搜索程序库事实上已经成为实现搜索引擎的标准。苹果、IBM、Attlassian(Jira)、Wolfram以及很多大家喜欢的公司【1】都使用了这种技术。因此,大家对任何能够提升Lucene的可伸缩性和性能的实现都很感兴趣。...

    Hbase 二级索引方案

    在 Hbase 中,表的 RowKey 按照字典排序, Region 按照 RowKey 设置 split point 进行 shard, 通过这种方式实现的全局、分布式索引. 成为了其成功的最大的砝码。 然而单一的通过 RowKey 检索数据的方式,不再满足更多...

    java开发web搜索引擎源码-Elasticsearch-Hbase:elasticsearch+hbase海量数据查询,支持千万数据秒回查

    ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索...

    大数据技术 Hadoop开发者第二期 MapReduce HDFS Hive Mahout HBase 共64页.pdf

    5、Java RMI + Lucene 构建分布式检索应用初探 ................ - 17 - 6、一对多的表关联在mapreduce中的应用(续) .................. - 26 - 7、InputSplit文件格式分析................................. - 32 -...

    高运算性能,低碰撞率的hash算法MurmurHash算法.zip

    MurmurHash算法由Austin Appleby创建于2008年,现已应用到Hadoop、libstdc 、nginx、libmemcached,Redis,Memcached,Cassandra,HBase,Lucene等开源系统。2011年Appleby被Google雇佣,随后Google推出其变种的...

    hadoop开发者文档

    5、Java RMI + Lucene 构建分布式检索应用初探 ................ - 17 - 6、一对多的表关联在mapreduce中的应用(续) .................. - 26 - 7、InputSplit文件格式分析................................. - 32 -...

    分布式图形数据库 Titan.zip

    集群很容易扩展以支持更大的数据集,Titan有一个很好的插件式性能,这个性能让它搭建在一些成熟的数据库技术上像 Apache Cassandra、Apache HBase、 Oracle BerkeleyDB。插件式索引架构可以整合 ElasticSearch ...

    DataMingProject:大数据平台相关代码(ESHiveHadoophdfshbase)

    本项目已经不再维护DMP项目代码Elastic Search 操作Hive操作Hbase操作Hadoop操作Spark操作yarnzookeeperLinux Shell其他DMP相关知识包含内容有dmp项目架构设计数据挖掘中文分词网络爬虫数据分析通用模块数据清洗数据...

    Hadoop基础培训教程.pdf

    By Hal Varian, Google's chief economist 起源与目标 大数据与Hadoop 应用模式 推荐读物 主要内容 大数据的起源与目标 大数据与Hadoop Hadoop的应用模式 Google的"三驾马车":起源 HDFS MapReduce HBase 业 务 驱 ...

    大型分布式网站架构与实践

     垂直化的搜索引擎在分布式系统中的使用,包括搜索引擎的基本原理、Lucene详细的使用介绍,以及基于Lucene的开源搜索引擎工具Solr的使用。  2.1 分布式缓存 60  2.1.1 memcache简介及安装 60  2.1.2 memcache ...

    录信数据库技术白皮书.pdf

    数据规模超大,数据时效性高,可节约千倍IO,可对接外部数据源Oracle、Mysql、Kafka、Hive、Hbase、File、HDFS File等,可以与接口层Hive SQL、JDBC、WebService直接进行交互;同时支持跨集群数据同步,支持多种不同...

    titan1-hadoop1 part-2

    集群很容易扩展以支持更大的数据集,Titan有一个很好的插件式性能,这个性能让它搭建在一些成熟的数据库技术上像 Apache Cassandra、Apache HBase、 Oracle BerkeleyDB。插件式索引架构可以整合 ElasticSearch 和...

    titan1-hadoop1

    集群很容易扩展以支持更大的数据集,Titan有一个很好的插件式性能,这个性能让它搭建在一些成熟的数据库技术上像 Apache Cassandra、Apache HBase、 Oracle BerkeleyDB。插件式索引架构可以整合 ElasticSearch 和...

    航空订票系统java源码-resume:NikhilPrabhakar的简历作为Git时间线

    航空订票系统 java源码尼基尔·普拉巴卡尔 接触 电子邮件: 概括 我目前在 LinkedIn (SlideShare) ...Lucene 相关的片段: MongoDB相关: Postgres 片段: 恢复: 工作 高级软件工程师(大数据/分析),

    大数据基础知识入门.pdf

    2005年,Hadoop作为Lucene的子项目Nutch 的一部分正式引入Apache基金会。2006年2 月被分离出来,成为一套完整独立的软件, 起名为Hadoop。 Hadoop的起源 总结起来,Hadoop起源于Google的三大论文 GFS—-&gt;HDFS ...

Global site tag (gtag.js) - Google Analytics