
Lucene: how to deduplicate results with a filter after the query completes (without breaking pagination)


Purpose:

    I recently used Lucene in a project. Since this was my first time using this open-source tool, I'm recording the development problems I ran into along the way.

 

Version:

     3.6

 

Requirement:

     When building the index, one primary record may map to multiple index entries whose other fields differ. For example, a(1,2, b(3,4)) has to be indexed as the two entries (1,2,3) and (1,2,4). A query on b's conditions can then return several entries for the same a, so the results must be deduplicated on a's primary field.
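
For illustration, here is a minimal sketch of how such flattened records might be indexed in Lucene 3.6 (the field names "a_id", "a_val" and "b_val" are invented for this example; the article does not name the real fields):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;

// a(1,2, b(3,4)) is flattened into two documents, (1,2,3) and (1,2,4).
// Both carry the same primary key "a_id", which is what we later
// deduplicate on.
void addFlattened(IndexWriter writer) throws Exception {
	for (String bVal : new String[] { "3", "4" }) {
		Document doc = new Document();
		doc.add(new Field("a_id", "1", Field.Store.YES, Field.Index.NOT_ANALYZED));
		doc.add(new Field("a_val", "2", Field.Store.YES, Field.Index.NOT_ANALYZED));
		doc.add(new Field("b_val", bVal, Field.Store.YES, Field.Index.NOT_ANALYZED));
		writer.addDocument(doc);
	}
}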

 

Problem:

      When Lucene executes a query together with a filter, it applies the filter first to narrow the document set and only then evaluates the query. Our deduplication lived in a filter, so deduplicating before the query had already discarded some of b's entries, and certain conditions on b could no longer find a.
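
A hypothetical call pattern where this bites (the query, field name and dedupFilter are placeholders, not from the original project):

// The dedup Filter narrows the doc set BEFORE the query on b is
// evaluated, so entries dropped as "duplicates" can no longer satisfy
// the b-condition.
Query query = new TermQuery(new Term("b_val", "4"));
TopDocs hits = searcher.search(query, dedupFilter, 20); // filter runs first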

 

This execution order can be seen in the getFilteredScorer method of the FilteredQuery class:

 

static Scorer getFilteredScorer(final IndexReader indexReader,
			final Similarity similarity, final Weight weight,
			final Weight wrapperWeight, final Filter filter) throws IOException {
		assert filter != null;

		final DocIdSet filterDocIdSet = filter.getDocIdSet(indexReader);
		if (filterDocIdSet == null) {
			// this means the filter does not accept any documents.
			return null;
		}

		final DocIdSetIterator filterIter = filterDocIdSet.iterator();
		if (filterIter == null) {
			// this means the filter does not accept any documents.
			return null;
		}

		// we are gonna advance() this scorer, so we set
		// inorder=true/toplevel=false
		final Scorer scorer = weight.scorer(indexReader, true, false);
		return (scorer == null) ? null : new Scorer(similarity, wrapperWeight) {
			private int scorerDoc = -1, filterDoc = -1;

			// optimization: we are topScorer and collect directly using
			// short-circuited algo
			@Override
			public void score(Collector collector) throws IOException {
				int filterDoc = filterIter.nextDoc();
				int scorerDoc = scorer.advance(filterDoc);
				// the normalization trick already applies the boost of this
				// query,
				// so we can use the wrapped scorer directly:
				collector.setScorer(scorer);
				for (;;) {
					if (scorerDoc == filterDoc) {
						// Check if scorer has exhausted, only before
						// collecting.
						if (scorerDoc == DocIdSetIterator.NO_MORE_DOCS) {
							break;
						}
						// deduplication will be hooked in here (see the modified version below)
						collector.collect(scorerDoc);
						filterDoc = filterIter.nextDoc();
						scorerDoc = scorer.advance(filterDoc);
					} else if (scorerDoc > filterDoc) {
						filterDoc = filterIter.advance(scorerDoc);
					} else {
						scorerDoc = scorer.advance(filterDoc);
					}
				}
			}

			private int advanceToNextCommonDoc() throws IOException {
				for (;;) {
					if (scorerDoc < filterDoc) {
						scorerDoc = scorer.advance(filterDoc);
					} else if (scorerDoc == filterDoc) {
						return scorerDoc;
					} else {
						filterDoc = filterIter.advance(scorerDoc);
					}
				}
			}

			@Override
			public int nextDoc() throws IOException {
				filterDoc = filterIter.nextDoc();
				return advanceToNextCommonDoc();
			}

			@Override
			public int advance(int target) throws IOException {
				if (target > filterDoc) {
					filterDoc = filterIter.advance(target);
				}
				return advanceToNextCommonDoc();
			}

			@Override
			public int docID() {
				return scorerDoc;
			}

			@Override
			public float score() throws IOException {
				return scorer.score();
			}
		};
	}

 

 

 

 

Solution:

      As the Lucene source shows, the query path exposes no parameter to change the order in which filter and query execute. So the only option is to modify the source and do the deduplication ourselves at the point where the final results are collected.

 

 Classes involved:

        IndexSearcher.java,  HitQueue.java,  FilteredQuery.java

 

 Notes:

       HitQueue.java has to be rewritten because IndexSearcher.java is customized, and the HitQueue in the Lucene source is not declared public, so it cannot be referenced from outside its package. Changing only these three classes is a shortcut taken at the cost of performance, as explained below; strictly speaking, this requirement would touch far more than these three classes.
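
A minimal public copy might look like this sketch (simplified from Lucene 3.6's package-private HitQueue, which additionally supports pre-filling the queue with sentinel ScoreDocs; the class name MyHitQueue is made up):

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.PriorityQueue;

// Public stand-in for the package-private org.apache.lucene.search.HitQueue.
public class MyHitQueue extends PriorityQueue<ScoreDoc> {

	public MyHitQueue(int size) {
		initialize(size);
	}

	@Override
	protected boolean lessThan(ScoreDoc hitA, ScoreDoc hitB) {
		if (hitA.score == hitB.score) {
			// break score ties in favor of the lower doc id
			return hitA.doc > hitB.doc;
		}
		return hitA.score < hitB.score;
	}
}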

 

     IndexSearcher changes:

           Whether you query through search or searchAfter, tracing the source shows that execution ultimately reaches the following code:

 

	@Override
	public void search(Weight weight, Filter filter, Collector collector)
			throws IOException {
		// TODO: should we make this
		// threaded...? the Collector could be sync'd?

		// always use single thread:
		for (int i = 0; i < subReaders.length; i++) { // search each subreader
			collector.setNextReader(subReaders[i], docBase + docStarts[i]);
			final Scorer scorer = (filter == null) ? weight.scorer(
					subReaders[i], !collector.acceptsDocsOutOfOrder(), true)
					: FilteredQuery.getFilteredScorer(subReaders[i],
							getSimilarity(), weight, weight, filter);
			if (scorer != null) {
				scorer.score(collector);
			}

			//break;
		}
	}
	  

 

 

  So this is the place to change. Note that this covers only the case where the executor inside IndexSearcher is null, i.e. no ExecutorService was passed in (you can trace the source yourself to verify). The modified code for the key spot:

 

	@Override
	public void search(Weight weight, Filter filter, Collector collector)
			throws IOException {
		Map<String, String> idVersionKey = new HashMap<String, String>();
		// TODO: should we make this
		// threaded...? the Collector could be sync'd?

		// always use single thread:
		for (int i = 0; i < subReaders.length; i++) { // search each subreader
			collector.setNextReader(subReaders[i], docBase + docStarts[i]);
			final Scorer scorer = (filter == null) ? weight.scorer(
					subReaders[i], !collector.acceptsDocsOutOfOrder(), true)
					: MyFilteredQuery.getFilteredScorer(subReaders[i],
							getSimilarity(), weight, weight, filter, this,
							idVersionKey);
			if (scorer != null) {
				scorer.score(collector);
			}

			// break;
		}
	}

 

 

 

      FilteredQuery changes:

            The code above already shows the change on the FilteredQuery side: the method involved is getFilteredScorer. Copy this method and change its parameter list; the old method is kept because an inner class of FilteredQuery still calls it. The unmodified getFilteredScorer was shown in the Problem section; here is the modified method:

 

static Scorer getFilteredScorer(final IndexReader indexReader,
		final Similarity similarity, final Weight weight,
		final Weight wrapperWeight, final Filter filter,
		final MyIndexSearcher myIndexSearch,
		final Map<String, String> idVersionKey) throws IOException {
	assert filter != null;

	final DocIdSet filterDocIdSet = filter.getDocIdSet(indexReader);
	if (filterDocIdSet == null) {
		// this means the filter does not accept any documents.
		return null;
	}

	final DocIdSetIterator filterIter = filterDocIdSet.iterator();
	if (filterIter == null) {
		// this means the filter does not accept any documents.
		return null;
	}

	// we are gonna advance() this scorer, so we set
	// inorder=true/toplevel=false
	final Scorer scorer = weight.scorer(indexReader, true, false);
	return (scorer == null) ? null : new Scorer(similarity, wrapperWeight) {
		private int scorerDoc = -1, filterDoc = -1;

		// optimization: we are topScorer and collect directly using
		// short-circuited algo
		@Override
		public void score(Collector collector) throws IOException {
			int filterDoc = filterIter.nextDoc();
			int scorerDoc = scorer.advance(filterDoc);
			String key = null;
			// the normalization trick already applies the boost of this
			// query, so we can use the wrapped scorer directly:
			collector.setScorer(scorer);

			for (;;) {
				if (scorerDoc == filterDoc) {
					// Check if scorer has exhausted, only before
					// collecting.
					if (scorerDoc == DocIdSetIterator.NO_MORE_DOCS) {
						break;
					}
					// dedup filtering: only collect the first doc seen
					// for each key. Note that scorerDoc is relative to
					// indexReader (the sub-reader), so the doc must be
					// loaded through indexReader; going through the
					// top-level searcher would be off by docBase when
					// the index has more than one segment.
					Document doc = indexReader.document(scorerDoc);
					key = doc.get("the field to deduplicate on");

					if (idVersionKey.get(key) == null) {
						collector.collect(scorerDoc);
						idVersionKey.put(key, "exist");
					}

					filterDoc = filterIter.nextDoc();
					scorerDoc = scorer.advance(filterDoc);
				} else if (scorerDoc > filterDoc) {
					filterDoc = filterIter.advance(scorerDoc);
				} else {
					scorerDoc = scorer.advance(filterDoc);
				}
			}
		}

		private int advanceToNextCommonDoc() throws IOException {
			for (;;) {
				if (scorerDoc < filterDoc) {
					scorerDoc = scorer.advance(filterDoc);
				} else if (scorerDoc == filterDoc) {
					return scorerDoc;
				} else {
					filterDoc = filterIter.advance(scorerDoc);
				}
			}
		}

		@Override
		public int nextDoc() throws IOException {
			filterDoc = filterIter.nextDoc();
			return advanceToNextCommonDoc();
		}

		@Override
		public int advance(int target) throws IOException {
			if (target > filterDoc) {
				filterDoc = filterIter.advance(target);
			}
			return advanceToNextCommonDoc();
		}

		@Override
		public int docID() {
			return scorerDoc;
		}

		@Override
		public float score() throws IOException {
			return scorer.score();
		}
	};
}
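
The doc lookup above loads the full stored document for every hit, which is the performance cost mentioned earlier. One possible mitigation, not used in this patch, is Lucene's FieldCache, which materializes the key field once per reader. A minimal sketch, assuming the dedup field is indexed as a single non-analyzed token per document and using the placeholder field name "a_id":

import org.apache.lucene.search.FieldCache;

// Build the lookup once, before creating the Scorer; the array is
// indexed by sub-reader-relative doc ids, which is exactly what
// scorerDoc contains.
final String[] dedupKeys = FieldCache.DEFAULT.getStrings(indexReader, "a_id");

// Then, inside score(), the stored-document load becomes an array access:
String key = dedupKeys[scorerDoc];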

   

 

     Looking again at this fragment:

 

final DocIdSet filterDocIdSet = filter.getDocIdSet(indexReader);
if (filterDocIdSet == null) {
	// this means the filter does not accept any documents.
	return null;
}

    From this fragment you can see that when the filter is empty, the code below it never runs; the "shortcut" mentioned earlier refers to exactly this. To guarantee that execution flows into our modified code, we therefore define a do-nothing custom filter for our queries.

 

    Of course, if your requirement really is filter-then-query, this custom filter is exactly where that logic goes, and it can filter on multiple fields as well. First, the do-nothing (pass-through) filter:

 

new Filter() {
	@Override
	public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
		TermDocs termDocs = reader.termDocs(new Term(LuceneKey.MARK_ID,
				LuceneKey.MARK_VALUE));
		final FixedBitSet bitSet = new FixedBitSet(reader.maxDoc());
		final int[] docs = new int[32];
		final int[] freqs = new int[32];

		try {
			do {
				while (true) {
					final int count = termDocs.read(docs, freqs);
					if (count != 0) {
						for (int i = 0; i < count; i++) {
							bitSet.set(docs[i]);
						}
					} else {
						break;
					}
				}
			} while (termDocs.next());
		} finally {
			termDocs.close();
		}

		return bitSet;
	}
}
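
Wiring it in might look like this (passThroughFilter stands for the anonymous Filter above; query and the hit count are placeholders):

// The pass-through filter forces search() down the getFilteredScorer
// branch, which is where the deduplication now lives.
TopDocs hits = searcher.search(query, passThroughFilter, 20);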

 

 

While we are at it, here is the multi-field custom dedup filter as well:

 

new Filter() {

	@Override
	public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
		// any field that every indexed document contains will do here;
		// it is only used to walk the whole index
		TermDocs termDocs = reader.termDocs(new Term("a field present in all documents"));
		final IndexSearcher indexSearch = new IndexSearcher(reader);
		final FixedBitSet bitSet = new FixedBitSet(reader.maxDoc());
		Map<String, String> idVersionKey = new HashMap<String, String>();
		final int[] docs = new int[32];
		final int[] freqs = new int[32];
		String key = null;

		try {
			do {
				while (true) {
					final int count = termDocs.read(docs, freqs);
					if (count != 0) {
						for (int i = 0; i < count; i++) {
							Document doc = indexSearch.doc(docs[i]);
							key = doc.get("dedup field 1") + doc.get("dedup field 2");
							// only keep the first document seen for each key
							if (idVersionKey.get(key) == null) {
								bitSet.set(docs[i]);
								idVersionKey.put(key, "exist");
							}
						}
					} else {
						break;
					}
				}
			} while (termDocs.next());
		} finally {
			try {
				termDocs.close();
			} finally {
				indexSearch.close();
			}
		}

		return bitSet;
	}
}

 

 

 Note:

      The Map<String, String> idVersionKey has to be passed from IndexSearcher's search method into FilteredQuery's getFilteredScorer because subReaders.length may be greater than 1, in which case the loop below runs more than once and per-call dedup state would make the filtered results inaccurate. Multiple such "virtual sub-directories" (segments) appear whenever you add new documents to the index without calling writer.forceMerge(1).

 

for (int i = 0; i < subReaders.length; i++) { // search each subreader
	collector.setNextReader(subReaders[i], docBase + docStarts[i]);
	final Scorer scorer = (filter == null) ? weight.scorer(
			subReaders[i], !collector.acceptsDocsOutOfOrder(), true)
			: MyFilteredQuery.getFilteredScorer(subReaders[i],
					getSimilarity(), weight, weight, filter, this, idVersionKey);
	if (scorer != null) {
		scorer.score(collector);
	}

	//break;
}
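
For reference, merging the index down to one segment in Lucene 3.6 looks roughly like this (directory and analyzer are placeholders):

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.Version;

// Collapse the index into a single segment so the searcher sees only one
// sub-reader the next time it is opened. forceMerge(1) replaces the
// deprecated optimize() and can be expensive on large indexes.
IndexWriter writer = new IndexWriter(directory,
		new IndexWriterConfig(Version.LUCENE_36, analyzer));
writer.forceMerge(1);
writer.close();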

 

 

 

 

 
