Skip to content

reuse IndexReader objects for multi-project searches #1186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nbproject/project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ build.classes.dir=${build.dir}/classes
dist.dir=dist
manifest.file=manifest.mf
javac.source=1.8
run.jvmargs=-ea -enableassertions\:org.opensolaris.opengrok...
run.jvmargs=-ea -enableassertions:org.opensolaris.opengrok...
manifest.custom.permissions=
lucene-analyzers-common.jar=lucene-analyzers-common-${lucene.version}.jar
run.test.classpath=\
Expand Down
10 changes: 10 additions & 0 deletions src/org/opensolaris/opengrok/configuration/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public final class Configuration {
private final Map<String, String> cmds;
private int tabSize;
private int command_timeout; // in seconds
private int indexRefreshPeriod; // in seconds
private boolean scopesEnabled;
private boolean foldingEnabled;

Expand Down Expand Up @@ -225,6 +226,14 @@ public void setCommandTimeout(int timeout) {
this.command_timeout = timeout;
}

public int getIndexRefreshPeriod() {
return indexRefreshPeriod;
}

public void setIndexRefreshPeriod(int seconds) {
this.indexRefreshPeriod = seconds;
}

/**
* Creates a new instance of Configuration
*/
Expand Down Expand Up @@ -275,6 +284,7 @@ public Configuration() {
setRevisionMessageCollapseThreshold(200);
setPluginDirectory(null);
setMaxSearchThreadCount(2 * Runtime.getRuntime().availableProcessors());
setIndexRefreshPeriod(60);
}

public String getRepoCmd(String clazzName) {
Expand Down
246 changes: 238 additions & 8 deletions src/org/opensolaris/opengrok/configuration/RuntimeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/

/*
* Copyright (c) 2006, 2016, Oracle and/or its affiliates. All rights reserved.
*/
* Copyright (c) 2006, 2016, Oracle and/or its affiliates. All rights reserved.
*/
package org.opensolaris.opengrok.configuration;

import java.beans.XMLDecoder;
Expand Down Expand Up @@ -67,11 +67,26 @@
import org.opensolaris.opengrok.logger.LoggerFactory;
import org.opensolaris.opengrok.util.Executor;
import org.opensolaris.opengrok.util.IOUtils;
import org.opensolaris.opengrok.configuration.ThreadpoolSearcherFactory;

import static java.nio.file.FileVisitResult.CONTINUE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.util.Collections;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensolaris.opengrok.index.IndexDatabase;


/**
Expand All @@ -91,6 +106,7 @@ public final class RuntimeEnvironment {

private final Map<Project, List<RepositoryInfo>> repository_map = new TreeMap<>();
private final Map<Project, Set<Group>> project_group_map = new TreeMap<>();
private final Map<String, SearcherManager> searcherManagerMap = new ConcurrentHashMap<>();

/* Get thread pool used for top-level repository history generation. */
public static synchronized ExecutorService getHistoryExecutor() {
Expand Down Expand Up @@ -232,6 +248,14 @@ public void setCommandTimeout(int timeout) {
threadConfig.get().setCommandTimeout(timeout);
}

public int getIndexRefreshPeriod() {
return threadConfig.get().getIndexRefreshPeriod();
}

public void setIndexRefreshPeriod(int seconds) {
threadConfig.get().setIndexRefreshPeriod(seconds);
}

/**
* Get the path to the where the index database is stored
*
Expand Down Expand Up @@ -1184,6 +1208,8 @@ public void stopConfigurationListenerThread() {
IOUtils.close(configServerSocket);
}

private Thread configurationListenerThread;

/**
* Start a thread to listen on a socket to receive new configurations to
* use.
Expand All @@ -1199,7 +1225,7 @@ public boolean startConfigurationListenerThread(SocketAddress endpoint) {
configServerSocket.bind(endpoint);
ret = true;
final ServerSocket sock = configServerSocket;
Thread t = new Thread(new Runnable() {
configurationListenerThread = new Thread(new Runnable() {
@Override
public void run() {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1 << 13);
Expand Down Expand Up @@ -1228,7 +1254,15 @@ public void run() {
((Configuration) obj).refreshDateForLastIndexRun();
setConfiguration((Configuration) obj);
LOGGER.log(Level.INFO, "Configuration updated: {0}",
configuration.getSourceRoot());
configuration.getSourceRoot());

// We are assuming that each update of configuration
// means reindex. If dedicated thread is introduced
// in the future solely for the purpose of getting
// the event of reindex, the 2 calls below should
// be moved there.
refreshSearcherManagerMap();
maybeRefreshIndexSearchers();
}
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Error reading config file: ", e);
Expand All @@ -1237,12 +1271,12 @@ public void run() {
}
}
}
}, "conigurationListener");
t.start();
}, "configurationListener");
configurationListenerThread.start();
} catch (UnknownHostException ex) {
LOGGER.log(Level.FINE, "Problem resolving sender: ", ex);
LOGGER.log(Level.WARNING, "Problem resolving sender: ", ex);
} catch (IOException ex) {
LOGGER.log(Level.FINE, "I/O error when waiting for config: ", ex);
LOGGER.log(Level.WARNING, "I/O error when waiting for config: ", ex);
}

if (!ret && configServerSocket != null) {
Expand Down Expand Up @@ -1343,4 +1377,200 @@ public void stopWatchDogService() {
}
}
}

private Thread indexReopenThread;

public void maybeRefreshIndexSearchers() {
for (Map.Entry<String, SearcherManager> entry : searcherManagerMap.entrySet()) {
try {
entry.getValue().maybeRefresh();
} catch (AlreadyClosedException ex) {
// This is a case of removed project.
// See refreshSearcherManagerMap() for details.
} catch (IOException ex) {
LOGGER.log(Level.SEVERE, "maybeRefresh failed", ex);
}
}
}

/**
* Call maybeRefresh() on each SearcherManager object from dedicated thread
* periodically.
* If the corresponding index has changed in the meantime, it will be safely
* reopened, i.e. without impacting existing IndexSearcher/IndexReader
* objects, thus not disrupting searches in progress.
*/
public void startIndexReopenThread() {
indexReopenThread = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
maybeRefreshIndexSearchers();
Thread.sleep(getIndexRefreshPeriod() * 1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}, "indexReopenThread");

indexReopenThread.start();
}

public void stopIndexReopenThread() {
if (indexReopenThread != null) {
indexReopenThread.interrupt();
try {
indexReopenThread.join();
} catch (InterruptedException ex) {
LOGGER.log(Level.INFO, "Cannot join indexReopen thread: ", ex);
}
}
}

/**
* Get IndexSearcher for given project.
* Each IndexSearcher is born from a SearcherManager object. There is
* one SearcherManager for every project.
* This schema makes it possible to reuse IndexSearcher/IndexReader objects
* so the heavy lifting (esp. system calls) performed in FSDirectory
* and DirectoryReader happens only once for a project.
* The caller has to make sure that the IndexSearcher is returned back
* to the SearcherManager. This is done with returnIndexSearcher().
* The return of the IndexSearcher should happen only after the search
* result data are read fully.
*
* @param proj project
* @return SearcherManager for given project
*/
public IndexSearcher getIndexSearcher(String proj) throws IOException {
SearcherManager mgr = searcherManagerMap.get(proj);
IndexSearcher searcher = null;
if (mgr == null) {
File indexDir = new File(getDataRootPath(), IndexDatabase.INDEX_DIR);

try {
Directory dir = FSDirectory.open(new File(indexDir, proj).toPath());
mgr = new SearcherManager(dir, new ThreadpoolSearcherFactory());
searcherManagerMap.put(proj, mgr);
searcher = mgr.acquire();
} catch (IOException ex) {
LOGGER.log(Level.SEVERE,
"cannot construct IndexSearcher for project " + proj, ex);
}
} else {
searcher = mgr.acquire();
}

return searcher;
}

/**
* Return IndexSearcher object back to corresponding SearcherManager.
* @param proj project name which belongs to the searcher
* @param searcher searcher object to release
*/
public void returnIndexSearcher(String proj, IndexSearcher searcher) {
SearcherManager mgr = searcherManagerMap.get(proj);
if (mgr != null) {
try {
mgr.release(searcher);
} catch (IOException ex) {
LOGGER.log(Level.SEVERE, "cannot release IndexSearcher for project " + proj, ex);
}
} else {
LOGGER.log(Level.SEVERE, "cannot find SearcherManager for project " + proj);
}
}

/**
* After new configuration is put into place, the set of projects might
* change so we go through the SearcherManager objects and close those where
* the corresponding project is no longer present.
*/
private void refreshSearcherManagerMap() {
for (Map.Entry<String, SearcherManager> entry : searcherManagerMap.entrySet()) {
try {
// If a project is gone, close the corresponding SearcherManager
// so that it cannot produce new IndexSearcher objects.
for (Project proj : getProjects()) {
if (entry.getKey().compareTo(proj.getDescription()) == 0) {
// XXX Ideally we would like to remove the entry from the map here.
// However, if some thread acquired an IndexSearcher and then config change happened
// and the corresponding searcherManager was removed from the map,
// returnIndexSearcher() will have no place to return the indexSearcher to.
// This would likely lead to leaks since the corresponding IndexReader
// will remain open.
// So, we cannot remove searcherManager from the map until all threads
// are done with it. We could handle this by inserting the pair into
// special to-be-removed list and then check reference count
// of corresponding IndexReader object in returnIndexSearcher().
// If 0, the SearcherManager can be safely removed from the searcherManagerMap.
// For the time being, let the map to grow unbounded to keep things simple.
entry.getValue().close();
}
}
} catch (IOException ex) {
LOGGER.log(Level.SEVERE,
"cannot close IndexReader for project" + entry.getKey(), ex);
}
}
}

/**
* Return collection of IndexReader objects as MultiReader object
* for given list of projects.
* The caller is responsible for releasing the IndexSearcher objects
* so we add them to the map.
*
* @param projects list of projects
* @param map each IndexSearcher produced will be put into this map
* @return MultiReader for the projects
*/
public MultiReader getMultiReader(SortedSet<String> projects, Map<String, IndexSearcher> map) {
IndexReader[] subreaders = new IndexReader[projects.size()];
int ii = 0;

// TODO might need to rewrite to Project instead of
// String , need changes in projects.jspf too
for (String proj : projects) {
try {
IndexSearcher searcher = RuntimeEnvironment.getInstance().getIndexSearcher(proj);
subreaders[ii++] = searcher.getIndexReader();
map.put(proj, searcher);
} catch (IOException ex) {
LOGGER.log(Level.SEVERE,
"cannot get IndexReader for project" + proj, ex);
}
}
MultiReader multiReader = null;
try {
multiReader = new MultiReader(subreaders, true);
} catch (IOException ex) {
LOGGER.log(Level.SEVERE,
"cannot construct MultiReader for set of projects", ex);
}
return multiReader;
}

/**
* Helper method for the consumers of getMultiReader() to be called when
* destroying search context. This will make sure all indexSearcher
* objects are properly released.
*
* @param map map of project to indexSearcher
*/
public void freeIndexSearcherMap(Map<String, IndexSearcher> map) {
List<String> toRemove = new ArrayList<>();

for (Map.Entry<String, IndexSearcher> entry : map.entrySet()) {
RuntimeEnvironment.getInstance().returnIndexSearcher(entry.getKey(), entry.getValue());
toRemove.add(entry.getKey());
}

for (String key : toRemove) {
map.remove(key);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* See LICENSE.txt included in this distribution for the specific
* language governing permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at LICENSE.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/

/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
*/
package org.opensolaris.opengrok.configuration;

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;

/**
* Factory for producing IndexSearcher objects.
* This is used inside getIndexSearcher() to produce new SearcherManager objects
* to make sure the searcher threads are constrained to single thread pool.
* @author vkotal
*/
class ThreadpoolSearcherFactory extends SearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader r, IndexReader prev) throws IOException {
// The previous IndexReader is not used here.
IndexSearcher searcher = new IndexSearcher(r,
RuntimeEnvironment.getInstance().getSearchExecutor());
return searcher;
}
}
Loading