
Commit 2a5ba92

Author: Andre van de Ven (committed)
Message: added timing for entire fetch phase
1 parent afb08a0 - commit 2a5ba92

File tree: 11 files changed, +307 -73 lines


server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 6 additions & 0 deletions

@@ -128,7 +128,9 @@
 import org.opensearch.search.internal.ShardSearchContextId;
 import org.opensearch.search.internal.ShardSearchRequest;
 import org.opensearch.search.lookup.SearchLookup;
+import org.opensearch.search.profile.ProfileShardResult;
 import org.opensearch.search.profile.Profilers;
+import org.opensearch.search.profile.SearchProfileShardResults;
 import org.opensearch.search.query.QueryPhase;
 import org.opensearch.search.query.QuerySearchRequest;
 import org.opensearch.search.query.QuerySearchResult;

@@ -756,6 +758,10 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
         try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
             shortcutDocIdsToLoad(context);
             fetchPhase.execute(context);
+            if (context.getProfilers() != null) {
+                ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(context.getProfilers(), context.request());
+                context.queryResult().profileResults(shardResults);
+            }
             if (reader.singleSession()) {
                 freeReaderContext(reader.id());
             }
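
Note that the new block only runs when profiling was requested (context.getProfilers() is non-null), mirroring how the query phase publishes its profile. A minimal sketch of reading the new timing back out of the rebuilt result, using only accessors added in this commit (the `profilers` and `request` variables are assumed to be in scope, not part of the diff):

    // Hedged sketch, not part of the commit: consume the rebuilt profile
    // after the fetch phase has run with profiling enabled.
    ProfileShardResult shardResult = SearchProfileShardResults.buildShardResults(profilers, request);
    FetchProfileShardResult fetchProfile = shardResult.getFetchProfileResult();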

server/src/main/java/org/opensearch/search/fetch/FetchPhase.java

Lines changed: 84 additions & 71 deletions

@@ -70,6 +70,8 @@
 import org.opensearch.search.internal.SearchContext;
 import org.opensearch.search.lookup.SearchLookup;
 import org.opensearch.search.lookup.SourceLookup;
+import org.opensearch.search.profile.Timer;
+import org.opensearch.search.profile.fetch.FetchTimingType;

 import java.io.IOException;
 import java.util.ArrayList;

@@ -103,92 +105,103 @@ public FetchPhase(List<FetchSubPhase> fetchSubPhases) {
     }

     public void execute(SearchContext context) {
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("{}", new SearchContextSourcePrinter(context));
+        Timer timer = null;
+        if (context.getProfilers() != null) {
+            timer = context.getProfilers().getFetchProfiler().getQueryBreakdown("fetch").getTimer(FetchTimingType.EXECUTE_FETCH_PHASE);
+            timer.start();
         }
+        try {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("{}", new SearchContextSourcePrinter(context));
+            }

-        if (context.isCancelled()) {
-            throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
-        }
+            if (context.isCancelled()) {
+                throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
+            }

-        if (context.docIdsToLoadSize() == 0) {
-            // no individual hits to process, so we shortcut
-            context.fetchResult()
-                .hits(new SearchHits(new SearchHit[0], context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
-            return;
-        }
+            if (context.docIdsToLoadSize() == 0) {
+                // no individual hits to process, so we shortcut
+                context.fetchResult()
+                    .hits(new SearchHits(new SearchHit[0], context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
+                return;
+            }

-        DocIdToIndex[] docs = new DocIdToIndex[context.docIdsToLoadSize()];
-        for (int index = 0; index < context.docIdsToLoadSize(); index++) {
-            docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
-        }
-        // make sure that we iterate in doc id order
-        Arrays.sort(docs);
+            DocIdToIndex[] docs = new DocIdToIndex[context.docIdsToLoadSize()];
+            for (int index = 0; index < context.docIdsToLoadSize(); index++) {
+                docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
+            }
+            // make sure that we iterate in doc id order
+            Arrays.sort(docs);

-        Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
-        FieldsVisitor fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);
+            Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
+            FieldsVisitor fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);

-        FetchContext fetchContext = new FetchContext(context);
+            FetchContext fetchContext = new FetchContext(context);

-        SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];
+            SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];

-        List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext);
+            List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext);

-        int currentReaderIndex = -1;
-        LeafReaderContext currentReaderContext = null;
-        CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
-        boolean hasSequentialDocs = hasSequentialDocs(docs);
-        for (int index = 0; index < context.docIdsToLoadSize(); index++) {
-            if (context.isCancelled()) {
-                throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
-            }
-            int docId = docs[index].docId;
-            try {
-                int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
-                if (currentReaderIndex != readerIndex) {
-                    currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
-                    currentReaderIndex = readerIndex;
-                    if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
-                        && hasSequentialDocs
-                        && docs.length >= 10) {
-                        // All the docs to fetch are adjacent but Lucene stored fields are optimized
-                        // for random access and don't optimize for sequential access - except for merging.
-                        // So we do a little hack here and pretend we're going to do merges in order to
-                        // get better sequential access.
-                        SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
-                        fieldReader = lf.getSequentialStoredFieldsReader()::document;
-                    } else {
-                        fieldReader = currentReaderContext.reader().storedFields()::document;
+            int currentReaderIndex = -1;
+            LeafReaderContext currentReaderContext = null;
+            CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
+            boolean hasSequentialDocs = hasSequentialDocs(docs);
+            for (int index = 0; index < context.docIdsToLoadSize(); index++) {
+                if (context.isCancelled()) {
+                    throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
+                }
+                int docId = docs[index].docId;
+                try {
+                    int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
+                    if (currentReaderIndex != readerIndex) {
+                        currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
+                        currentReaderIndex = readerIndex;
+                        if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
+                            && hasSequentialDocs
+                            && docs.length >= 10) {
+                            // All the docs to fetch are adjacent but Lucene stored fields are optimized
+                            // for random access and don't optimize for sequential access - except for merging.
+                            // So we do a little hack here and pretend we're going to do merges in order to
+                            // get better sequential access.
+                            SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
+                            fieldReader = lf.getSequentialStoredFieldsReader()::document;
+                        } else {
+                            fieldReader = currentReaderContext.reader().storedFields()::document;
+                        }
+                        for (FetchSubPhaseProcessor processor : processors) {
+                            processor.setNextReader(currentReaderContext);
+                        }
                     }
+                    assert currentReaderContext != null;
+                    HitContext hit = prepareHitContext(
+                        context,
+                        fetchContext.searchLookup(),
+                        fieldsVisitor,
+                        docId,
+                        storedToRequestedFields,
+                        currentReaderContext,
+                        fieldReader
+                    );
                     for (FetchSubPhaseProcessor processor : processors) {
-                        processor.setNextReader(currentReaderContext);
+                        processor.process(hit);
                     }
+                    hits[docs[index].index] = hit.hit();
+                } catch (Exception e) {
+                    throw new FetchPhaseExecutionException(context.shardTarget(), "Error running fetch phase for doc [" + docId + "]", e);
                 }
-                assert currentReaderContext != null;
-                HitContext hit = prepareHitContext(
-                    context,
-                    fetchContext.searchLookup(),
-                    fieldsVisitor,
-                    docId,
-                    storedToRequestedFields,
-                    currentReaderContext,
-                    fieldReader
-                );
-                for (FetchSubPhaseProcessor processor : processors) {
-                    processor.process(hit);
-                }
-                hits[docs[index].index] = hit.hit();
-            } catch (Exception e) {
-                throw new FetchPhaseExecutionException(context.shardTarget(), "Error running fetch phase for doc [" + docId + "]", e);
             }
-        }
-        if (context.isCancelled()) {
-            throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
-        }
-
-        TotalHits totalHits = context.queryResult().getTotalHits();
-        context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
+            if (context.isCancelled()) {
+                throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled());
+            }

+            TotalHits totalHits = context.queryResult().getTotalHits();
+            context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
+        } finally {
+            if (timer != null) {
+                timer.stop();
+                context.getProfilers().getFetchProfiler().pollLastElement();
+            }
+        }
     }

     List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContext context) {
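
The shape of this change is a standard profile-timer idiom: start a timer before the work, run the original body inside try, and stop the timer in finally so that exceptions and the early return on an empty doc-id list still record time. A condensed sketch of that idiom, assuming the Timer and FetchProfiler APIs exactly as they appear in the diff (doFetchWork is a hypothetical stand-in for the original method body):

    Timer timer = null;
    if (context.getProfilers() != null) {
        timer = context.getProfilers()
            .getFetchProfiler()
            .getQueryBreakdown("fetch")
            .getTimer(FetchTimingType.EXECUTE_FETCH_PHASE);
        timer.start();
    }
    try {
        doFetchWork(); // hypothetical stand-in for the original execute() body
    } finally {
        if (timer != null) {
            timer.stop();
            context.getProfilers().getFetchProfiler().pollLastElement();
        }
    }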

server/src/main/java/org/opensearch/search/profile/ProfileShardResult.java

Lines changed: 12 additions & 0 deletions

@@ -37,6 +37,7 @@
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.search.profile.aggregation.AggregationProfileShardResult;
+import org.opensearch.search.profile.fetch.FetchProfileShardResult;
 import org.opensearch.search.profile.query.QueryProfileShardResult;

 import java.io.IOException;

@@ -56,14 +57,19 @@ public class ProfileShardResult implements Writeable {

     private final AggregationProfileShardResult aggProfileShardResult;

+    private final FetchProfileShardResult fetchProfileResult;
+
     private NetworkTime networkTime;

     public ProfileShardResult(
         List<QueryProfileShardResult> queryProfileResults,
         AggregationProfileShardResult aggProfileShardResult,
+        FetchProfileShardResult fetchProfileResult,
+
         NetworkTime networkTime
     ) {
         this.aggProfileShardResult = aggProfileShardResult;
+        this.fetchProfileResult = fetchProfileResult;
         this.queryProfileResults = Collections.unmodifiableList(queryProfileResults);
         this.networkTime = networkTime;
     }

@@ -77,6 +83,7 @@ public ProfileShardResult(StreamInput in) throws IOException {
         }
         this.queryProfileResults = Collections.unmodifiableList(queryProfileResults);
         this.aggProfileShardResult = new AggregationProfileShardResult(in);
+        this.fetchProfileResult = new FetchProfileShardResult(in);
         this.networkTime = new NetworkTime(in);
     }

@@ -87,6 +94,7 @@ public void writeTo(StreamOutput out) throws IOException {
             queryShardResult.writeTo(out);
         }
         aggProfileShardResult.writeTo(out);
+        fetchProfileResult.writeTo(out);
         networkTime.writeTo(out);
     }

@@ -98,6 +106,10 @@ public AggregationProfileShardResult getAggregationProfileResults() {
         return aggProfileShardResult;
     }

+    public FetchProfileShardResult getFetchProfileResult() {
+        return fetchProfileResult;
+    }
+
     public NetworkTime getNetworkTime() {
         return networkTime;
     }
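
Worth calling out: StreamInput/StreamOutput serialization is positional, so the StreamInput constructor and writeTo must touch fields in the same order; this commit inserts the fetch result between the aggregation results and the network time on both sides. A hypothetical illustration of that symmetry (MyResult, FieldA, and FieldB are stand-ins, not from the commit):

    // Hypothetical illustration of positional Writeable symmetry.
    MyResult(StreamInput in) throws IOException {
        this.fieldA = new FieldA(in); // read order ...
        this.fieldB = new FieldB(in); // ... must mirror write order
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        fieldA.writeTo(out);
        fieldB.writeTo(out);
    }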

server/src/main/java/org/opensearch/search/profile/Profilers.java

Lines changed: 7 additions & 0 deletions

@@ -36,6 +36,7 @@
 import org.opensearch.search.internal.ContextIndexSearcher;
 import org.opensearch.search.profile.aggregation.AggregationProfiler;
 import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
+import org.opensearch.search.profile.fetch.FetchProfiler;
 import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
 import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
 import org.opensearch.search.profile.query.InternalQueryProfileTree;

@@ -56,6 +57,7 @@ public final class Profilers {
     private final ContextIndexSearcher searcher;
     private final List<QueryProfiler> queryProfilers;
     private final AggregationProfiler aggProfiler;
+    private final FetchProfiler fetchProfiler;
     private final boolean isConcurrentSegmentSearchEnabled;

     /** Sole constructor. This {@link Profilers} instance will initially wrap one {@link QueryProfiler}. */

@@ -64,6 +66,7 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc
         this.isConcurrentSegmentSearchEnabled = isConcurrentSegmentSearchEnabled;
         this.queryProfilers = new ArrayList<>();
         this.aggProfiler = isConcurrentSegmentSearchEnabled ? new ConcurrentAggregationProfiler() : new AggregationProfiler();
+        this.fetchProfiler = new FetchProfiler();
         addQueryProfiler();
     }

@@ -92,4 +95,8 @@ public AggregationProfiler getAggregationProfiler() {
         return aggProfiler;
     }

+    public FetchProfiler getFetchProfiler() {
+        return fetchProfiler;
+    }
+
 }
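
Since Profilers is created per SearchContext when a request enables profiling, the new FetchProfiler is per-shard-request state, just like the query and aggregation profilers beside it. A hedged usage sketch, using the same null-guard as elsewhere in the commit:

    // Hedged sketch: any code holding the SearchContext can reach the fetch
    // profiler the same way FetchPhase does in this commit.
    if (context.getProfilers() != null) {
        FetchProfiler fetchProfiler = context.getProfilers().getFetchProfiler();
        // request breakdowns and timers from it, e.g. getQueryBreakdown("fetch")
    }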

server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java

Lines changed: 16 additions & 2 deletions

@@ -41,6 +41,9 @@
 import org.opensearch.search.internal.ShardSearchRequest;
 import org.opensearch.search.profile.aggregation.AggregationProfileShardResult;
 import org.opensearch.search.profile.aggregation.AggregationProfiler;
+import org.opensearch.search.profile.fetch.FetchProfileShardResult;
+import org.opensearch.search.profile.fetch.FetchProfiler;
+import org.opensearch.search.profile.fetch.FetchTimingType;
 import org.opensearch.search.profile.query.QueryProfileShardResult;
 import org.opensearch.search.profile.query.QueryProfiler;

@@ -117,6 +120,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             }
             builder.endArray();
             profileShardResult.getAggregationProfileResults().toXContent(builder, params);
+            profileShardResult.getFetchProfileResult().toXContent(builder, params);
             builder.endObject();
         }
         builder.endArray().endObject();

@@ -149,6 +153,7 @@ private static void parseSearchProfileResultsEntry(XContentParser parser, Map<St
         ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
         List<QueryProfileShardResult> queryProfileResults = new ArrayList<>();
         AggregationProfileShardResult aggProfileShardResult = null;
+        FetchProfileShardResult fetchProfileShardResult = null;
         String id = null;
         String currentFieldName = null;
         long inboundNetworkTime = 0;

@@ -173,6 +178,8 @@ private static void parseSearchProfileResultsEntry(XContentParser parser, Map<St
                 }
             } else if (AggregationProfileShardResult.AGGREGATIONS.equals(currentFieldName)) {
                 aggProfileShardResult = AggregationProfileShardResult.fromXContent(parser);
+            } else if (FetchProfileShardResult.FETCH.equals(currentFieldName)) {
+                fetchProfileShardResult = FetchProfileShardResult.fromXContent(parser);
             } else {
                 parser.skipChildren();
             }

@@ -181,7 +188,7 @@ private static void parseSearchProfileResultsEntry(XContentParser parser, Map<St
             }
         }
         NetworkTime networkTime = new NetworkTime(inboundNetworkTime, outboundNetworkTime);
-        searchProfileResults.put(id, new ProfileShardResult(queryProfileResults, aggProfileShardResult, networkTime));
+        searchProfileResults.put(id, new ProfileShardResult(queryProfileResults, aggProfileShardResult, fetchProfileShardResult, networkTime));
     }

     /**

@@ -196,6 +203,7 @@ private static void parseSearchProfileResultsEntry(XContentParser parser, Map<St
     public static ProfileShardResult buildShardResults(Profilers profilers, ShardSearchRequest request) {
         List<QueryProfiler> queryProfilers = profilers.getQueryProfilers();
         AggregationProfiler aggProfiler = profilers.getAggregationProfiler();
+        FetchProfiler fetchProfiler = profilers.getFetchProfiler();
         List<QueryProfileShardResult> queryResults = new ArrayList<>(queryProfilers.size());
         for (QueryProfiler queryProfiler : queryProfilers) {
             QueryProfileShardResult result = new QueryProfileShardResult(

@@ -206,11 +214,17 @@ public static ProfileShardResult buildShardResults(Profilers profilers, ShardSea
             queryResults.add(result);
         }
         AggregationProfileShardResult aggResults = new AggregationProfileShardResult(aggProfiler.getTree());
+        long fetchTime = 0L;
+        List<ProfileResult> fetchTree = fetchProfiler.getTree();
+        if (!fetchTree.isEmpty()) {
+            fetchTime = fetchTree.get(0).getTime();
+        }
+        FetchProfileShardResult fetchResult = new FetchProfileShardResult(fetchTime);
         NetworkTime networkTime = new NetworkTime(0, 0);
         if (request != null) {
             networkTime.setInboundNetworkTime(request.getInboundNetworkTime());
             networkTime.setOutboundNetworkTime(request.getOutboundNetworkTime());
         }
-        return new ProfileShardResult(queryResults, aggResults, networkTime);
+        return new ProfileShardResult(queryResults, aggResults, fetchResult, networkTime);
     }
 }
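
buildShardResults now reads the total fetch time from the root of the fetch profiler's tree, defaulting to zero when the fetch phase never ran. FetchProfileShardResult itself is among the 11 changed files but not shown on this page; its call sites here pin down a minimal surface, sketched hypothetically below (the real class may differ):

    // Hypothetical reconstruction from the call sites in this commit; the
    // real file is not shown on this page.
    public class FetchProfileShardResult implements Writeable {
        public static final String FETCH = "fetch"; // field name matched during parsing

        private final long fetchTime;

        public FetchProfileShardResult(long fetchTime) {
            this.fetchTime = fetchTime;
        }

        public FetchProfileShardResult(StreamInput in) throws IOException {
            this.fetchTime = in.readLong();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeLong(fetchTime);
        }
        // plus the toXContent/fromXContent pair used by SearchProfileShardResults
    }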

server/src/main/java/org/opensearch/search/profile/fetch/FetchProfileBreakdown.java

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.fetch;
+
+import org.opensearch.search.profile.AbstractProfileBreakdown;
+
+
+public class FetchProfileBreakdown extends AbstractProfileBreakdown<FetchTimingType> {
+    public FetchProfileBreakdown() {
+        super(FetchTimingType.class);
+    }
+}
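
The breakdown's timing keys come from FetchTimingType, another of the changed files not shown on this page. From its single use in FetchPhase.java (FetchTimingType.EXECUTE_FETCH_PHASE), a minimal hypothetical version would be:

    // Hypothetical minimal enum consistent with the usage in FetchPhase.java;
    // the real file may define additional timing types.
    package org.opensearch.search.profile.fetch;

    public enum FetchTimingType {
        EXECUTE_FETCH_PHASE
    }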
