Lucene并发连接实现 - ConcurrentLuceneConnection
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;import java.io.File;import java.io.IOException;import org.apache.log4j.Logger;import org.apache.lucene.analysis.Analyzer;import org.apache.lucene.index.IndexReader;import org.apache.lucene.index.IndexWriter;import org.apache.lucene.search.DelayCloseIndexSearcher;import org.apache.lucene.search.IndexSearcher;import org.apache.lucene.store.*;?public class ConcurrentLuceneConnection implements ILuceneConnection{? private static Logger log; private transient Directory directory; private final Analyzer analyzerForIndexing; private final ILuceneConnection.Configuration configuration; private final Lock indexWriteLock; private final Lock searcherCreationLock; private final AtomicBoolean batchMode; private DelayCloseIndexSearcher currentSearcher;? public ConcurrentLuceneConnection(Directory directory, Analyzer analyzer, ILuceneConnection.Configuration configuration) { indexWriteLock = new ReentrantLock(); searcherCreationLock = new ReentrantLock(); batchMode = new AtomicBoolean(false); this.directory = directory; analyzerForIndexing = analyzer; this.configuration = configuration; }? public ConcurrentLuceneConnection(File path, Analyzer analyzer, ILuceneConnection.Configuration configuration) { this(getDirectory(path), analyzer, configuration); }? public int getNumDocs() { return ((Integer)withReader(new ILuceneConnection.ReaderAction() {? public Object perform(IndexReader reader) { return new Integer(reader.numDocs()); }? })).intValue(); }? public boolean isIndexCreated() { try{ return IndexReader.indexExists(directory); }catch(IOException e){ throw new LuceneException(e); } }? public IndexSearcher leakSearcher() { try{ return getOpenedSearcher(); }catch(Throwable e){ flipCurrentSearcher(); throwLuceneException(e); throw new IllegalStateException("Exception should have been thrown."); } }? public void optimize() throws LuceneException { withWriter(new ILuceneConnection.WriterAction() {? public void perform(IndexWriter writer) throws IOException { writer.optimize(); } }); }? public void recreateIndexDirectory() { indexWriteLock.lock(); try { directory.close(); if(directory instanceof FSDirectory) directory = FSDirectory.getDirectory(((FSDirectory)directory).getFile(), true); else if(directory instanceof RAMDirectory) directory = new RAMDirectory(); (new IndexWriter(directory, null, true)).close(); } catch(IOException e) { throw new LuceneException("Cannot create index directory.", e); } flipCurrentSearcher(); indexWriteLock.unlock(); }? public void close() { flipCurrentSearcher(); }? public void withSearch(ILuceneConnection.SearcherAction action) throws LuceneException { try{ IndexSearcher searcher = getOpenedSearcher(); boolean b = action.perform(searcher); if(!b) throw new UnsupportedOperationException("Searchers are always closed. The searcherAction should always return true, we do not allow them to control closing of the searchers"); closeSearcher(searcher); }catch(Throwable e){ flipCurrentSearcher(); throwLuceneException(e); } }? public Object withReader(ILuceneConnection.ReaderAction action) throws LuceneException { try{ IndexSearcher searcher = getOpenedSearcher(); Object obj = action.perform(searcher.getIndexReader()); closeSearcher(searcher); return obj; }catch(Throwable e){ flipCurrentSearcher(); throwLuceneException(e); return null; } }? public void withReaderAndDeletes(ILuceneConnection.ReaderAction action) throws LuceneException { IndexReader deleter; indexWriteLock.lock(); deleter = null; try { deleter = constructIndexDeleter(); action.perform(deleter); flipCurrentSearcher(); } catch(IOException e) { throw new LuceneException(e); } closeReader(deleter); indexWriteLock.unlock(); }? public void withWriter(ILuceneConnection.WriterAction action) throws LuceneException { IndexWriter writer; indexWriteLock.lock(); writer = null; try { writer = constructIndexWriter(); action.perform(writer); flipCurrentSearcher(); } catch(IOException e) { throw new LuceneException(e); } closeWriter(writer); indexWriteLock.unlock(); }? public void withDeleteAndWrites(ILuceneConnection.ReaderAction readerAction, ILuceneConnection.WriterAction writerAction) throws LuceneException { indexWriteLock.lock(); withReaderAndDeletes(readerAction); withWriter(writerAction); indexWriteLock.unlock(); }? public void withBatchUpdate(ILuceneConnection.BatchUpdateAction action) { try{ indexWriteLock.lock(); batchMode.set(true); action.perform(); batchMode.set(false); indexWriteLock.unlock(); }catch(Exception e){ throwLuceneException(e); } }? public void flipCurrentSearcher() { if(log.isDebugEnabled()) log.debug("Closing current searcher.."); searcherCreationLock.lock(); if(currentSearcher != null) { try{ currentSearcher.closeWhenDone(); currentSearcher = null; }catch(Exception e){ log.error(e); currentSearcher = null; } } searcherCreationLock.unlock(); }? private DelayCloseIndexSearcher getOpenedSearcher() throws IOException { searcherCreationLock.lock(); DelayCloseIndexSearcher delaycloseindexsearcher; if(currentSearcher == null) currentSearcher = new DelayCloseIndexSearcher(directory); currentSearcher.open(); delaycloseindexsearcher = currentSearcher; searcherCreationLock.unlock(); return delaycloseindexsearcher; }? private IndexReader constructIndexDeleter() { try{ return IndexReader.open(directory); }catch(IOException e){ throw new LuceneException(e); } }? private void closeReader(IndexReader reader) { try { if(reader != null) { if(log.isDebugEnabled()) log.debug(Thread.currentThread().getName() + "## closing reader"); reader.close(); } } catch(IOException e) { log.error("Error closing reader. " + e, e); } }? private void closeSearcher(IndexSearcher searcher) { try { if(searcher != null) searcher.close(); } catch(IOException e) { log.error("Error occurred while closing searcher.", e); } }? private void closeWriter(IndexWriter writer) { try { if(writer != null) { if(log.isDebugEnabled()) log.debug("## closing writer"); writer.close(); } else { log.warn("## trying to close null writer."); } } catch(IOException e) { log.error("Error closing writer. " + e, e); } }? private IndexWriter constructIndexWriter() throws LuceneException { IndexWriter indexWriter; if(log.isDebugEnabled()) log.debug(Thread.currentThread().getName() + "## opening writer"); try{ indexWriter = new IndexWriter(directory, analyzerForIndexing, false); if(batchMode.get()) { indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs()); indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs()); indexWriter.setMergeFactor(configuration.getBatchMergeFactor()); } else { indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs()); indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs()); indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor()); } indexWriter.setMaxFieldLength(configuration.getMaxFieldLength()); return indexWriter; }catch(IOException e){ throw new LuceneException(e); } }? private void throwLuceneException(Throwable e) { if(e instanceof Error) throw (Error)e; if(e instanceof RuntimeException) throw (RuntimeException)e; else throw new LuceneException(e); }? private static Directory getDirectory(File path) { try{ return FSDirectory.getDirectory(path, false); }catch(IOException e){ throw new LuceneException(e); } }? static { log = Logger.getLogger(ConcurrentLuceneConnection.class); }}