Skip to content

Commit faaff37

Browse files
author
Andre van de Ven
committed
added more granular fetch profiling
1 parent 2a5ba92 commit faaff37

File tree

6 files changed

+166
-60
lines changed

6 files changed

+166
-60
lines changed

.idea/runConfigurations/Debug_OpenSearch.xml

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/src/main/java/org/opensearch/search/fetch/FetchPhase.java

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.search.lookup.SearchLookup;
7272
import org.opensearch.search.lookup.SourceLookup;
7373
import org.opensearch.search.profile.Timer;
74+
import org.opensearch.search.profile.fetch.FetchProfileBreakdown;
7475
import org.opensearch.search.profile.fetch.FetchTimingType;
7576

7677
import java.io.IOException;
@@ -105,10 +106,9 @@ public FetchPhase(List<FetchSubPhase> fetchSubPhases) {
105106
}
106107

107108
public void execute(SearchContext context) {
108-
Timer timer = null;
109+
FetchProfileBreakdown profile = null;
109110
if (context.getProfilers() != null) {
110-
timer = context.getProfilers().getFetchProfiler().getQueryBreakdown("fetch").getTimer(FetchTimingType.EXECUTE_FETCH_PHASE);
111-
timer.start();
111+
profile = context.getProfilers().getFetchProfiler().getQueryBreakdown("fetch");
112112
}
113113
try {
114114
if (LOGGER.isTraceEnabled()) {
@@ -131,16 +131,39 @@ public void execute(SearchContext context) {
131131
docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
132132
}
133133
// make sure that we iterate in doc id order
134-
Arrays.sort(docs);
134+
if (profile != null) {
135+
Timer t = profile.getTimer(FetchTimingType.SORT_DOC_IDS);
136+
t.start();
137+
Arrays.sort(docs);
138+
t.stop();
139+
} else {
140+
Arrays.sort(docs);
141+
}
135142

136143
Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
137-
FieldsVisitor fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);
144+
FieldsVisitor fieldsVisitor;
145+
if (profile != null) {
146+
Timer t = profile.getTimer(FetchTimingType.CREATE_STORED_FIELDS_VISITOR);
147+
t.start();
148+
fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);
149+
t.stop();
150+
} else {
151+
fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);
152+
}
138153

139154
FetchContext fetchContext = new FetchContext(context);
140155

141156
SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];
142157

143-
List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext);
158+
List<Tuple<FetchSubPhaseProcessor, FetchSubPhase>> processors;
159+
if (profile != null) {
160+
Timer t = profile.getTimer(FetchTimingType.BUILD_SUB_PHASE_PROCESSORS);
161+
t.start();
162+
processors = getProcessors(context.shardTarget(), fetchContext);
163+
t.stop();
164+
} else {
165+
processors = getProcessors(context.shardTarget(), fetchContext);
166+
}
144167

145168
int currentReaderIndex = -1;
146169
LeafReaderContext currentReaderContext = null;
@@ -154,7 +177,14 @@ public void execute(SearchContext context) {
154177
try {
155178
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
156179
if (currentReaderIndex != readerIndex) {
157-
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
180+
if (profile != null) {
181+
Timer t = profile.getTimer(FetchTimingType.GET_LEAF_READER);
182+
t.start();
183+
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
184+
t.stop();
185+
} else {
186+
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
187+
}
158188
currentReaderIndex = readerIndex;
159189
if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
160190
&& hasSequentialDocs
@@ -168,22 +198,48 @@ public void execute(SearchContext context) {
168198
} else {
169199
fieldReader = currentReaderContext.reader().storedFields()::document;
170200
}
171-
for (FetchSubPhaseProcessor processor : processors) {
172-
processor.setNextReader(currentReaderContext);
201+
for (Tuple<FetchSubPhaseProcessor, FetchSubPhase> p: processors) {
202+
p.v1().setNextReader(currentReaderContext);
173203
}
174204
}
175205
assert currentReaderContext != null;
176-
HitContext hit = prepareHitContext(
177-
context,
178-
fetchContext.searchLookup(),
179-
fieldsVisitor,
180-
docId,
181-
storedToRequestedFields,
182-
currentReaderContext,
183-
fieldReader
184-
);
185-
for (FetchSubPhaseProcessor processor : processors) {
186-
processor.process(hit);
206+
HitContext hit;
207+
if (profile != null) {
208+
Timer t = profile.getTimer(FetchTimingType.PREPARE_HIT_CONTEXT);
209+
t.start();
210+
hit = prepareHitContext(
211+
context,
212+
fetchContext.searchLookup(),
213+
fieldsVisitor,
214+
docId,
215+
storedToRequestedFields,
216+
currentReaderContext,
217+
fieldReader
218+
);
219+
t.stop();
220+
} else {
221+
hit = prepareHitContext(
222+
context,
223+
fetchContext.searchLookup(),
224+
fieldsVisitor,
225+
docId,
226+
storedToRequestedFields,
227+
currentReaderContext,
228+
fieldReader
229+
);
230+
}
231+
for (Tuple<FetchSubPhaseProcessor, FetchSubPhase> p : processors) {
232+
if (profile != null) {
233+
FetchTimingType tt = timingTypeFor(p.v2());
234+
if (tt != null) {
235+
Timer st = profile.getTimer(tt);
236+
st.start();
237+
p.v1().process(hit);
238+
st.stop();
239+
continue;
240+
}
241+
}
242+
p.v1().process(hit);
187243
}
188244
hits[docs[index].index] = hit.hit();
189245
} catch (Exception e) {
@@ -195,22 +251,28 @@ public void execute(SearchContext context) {
195251
}
196252

197253
TotalHits totalHits = context.queryResult().getTotalHits();
198-
context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
254+
if (profile != null) {
255+
Timer t = profile.getTimer(FetchTimingType.BUILD_SEARCH_HITS);
256+
t.start();
257+
context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
258+
t.stop();
259+
} else {
260+
context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
261+
}
199262
} finally {
200-
if (timer != null) {
201-
timer.stop();
263+
if (profile != null) {
202264
context.getProfilers().getFetchProfiler().pollLastElement();
203265
}
204266
}
205267
}
206268

207-
List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContext context) {
269+
List<Tuple<FetchSubPhaseProcessor, FetchSubPhase>> getProcessors(SearchShardTarget target, FetchContext context) {
208270
try {
209-
List<FetchSubPhaseProcessor> processors = new ArrayList<>();
271+
List<Tuple<FetchSubPhaseProcessor, FetchSubPhase>> processors = new ArrayList<>();
210272
for (FetchSubPhase fsp : fetchSubPhases) {
211273
FetchSubPhaseProcessor processor = fsp.getProcessor(context);
212274
if (processor != null) {
213-
processors.add(processor);
275+
processors.add(new Tuple<>(processor, fsp));
214276
}
215277
}
216278
return processors;
@@ -219,6 +281,32 @@ List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContex
219281
}
220282
}
221283

284+
private FetchTimingType timingTypeFor(FetchSubPhase phase) {
285+
if (phase instanceof org.opensearch.search.fetch.subphase.ExplainPhase) {
286+
return FetchTimingType.EXPLAIN;
287+
} else if (phase instanceof org.opensearch.search.fetch.subphase.FetchDocValuesPhase) {
288+
return FetchTimingType.FETCH_DOC_VALUES;
289+
} else if (phase instanceof org.opensearch.search.fetch.subphase.ScriptFieldsPhase) {
290+
return FetchTimingType.SCRIPT_FIELDS;
291+
} else if (phase instanceof org.opensearch.search.fetch.subphase.FetchSourcePhase) {
292+
return FetchTimingType.FETCH_SOURCE;
293+
} else if (phase instanceof org.opensearch.search.fetch.subphase.FetchFieldsPhase) {
294+
return FetchTimingType.FETCH_FIELDS;
295+
} else if (phase instanceof org.opensearch.search.fetch.subphase.FetchVersionPhase) {
296+
return FetchTimingType.FETCH_VERSION;
297+
} else if (phase instanceof org.opensearch.search.fetch.subphase.SeqNoPrimaryTermPhase) {
298+
return FetchTimingType.SEQ_NO_PRIMARY_TERM;
299+
} else if (phase instanceof org.opensearch.search.fetch.subphase.MatchedQueriesPhase) {
300+
return FetchTimingType.MATCHED_QUERIES;
301+
} else if (phase instanceof org.opensearch.search.fetch.subphase.highlight.HighlightPhase) {
302+
return FetchTimingType.HIGHLIGHT;
303+
} else if (phase instanceof org.opensearch.search.fetch.subphase.FetchScorePhase) {
304+
return FetchTimingType.FETCH_SCORE;
305+
} else {
306+
return null;
307+
}
308+
}
309+
222310
static class DocIdToIndex implements Comparable<DocIdToIndex> {
223311
final int docId;
224312
final int index;

server/src/main/java/org/opensearch/search/profile/SearchProfileShardResults.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,8 @@ public static ProfileShardResult buildShardResults(Profilers profilers, ShardSea
214214
queryResults.add(result);
215215
}
216216
AggregationProfileShardResult aggResults = new AggregationProfileShardResult(aggProfiler.getTree());
217-
long fetchTime = 0L;
218217
List<ProfileResult> fetchTree = fetchProfiler.getTree();
219-
if (!fetchTree.isEmpty()) {
220-
fetchTime = fetchTree.get(0).getTime();
221-
}
222-
FetchProfileShardResult fetchResult = new FetchProfileShardResult(fetchTime);
218+
FetchProfileShardResult fetchResult = new FetchProfileShardResult(fetchTree);
223219
NetworkTime networkTime = new NetworkTime(0, 0);
224220
if (request != null) {
225221
networkTime.setInboundNetworkTime(request.getInboundNetworkTime());

server/src/main/java/org/opensearch/search/profile/fetch/FetchProfileShardResult.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,58 +15,61 @@
1515
import org.opensearch.core.xcontent.ToXContentFragment;
1616
import org.opensearch.core.xcontent.XContentBuilder;
1717
import org.opensearch.core.xcontent.XContentParser;
18+
import org.opensearch.search.profile.ProfileResult;
1819

1920
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
2024

2125
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
2226
@ExperimentalApi()
2327
public class FetchProfileShardResult implements Writeable, ToXContentFragment {
2428
public static final String FETCH = "fetch";
25-
public static final String TIME_IN_NANOS = "time_in_nanos";
2629

27-
private final long fetchTime;
30+
private final List<ProfileResult> fetchProfileResults;
2831

29-
public FetchProfileShardResult(long fetchTime) {
30-
this.fetchTime = fetchTime;
32+
public FetchProfileShardResult(List<ProfileResult> results) {
33+
this.fetchProfileResults = Collections.unmodifiableList(results);
3134
}
3235

3336
public FetchProfileShardResult(StreamInput in) throws IOException {
34-
this.fetchTime = in.readLong();
37+
int profileSize = in.readVInt();
38+
List<ProfileResult> tmp = new ArrayList<>(profileSize);
39+
for (int j = 0; j < profileSize; j++) {
40+
tmp.add(new ProfileResult(in));
41+
}
42+
this.fetchProfileResults = Collections.unmodifiableList(tmp);
3543
}
3644

37-
public long getFetchTime() {
38-
return fetchTime;
45+
public List<ProfileResult> getFetchProfileResults() {
46+
return fetchProfileResults;
3947
}
4048

4149
@Override
4250
public void writeTo(StreamOutput out) throws IOException {
43-
out.writeLong(fetchTime);
51+
out.writeVInt(fetchProfileResults.size());
52+
for (ProfileResult p : fetchProfileResults) {
53+
p.writeTo(out);
54+
}
4455
}
4556

4657
@Override
4758
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
48-
return builder.startObject(FETCH).field(TIME_IN_NANOS, fetchTime).endObject();
59+
builder.startArray(FETCH);
60+
for (ProfileResult p : fetchProfileResults) {
61+
p.toXContent(builder, params);
62+
}
63+
return builder.endArray();
4964
}
5065

5166
public static FetchProfileShardResult fromXContent(XContentParser parser) throws IOException {
52-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
53-
String currentFieldName = null;
54-
long time = 0;
55-
XContentParser.Token token;
56-
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
57-
if (token == XContentParser.Token.FIELD_NAME) {
58-
currentFieldName = parser.currentName();
59-
} else if (token.isValue()) {
60-
if (TIME_IN_NANOS.equals(currentFieldName)) {
61-
time = parser.longValue();
62-
} else {
63-
parser.skipChildren();
64-
}
65-
} else {
66-
parser.skipChildren();
67-
}
67+
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
68+
List<ProfileResult> results = new ArrayList<>();
69+
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
70+
results.add(ProfileResult.fromXContent(parser));
6871
}
69-
return new FetchProfileShardResult(time);
72+
return new FetchProfileShardResult(results);
7073
}
7174

7275

server/src/main/java/org/opensearch/search/profile/fetch/FetchTimingType.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,22 @@
1212
* Timing points for fetch phase profiling.
1313
*/
1414
public enum FetchTimingType {
15-
/** Time spent executing the fetch phase. */
16-
EXECUTE_FETCH_PHASE;
15+
SORT_DOC_IDS,
16+
CREATE_STORED_FIELDS_VISITOR,
17+
BUILD_SUB_PHASE_PROCESSORS,
18+
GET_LEAF_READER,
19+
PREPARE_HIT_CONTEXT,
20+
EXPLAIN,
21+
FETCH_DOC_VALUES,
22+
SCRIPT_FIELDS,
23+
FETCH_SOURCE,
24+
FETCH_FIELDS,
25+
FETCH_VERSION,
26+
SEQ_NO_PRIMARY_TERM,
27+
MATCHED_QUERIES,
28+
HIGHLIGHT,
29+
FETCH_SCORE,
30+
BUILD_SEARCH_HITS;
1731

1832
@Override
1933
public String toString() {

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
193193
final IndexReader reader = searcher.getIndexReader();
194194
QuerySearchResult queryResult = searchContext.queryResult();
195195
queryResult.searchTimedOut(false);
196+
196197
try {
197198
queryResult.from(searchContext.from());
198199
queryResult.size(searchContext.size());

0 commit comments

Comments
 (0)