Skip to content

Commit 5dfd25c

Browse files
author
Vladimir Kotal
authored
Merge pull request #1186 from vladak/multisearch
reuse IndexReader objects for multi-project searches
2 parents c1b7dca + 1c601d3 commit 5dfd25c

File tree

16 files changed

+578
-93
lines changed

16 files changed

+578
-93
lines changed

nbproject/project.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ build.classes.dir=${build.dir}/classes
4646
dist.dir=dist
4747
manifest.file=manifest.mf
4848
javac.source=1.8
49-
run.jvmargs=-ea -enableassertions\:org.opensolaris.opengrok...
49+
run.jvmargs=-ea -enableassertions:org.opensolaris.opengrok...
5050
manifest.custom.permissions=
5151
lucene-analyzers-common.jar=lucene-analyzers-common-${lucene.version}.jar
5252
run.test.classpath=\

src/org/opensolaris/opengrok/configuration/Configuration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public final class Configuration {
134134
private final Map<String, String> cmds;
135135
private int tabSize;
136136
private int command_timeout; // in seconds
137+
private int indexRefreshPeriod; // in seconds
137138
private boolean scopesEnabled;
138139
private boolean foldingEnabled;
139140

@@ -225,6 +226,14 @@ public void setCommandTimeout(int timeout) {
225226
this.command_timeout = timeout;
226227
}
227228

229+
public int getIndexRefreshPeriod() {
230+
return indexRefreshPeriod;
231+
}
232+
233+
public void setIndexRefreshPeriod(int seconds) {
234+
this.indexRefreshPeriod = seconds;
235+
}
236+
228237
/**
229238
* Creates a new instance of Configuration
230239
*/
@@ -275,6 +284,7 @@ public Configuration() {
275284
setRevisionMessageCollapseThreshold(200);
276285
setPluginDirectory(null);
277286
setMaxSearchThreadCount(2 * Runtime.getRuntime().availableProcessors());
287+
setIndexRefreshPeriod(60);
278288
}
279289

280290
public String getRepoCmd(String clazzName) {

src/org/opensolaris/opengrok/configuration/RuntimeEnvironment.java

Lines changed: 238 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
*/
1919

2020
/*
21-
* Copyright (c) 2006, 2016, Oracle and/or its affiliates. All rights reserved.
22-
*/
21+
* Copyright (c) 2006, 2016, Oracle and/or its affiliates. All rights reserved.
22+
*/
2323
package org.opensolaris.opengrok.configuration;
2424

2525
import java.beans.XMLDecoder;
@@ -67,11 +67,26 @@
6767
import org.opensolaris.opengrok.logger.LoggerFactory;
6868
import org.opensolaris.opengrok.util.Executor;
6969
import org.opensolaris.opengrok.util.IOUtils;
70+
import org.opensolaris.opengrok.configuration.ThreadpoolSearcherFactory;
7071

7172
import static java.nio.file.FileVisitResult.CONTINUE;
7273
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
7374
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
7475
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
76+
import java.util.Collections;
77+
import java.util.Iterator;
78+
import java.util.SortedSet;
79+
import java.util.concurrent.ConcurrentHashMap;
80+
import org.apache.lucene.index.DirectoryReader;
81+
import org.apache.lucene.index.IndexReader;
82+
import org.apache.lucene.index.MultiReader;
83+
import org.apache.lucene.search.IndexSearcher;
84+
import org.apache.lucene.search.SearcherFactory;
85+
import org.apache.lucene.search.SearcherManager;
86+
import org.apache.lucene.store.AlreadyClosedException;
87+
import org.apache.lucene.store.Directory;
88+
import org.apache.lucene.store.FSDirectory;
89+
import org.opensolaris.opengrok.index.IndexDatabase;
7590

7691

7792
/**
@@ -91,6 +106,7 @@ public final class RuntimeEnvironment {
91106

92107
private final Map<Project, List<RepositoryInfo>> repository_map = new TreeMap<>();
93108
private final Map<Project, Set<Group>> project_group_map = new TreeMap<>();
109+
private final Map<String, SearcherManager> searcherManagerMap = new ConcurrentHashMap<>();
94110

95111
/* Get thread pool used for top-level repository history generation. */
96112
public static synchronized ExecutorService getHistoryExecutor() {
@@ -232,6 +248,14 @@ public void setCommandTimeout(int timeout) {
232248
threadConfig.get().setCommandTimeout(timeout);
233249
}
234250

251+
public int getIndexRefreshPeriod() {
252+
return threadConfig.get().getIndexRefreshPeriod();
253+
}
254+
255+
public void setIndexRefreshPeriod(int seconds) {
256+
threadConfig.get().setIndexRefreshPeriod(seconds);
257+
}
258+
235259
/**
236260
* Get the path to the where the index database is stored
237261
*
@@ -1184,6 +1208,8 @@ public void stopConfigurationListenerThread() {
11841208
IOUtils.close(configServerSocket);
11851209
}
11861210

1211+
private Thread configurationListenerThread;
1212+
11871213
/**
11881214
* Start a thread to listen on a socket to receive new configurations to
11891215
* use.
@@ -1199,7 +1225,7 @@ public boolean startConfigurationListenerThread(SocketAddress endpoint) {
11991225
configServerSocket.bind(endpoint);
12001226
ret = true;
12011227
final ServerSocket sock = configServerSocket;
1202-
Thread t = new Thread(new Runnable() {
1228+
configurationListenerThread = new Thread(new Runnable() {
12031229
@Override
12041230
public void run() {
12051231
ByteArrayOutputStream bos = new ByteArrayOutputStream(1 << 13);
@@ -1228,7 +1254,15 @@ public void run() {
12281254
((Configuration) obj).refreshDateForLastIndexRun();
12291255
setConfiguration((Configuration) obj);
12301256
LOGGER.log(Level.INFO, "Configuration updated: {0}",
1231-
configuration.getSourceRoot());
1257+
configuration.getSourceRoot());
1258+
1259+
// We are assuming that each update of configuration
1260+
// means reindex. If dedicated thread is introduced
1261+
// in the future solely for the purpose of getting
1262+
// the event of reindex, the 2 calls below should
1263+
// be moved there.
1264+
refreshSearcherManagerMap();
1265+
maybeRefreshIndexSearchers();
12321266
}
12331267
} catch (IOException e) {
12341268
LOGGER.log(Level.SEVERE, "Error reading config file: ", e);
@@ -1237,12 +1271,12 @@ public void run() {
12371271
}
12381272
}
12391273
}
1240-
}, "conigurationListener");
1241-
t.start();
1274+
}, "configurationListener");
1275+
configurationListenerThread.start();
12421276
} catch (UnknownHostException ex) {
1243-
LOGGER.log(Level.FINE, "Problem resolving sender: ", ex);
1277+
LOGGER.log(Level.WARNING, "Problem resolving sender: ", ex);
12441278
} catch (IOException ex) {
1245-
LOGGER.log(Level.FINE, "I/O error when waiting for config: ", ex);
1279+
LOGGER.log(Level.WARNING, "I/O error when waiting for config: ", ex);
12461280
}
12471281

12481282
if (!ret && configServerSocket != null) {
@@ -1343,4 +1377,200 @@ public void stopWatchDogService() {
13431377
}
13441378
}
13451379
}
1380+
1381+
private Thread indexReopenThread;
1382+
1383+
public void maybeRefreshIndexSearchers() {
1384+
for (Map.Entry<String, SearcherManager> entry : searcherManagerMap.entrySet()) {
1385+
try {
1386+
entry.getValue().maybeRefresh();
1387+
} catch (AlreadyClosedException ex) {
1388+
// This is a case of removed project.
1389+
// See refreshSearcherManagerMap() for details.
1390+
} catch (IOException ex) {
1391+
LOGGER.log(Level.SEVERE, "maybeRefresh failed", ex);
1392+
}
1393+
}
1394+
}
1395+
1396+
/**
1397+
* Call maybeRefresh() on each SearcherManager object from dedicated thread
1398+
* periodically.
1399+
* If the corresponding index has changed in the meantime, it will be safely
1400+
* reopened, i.e. without impacting existing IndexSearcher/IndexReader
1401+
* objects, thus not disrupting searches in progress.
1402+
*/
1403+
public void startIndexReopenThread() {
1404+
indexReopenThread = new Thread(new Runnable() {
1405+
@Override
1406+
public void run() {
1407+
while (!Thread.currentThread().isInterrupted()) {
1408+
try {
1409+
maybeRefreshIndexSearchers();
1410+
Thread.sleep(getIndexRefreshPeriod() * 1000);
1411+
} catch (InterruptedException ex) {
1412+
Thread.currentThread().interrupt();
1413+
}
1414+
}
1415+
}
1416+
}, "indexReopenThread");
1417+
1418+
indexReopenThread.start();
1419+
}
1420+
1421+
public void stopIndexReopenThread() {
1422+
if (indexReopenThread != null) {
1423+
indexReopenThread.interrupt();
1424+
try {
1425+
indexReopenThread.join();
1426+
} catch (InterruptedException ex) {
1427+
LOGGER.log(Level.INFO, "Cannot join indexReopen thread: ", ex);
1428+
}
1429+
}
1430+
}
1431+
1432+
/**
1433+
* Get IndexSearcher for given project.
1434+
* Each IndexSearcher is born from a SearcherManager object. There is
1435+
* one SearcherManager for every project.
1436+
* This schema makes it possible to reuse IndexSearcher/IndexReader objects
1437+
* so the heavy lifting (esp. system calls) performed in FSDirectory
1438+
* and DirectoryReader happens only once for a project.
1439+
* The caller has to make sure that the IndexSearcher is returned back
1440+
* to the SearcherManager. This is done with returnIndexSearcher().
1441+
* The return of the IndexSearcher should happen only after the search
1442+
* result data are read fully.
1443+
*
1444+
* @param proj project
1445+
* @return SearcherManager for given project
1446+
*/
1447+
public IndexSearcher getIndexSearcher(String proj) throws IOException {
1448+
SearcherManager mgr = searcherManagerMap.get(proj);
1449+
IndexSearcher searcher = null;
1450+
if (mgr == null) {
1451+
File indexDir = new File(getDataRootPath(), IndexDatabase.INDEX_DIR);
1452+
1453+
try {
1454+
Directory dir = FSDirectory.open(new File(indexDir, proj).toPath());
1455+
mgr = new SearcherManager(dir, new ThreadpoolSearcherFactory());
1456+
searcherManagerMap.put(proj, mgr);
1457+
searcher = mgr.acquire();
1458+
} catch (IOException ex) {
1459+
LOGGER.log(Level.SEVERE,
1460+
"cannot construct IndexSearcher for project " + proj, ex);
1461+
}
1462+
} else {
1463+
searcher = mgr.acquire();
1464+
}
1465+
1466+
return searcher;
1467+
}
1468+
1469+
/**
1470+
* Return IndexSearcher object back to corresponding SearcherManager.
1471+
* @param proj project name which belongs to the searcher
1472+
* @param searcher searcher object to release
1473+
*/
1474+
public void returnIndexSearcher(String proj, IndexSearcher searcher) {
1475+
SearcherManager mgr = searcherManagerMap.get(proj);
1476+
if (mgr != null) {
1477+
try {
1478+
mgr.release(searcher);
1479+
} catch (IOException ex) {
1480+
LOGGER.log(Level.SEVERE, "cannot release IndexSearcher for project " + proj, ex);
1481+
}
1482+
} else {
1483+
LOGGER.log(Level.SEVERE, "cannot find SearcherManager for project " + proj);
1484+
}
1485+
}
1486+
1487+
/**
1488+
* After new configuration is put into place, the set of projects might
1489+
* change so we go through the SearcherManager objects and close those where
1490+
* the corresponding project is no longer present.
1491+
*/
1492+
private void refreshSearcherManagerMap() {
1493+
for (Map.Entry<String, SearcherManager> entry : searcherManagerMap.entrySet()) {
1494+
try {
1495+
// If a project is gone, close the corresponding SearcherManager
1496+
// so that it cannot produce new IndexSearcher objects.
1497+
for (Project proj : getProjects()) {
1498+
if (entry.getKey().compareTo(proj.getDescription()) == 0) {
1499+
// XXX Ideally we would like to remove the entry from the map here.
1500+
// However, if some thread acquired an IndexSearcher and then config change happened
1501+
// and the corresponding searcherManager was removed from the map,
1502+
// returnIndexSearcher() will have no place to return the indexSearcher to.
1503+
// This would likely lead to leaks since the corresponding IndexReader
1504+
// will remain open.
1505+
// So, we cannot remove searcherManager from the map until all threads
1506+
// are done with it. We could handle this by inserting the pair into
1507+
// special to-be-removed list and then check reference count
1508+
// of corresponding IndexReader object in returnIndexSearcher().
1509+
// If 0, the SearcherManager can be safely removed from the searcherManagerMap.
1510+
// For the time being, let the map to grow unbounded to keep things simple.
1511+
entry.getValue().close();
1512+
}
1513+
}
1514+
} catch (IOException ex) {
1515+
LOGGER.log(Level.SEVERE,
1516+
"cannot close IndexReader for project" + entry.getKey(), ex);
1517+
}
1518+
}
1519+
}
1520+
1521+
/**
1522+
* Return collection of IndexReader objects as MultiReader object
1523+
* for given list of projects.
1524+
* The caller is responsible for releasing the IndexSearcher objects
1525+
* so we add them to the map.
1526+
*
1527+
* @param projects list of projects
1528+
* @param map each IndexSearcher produced will be put into this map
1529+
* @return MultiReader for the projects
1530+
*/
1531+
public MultiReader getMultiReader(SortedSet<String> projects, Map<String, IndexSearcher> map) {
1532+
IndexReader[] subreaders = new IndexReader[projects.size()];
1533+
int ii = 0;
1534+
1535+
// TODO might need to rewrite to Project instead of
1536+
// String , need changes in projects.jspf too
1537+
for (String proj : projects) {
1538+
try {
1539+
IndexSearcher searcher = RuntimeEnvironment.getInstance().getIndexSearcher(proj);
1540+
subreaders[ii++] = searcher.getIndexReader();
1541+
map.put(proj, searcher);
1542+
} catch (IOException ex) {
1543+
LOGGER.log(Level.SEVERE,
1544+
"cannot get IndexReader for project" + proj, ex);
1545+
}
1546+
}
1547+
MultiReader multiReader = null;
1548+
try {
1549+
multiReader = new MultiReader(subreaders, true);
1550+
} catch (IOException ex) {
1551+
LOGGER.log(Level.SEVERE,
1552+
"cannot construct MultiReader for set of projects", ex);
1553+
}
1554+
return multiReader;
1555+
}
1556+
1557+
/**
1558+
* Helper method for the consumers of getMultiReader() to be called when
1559+
* destroying search context. This will make sure all indexSearcher
1560+
* objects are properly released.
1561+
*
1562+
* @param map map of project to indexSearcher
1563+
*/
1564+
public void freeIndexSearcherMap(Map<String, IndexSearcher> map) {
1565+
List<String> toRemove = new ArrayList<>();
1566+
1567+
for (Map.Entry<String, IndexSearcher> entry : map.entrySet()) {
1568+
RuntimeEnvironment.getInstance().returnIndexSearcher(entry.getKey(), entry.getValue());
1569+
toRemove.add(entry.getKey());
1570+
}
1571+
1572+
for (String key : toRemove) {
1573+
map.remove(key);
1574+
}
1575+
}
13461576
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* CDDL HEADER START
3+
*
4+
* The contents of this file are subject to the terms of the
5+
* Common Development and Distribution License (the "License").
6+
* You may not use this file except in compliance with the License.
7+
*
8+
* See LICENSE.txt included in this distribution for the specific
9+
* language governing permissions and limitations under the License.
10+
*
11+
* When distributing Covered Code, include this CDDL HEADER in each
12+
* file and include the License file at LICENSE.txt.
13+
* If applicable, add the following below this CDDL HEADER, with the
14+
* fields enclosed by brackets "[]" replaced with your own identifying
15+
* information: Portions Copyright [yyyy] [name of copyright owner]
16+
*
17+
* CDDL HEADER END
18+
*/
19+
20+
/*
21+
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
22+
*/
23+
package org.opensolaris.opengrok.configuration;
24+
25+
import java.io.IOException;
26+
import org.apache.lucene.index.IndexReader;
27+
import org.apache.lucene.search.IndexSearcher;
28+
import org.apache.lucene.search.SearcherFactory;
29+
30+
/**
31+
* Factory for producing IndexSearcher objects.
32+
* This is used inside getIndexSearcher() to produce new SearcherManager objects
33+
* to make sure the searcher threads are constrained to single thread pool.
34+
* @author vkotal
35+
*/
36+
class ThreadpoolSearcherFactory extends SearcherFactory {
37+
@Override
38+
public IndexSearcher newSearcher(IndexReader r, IndexReader prev) throws IOException {
39+
// The previous IndexReader is not used here.
40+
IndexSearcher searcher = new IndexSearcher(r,
41+
RuntimeEnvironment.getInstance().getSearchExecutor());
42+
return searcher;
43+
}
44+
}

0 commit comments

Comments
 (0)