Skip to content

Commit 4cd6962

Browse files
authored
Close IndexFieldDataService asynchronously (#18888)
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
1 parent f90b12c commit 4cd6962

File tree

6 files changed

+107
-11
lines changed

6 files changed

+107
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9797
- Ignore awareness attributes when a custom preference string is included with a search request ([#18848](https://github.com/opensearch-project/OpenSearch/pull/18848))
9898
- Use ScoreDoc instead of FieldDoc when creating TopScoreDocCollectorManager to avoid unnecessary conversion ([#18802](https://github.com/opensearch-project/OpenSearch/pull/18802))
9999
- Fix leafSorter optimization for ReadOnlyEngine and NRTReplicationEngine ([#18639](https://github.com/opensearch-project/OpenSearch/pull/18639))
100+
- Close IndexFieldDataService asynchronously ([#18888](https://github.com/opensearch-project/OpenSearch/pull/18888))
100101
- Fix query string regex queries incorrectly swallowing TooComplexToDeterminizeException ([#18883](https://github.com/opensearch-project/OpenSearch/pull/18883))
101102

102103
### Security

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,13 @@ public IndexService(
274274
idFieldDataEnabled,
275275
scriptService
276276
);
277-
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
277+
this.indexFieldData = new IndexFieldDataService(
278+
indexSettings,
279+
indicesFieldDataCache,
280+
circuitBreakerService,
281+
mapperService,
282+
threadPool
283+
);
278284
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
279285
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
280286
// The sort order is validated right after the merge of the mapping later in the process.

server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.index.mapper.MapperService;
4545
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
4646
import org.opensearch.search.lookup.SearchLookup;
47+
import org.opensearch.threadpool.ThreadPool;
4748

4849
import java.io.Closeable;
4950
import java.io.IOException;
@@ -77,6 +78,8 @@ public class IndexFieldDataService extends AbstractIndexComponent implements Clo
7778
Property.IndexScope
7879
);
7980

81+
private final ThreadPool threadPool;
82+
8083
private final CircuitBreakerService circuitBreakerService;
8184

8285
private final IndicesFieldDataCache indicesFieldDataCache;
@@ -96,12 +99,14 @@ public IndexFieldDataService(
9699
IndexSettings indexSettings,
97100
IndicesFieldDataCache indicesFieldDataCache,
98101
CircuitBreakerService circuitBreakerService,
99-
MapperService mapperService
102+
MapperService mapperService,
103+
ThreadPool threadPool
100104
) {
101105
super(indexSettings);
102106
this.indicesFieldDataCache = indicesFieldDataCache;
103107
this.circuitBreakerService = circuitBreakerService;
104108
this.mapperService = mapperService;
109+
this.threadPool = threadPool;
105110
}
106111

107112
public synchronized void clear() {
@@ -181,6 +186,17 @@ public void setListener(IndexFieldDataCache.Listener listener) {
181186

182187
@Override
183188
public void close() throws IOException {
184-
clear();
189+
// Clear the field data cache for this index in an async manner
190+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
191+
try {
192+
this.clear();
193+
} catch (Exception ex) {
194+
logger.warn(
195+
"Exception occurred while clearing index field data cache for index: {}, exception: {}",
196+
indexSettings.getIndex().getName(),
197+
ex
198+
);
199+
}
200+
});
185201
}
186202
}

server/src/test/java/org/opensearch/index/fielddata/IndexFieldDataServiceTests.java

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.index.mapper.NumberFieldMapper;
5959
import org.opensearch.index.mapper.TextFieldMapper;
6060
import org.opensearch.indices.IndicesService;
61+
import org.opensearch.indices.cluster.IndicesClusterStateService;
6162
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
6263
import org.opensearch.plugins.Plugin;
6364
import org.opensearch.search.lookup.SearchLookup;
@@ -67,6 +68,7 @@
6768
import org.opensearch.threadpool.TestThreadPool;
6869
import org.opensearch.threadpool.ThreadPool;
6970

71+
import java.io.IOException;
7072
import java.util.Arrays;
7173
import java.util.Collection;
7274
import java.util.Collections;
@@ -93,7 +95,8 @@ public void testGetForFieldDefaults() {
9395
indexService.getIndexSettings(),
9496
indicesService.getIndicesFieldDataCache(),
9597
indicesService.getCircuitBreakerService(),
96-
indexService.mapperService()
98+
indexService.mapperService(),
99+
indexService.getThreadPool()
97100
);
98101
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
99102
final MappedFieldType stringMapper = new KeywordFieldMapper.Builder("string").build(ctx).fieldType();
@@ -127,14 +130,72 @@ public void testGetForFieldDefaults() {
127130
assertTrue(fd instanceof SortedNumericIndexFieldData);
128131
}
129132

133+
public void testIndexFieldDataCacheIsCleredAfterIndexRemoval() throws IOException, InterruptedException {
134+
final IndexService indexService = createIndex("test");
135+
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
136+
// copy the ifdService since we can set the listener only once.
137+
final IndexFieldDataService ifdService = new IndexFieldDataService(
138+
indexService.getIndexSettings(),
139+
indicesService.getIndicesFieldDataCache(),
140+
indicesService.getCircuitBreakerService(),
141+
indexService.mapperService(),
142+
indexService.getThreadPool()
143+
);
144+
145+
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
146+
final MappedFieldType mapper1 = new TextFieldMapper.Builder("field_1", createDefaultIndexAnalyzers()).fielddata(true)
147+
.build(ctx)
148+
.fieldType();
149+
final MappedFieldType mapper2 = new TextFieldMapper.Builder("field_2", createDefaultIndexAnalyzers()).fielddata(true)
150+
.build(ctx)
151+
.fieldType();
152+
final IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), new IndexWriterConfig(new KeywordAnalyzer()));
153+
Document doc = new Document();
154+
doc.add(new StringField("field_1", "thisisastring", Store.NO));
155+
doc.add(new StringField("field_2", "thisisanotherstring", Store.NO));
156+
writer.addDocument(doc);
157+
final IndexReader reader = DirectoryReader.open(writer);
158+
final AtomicInteger onCacheCalled = new AtomicInteger();
159+
final AtomicInteger onRemovalCalled = new AtomicInteger();
160+
ifdService.setListener(new IndexFieldDataCache.Listener() {
161+
@Override
162+
public void onCache(ShardId shardId, String fieldName, Accountable ramUsage) {}
163+
164+
@Override
165+
public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, long sizeInBytes) {}
166+
});
167+
IndexFieldData<?> ifd1 = ifdService.getForField(mapper1, "test", () -> { throw new UnsupportedOperationException(); });
168+
IndexFieldData<?> ifd2 = ifdService.getForField(mapper2, "test", () -> { throw new UnsupportedOperationException(); });
169+
LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0);
170+
LeafFieldData loadField1 = ifd1.load(leafReaderContext);
171+
LeafFieldData loadField2 = ifd2.load(leafReaderContext);
172+
173+
assertEquals(2, indicesService.getIndicesFieldDataCache().getCache().count());
174+
175+
// Remove index
176+
indicesService.removeIndex(
177+
indexService.index(),
178+
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED,
179+
"Please delete!"
180+
);
181+
182+
waitUntil(() -> indicesService.getIndicesFieldDataCache().getCache().count() == 0);
183+
184+
reader.close();
185+
loadField1.close();
186+
loadField2.close();
187+
writer.close();
188+
}
189+
130190
public void testGetForFieldRuntimeField() {
131191
final IndexService indexService = createIndex("test");
132192
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
133193
final IndexFieldDataService ifdService = new IndexFieldDataService(
134194
indexService.getIndexSettings(),
135195
indicesService.getIndicesFieldDataCache(),
136196
indicesService.getCircuitBreakerService(),
137-
indexService.mapperService()
197+
indexService.mapperService(),
198+
indexService.getThreadPool()
138199
);
139200
final SetOnce<Supplier<SearchLookup>> searchLookupSetOnce = new SetOnce<>();
140201
MappedFieldType ft = mock(MappedFieldType.class);
@@ -159,7 +220,8 @@ public void testClearField() throws Exception {
159220
indexService.getIndexSettings(),
160221
indicesService.getIndicesFieldDataCache(),
161222
indicesService.getCircuitBreakerService(),
162-
indexService.mapperService()
223+
indexService.mapperService(),
224+
indexService.getThreadPool()
163225
);
164226

165227
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
@@ -227,7 +289,8 @@ public void testFieldDataCacheListener() throws Exception {
227289
indexService.getIndexSettings(),
228290
indicesService.getIndicesFieldDataCache(),
229291
indicesService.getCircuitBreakerService(),
230-
indexService.mapperService()
292+
indexService.mapperService(),
293+
indexService.getThreadPool()
231294
);
232295

233296
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
@@ -284,7 +347,8 @@ public void testSetCacheListenerTwice() {
284347
indexService.getIndexSettings(),
285348
indicesService.getIndicesFieldDataCache(),
286349
indicesService.getCircuitBreakerService(),
287-
indexService.mapperService()
350+
indexService.mapperService(),
351+
indexService.getThreadPool()
288352
);
289353
// set it the first time...
290354
shardPrivateService.setListener(new IndexFieldDataCache.Listener() {
@@ -325,7 +389,8 @@ private void doTestRequireDocValues(MappedFieldType ft) {
325389
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
326390
cache,
327391
null,
328-
null
392+
null,
393+
threadPool
329394
);
330395
if (ft.hasDocValues()) {
331396
ifds.getForField(ft, "test", () -> { throw new UnsupportedOperationException(); }); // no exception

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3359,7 +3359,8 @@ public void testReaderWrapperWorksWithGlobalOrdinals() throws IOException {
33593359
shard.indexSettings,
33603360
indicesFieldDataCache,
33613361
new NoneCircuitBreakerService(),
3362-
shard.mapperService()
3362+
shard.mapperService(),
3363+
shard.getThreadPool()
33633364
);
33643365
IndexFieldData.Global ifd = indexFieldDataService.getForField(foo, "test", () -> {
33653366
throw new UnsupportedOperationException("search lookup not available");

test/framework/src/main/java/org/opensearch/test/AbstractBuilderTestCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
import org.opensearch.script.ScriptModule;
9191
import org.opensearch.script.ScriptService;
9292
import org.opensearch.search.SearchModule;
93+
import org.opensearch.threadpool.TestThreadPool;
94+
import org.opensearch.threadpool.ThreadPool;
9395
import org.opensearch.transport.client.Client;
9496
import org.junit.After;
9597
import org.junit.AfterClass;
@@ -109,6 +111,7 @@
109111
import java.util.Map;
110112
import java.util.concurrent.Callable;
111113
import java.util.concurrent.ExecutionException;
114+
import java.util.concurrent.TimeUnit;
112115
import java.util.function.Function;
113116
import java.util.function.Predicate;
114117
import java.util.stream.Stream;
@@ -177,6 +180,7 @@ public abstract class AbstractBuilderTestCase extends OpenSearchTestCase {
177180
private static Settings nodeSettings;
178181
private static Index index;
179182
private static long nowInMillis;
183+
private static ThreadPool threadPool;
180184

181185
protected static Index getIndex() {
182186
return index;
@@ -198,6 +202,7 @@ public static void beforeClass() {
198202

199203
index = new Index(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLength(10));
200204
nowInMillis = randomNonNegativeLong();
205+
threadPool = new TestThreadPool("random_threadpool_name");
201206
}
202207

203208
@Override
@@ -240,6 +245,7 @@ protected Iterable<MappedFieldType> getMapping() {
240245
public static void afterClass() throws Exception {
241246
IOUtils.close(serviceHolder);
242247
IOUtils.close(serviceHolderWithNoType);
248+
ThreadPool.terminate(threadPool, 1, TimeUnit.MINUTES);
243249
serviceHolder = null;
244250
serviceHolderWithNoType = null;
245251
}
@@ -432,7 +438,8 @@ private static class ServiceHolder implements Closeable {
432438
idxSettings,
433439
indicesFieldDataCache,
434440
new NoneCircuitBreakerService(),
435-
mapperService
441+
mapperService,
442+
threadPool
436443
);
437444
bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
438445
@Override

0 commit comments

Comments
 (0)