Skip to content

Commit

Permalink
Add slice level operation listener methods
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Aug 8, 2024
1 parent f03dde9 commit f27d3d9
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,33 @@ default void onFailedQueryPhase(SearchContext searchContext) {}
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed before the slice execution in
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)}.
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.*
* @param searchContext the current search context
*/
default void onPreSliceExecution(SearchContext searchContext) {}

/**
* Executed if the slice execution in
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} failed.
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.*
* @param searchContext the current search context
*/
default void onFailedSliceExecution(SearchContext searchContext) {}

/**
* Executed after the slice execution in
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} successfully finished.
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.
* Note: this is not invoked if the slice execution failed.*
* @param searchContext the current search context
*
* @see #onFailedSliceExecution(org.opensearch.search.internal.SearchContext)
*/
default void onSliceExecution(SearchContext searchContext) {}

/**
* Executed before the fetch phase is executed
* @param searchContext the current search context
Expand Down Expand Up @@ -195,6 +222,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
}
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,20 +270,27 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext);
try {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
} catch (Throwable t) {
searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext);
throw t;
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public void testListenersAreExecuted() {
AtomicInteger preQuery = new AtomicInteger();
AtomicInteger failedQuery = new AtomicInteger();
AtomicInteger onQuery = new AtomicInteger();
AtomicInteger preSlice = new AtomicInteger();
AtomicInteger failedSlice = new AtomicInteger();
AtomicInteger onSlice = new AtomicInteger();
AtomicInteger onFetch = new AtomicInteger();
AtomicInteger preFetch = new AtomicInteger();
AtomicInteger failedFetch = new AtomicInteger();
Expand Down Expand Up @@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
onQuery.incrementAndGet();
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
preSlice.incrementAndGet();
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
failedSlice.incrementAndGet();
}

@Override
public void onSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
onSlice.incrementAndGet();
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
assertNotNull(searchContext);
Expand Down Expand Up @@ -167,10 +188,30 @@ public void onSearchIdleReactivation() {
compositeListener.onQueryPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(0, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -181,10 +222,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFetchPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -195,10 +239,30 @@ public void onSearchIdleReactivation() {
compositeListener.onPreQueryPhase(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -209,10 +273,13 @@ public void onSearchIdleReactivation() {
compositeListener.onPreFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -223,10 +290,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -237,10 +307,30 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedQueryPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedSliceExecution(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -251,10 +341,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -265,10 +358,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -279,10 +375,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -293,10 +392,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -307,10 +409,13 @@ public void onSearchIdleReactivation() {
compositeListener.onSearchIdleReactivation();
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -138,6 +139,9 @@ public void testCancellableCollector() throws IOException {
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
SearchOperationListener searchOperationListener = new SearchOperationListener() {
};
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -262,6 +263,9 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
SearchOperationListener searchOperationListener = new SearchOperationListener() {
};
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
ContextIndexSearcher searcher = new ContextIndexSearcher(
filteredReader,
Expand Down
Loading

0 comments on commit f27d3d9

Please sign in to comment.