Skip to content

Commit

Permalink
switch to new optional method
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
Peter Alfonsi committed Feb 14, 2025
1 parent 5824a3d commit afc4dcc
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 63 deletions.
18 changes: 15 additions & 3 deletions server/src/main/java/org/opensearch/action/search/SearchPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

/**
* Base class for all individual search phases like collecting distributed frequencies, fetching documents, querying shards.
Expand Down Expand Up @@ -69,15 +70,26 @@ public String getName() {
}

/**
* Returns the SearchPhase name as {@link SearchPhaseName}. Exception will come if SearchPhase name is not defined
* Returns the SearchPhase name as {@link SearchPhaseName}. Exception will come if SearchPhase name is not defined.
* Deprecated; use getSearchPhaseNameOptional() to avoid possible exceptions.
* in {@link SearchPhaseName}
* @return {@link SearchPhaseName}
*/
@Deprecated
public SearchPhaseName getSearchPhaseName() {
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
}

/**
* Returns an Optional of the SearchPhase name as {@link SearchPhaseName}. If there's not a matching SearchPhaseName,
* returns an empty Optional.
* @return {@link Optional<SearchPhaseName>}
*/
public Optional<SearchPhaseName> getSearchPhaseNameOptional() {
try {
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
return Optional.of(SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT)));
} catch (IllegalArgumentException e) {
return SearchPhaseName.OTHER_PHASE_TYPES;
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ public enum SearchPhaseName {
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand"),
CAN_MATCH("can_match"),

/**
A catch-all for other phase types which shouldn't appear in the search phase stats API.
*/
OTHER_PHASE_TYPES("other_phase_types");
CAN_MATCH("can_match");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,22 @@ public long getTookMetric() {

@Override
protected void onPhaseStart(SearchPhaseContext context) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
context.getCurrentPhase().getSearchPhaseNameOptional().ifPresent(name -> phaseStatsMap.get(name).current.inc());
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName());
phaseStats.current.dec();
phaseStats.total.inc();
phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()));
context.getCurrentPhase().getSearchPhaseNameOptional().ifPresent(name -> {
StatsHolder phaseStats = phaseStatsMap.get(name);
phaseStats.current.dec();
phaseStats.total.inc();
phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()));
});
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
context.getCurrentPhase().getSearchPhaseNameOptional().ifPresent(name -> phaseStatsMap.get(name).current.dec());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
PhaseStatsLongHolder statsLongHolder = requestStatsLongHolder.requestStatsHolder.get(searchPhaseName.getName());
// The catch-all OTHER_PHASE_TYPES should not appear in the API response.
if (statsLongHolder == null || searchPhaseName.equals(SearchPhaseName.OTHER_PHASE_TYPES)) {
if (statsLongHolder == null) {
continue;
}
builder.startObject(searchPhaseName.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,29 +399,29 @@ public void testOnPhaseFailureAndVerifyListeners() {
final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener, assertingListener);
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
action.start();
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseNameOptional().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseNameOptional().get()));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
);
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseNameOptional().get()));
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseNameOptional().get()));

FetchSearchPhase fetchPhase = createFetchSearchPhase();
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
Expand All @@ -430,15 +430,15 @@ public void run() {
action.skipShard(searchShardIterator);
action.start();
action.executeNextPhase(action, fetchPhase);
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseNameOptional().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseNameOptional().get()));
}

public void testOnPhaseFailure() {
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.start();

// Verify queryPhase current metric
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseNameOptional().get()));
TimeUnit.MILLISECONDS.sleep(delay);

FetchSearchPhase fetchPhase = createFetchSearchPhase();
Expand All @@ -733,31 +733,31 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.executeNextPhase(action, fetchPhase);

// Verify queryPhase total, current and latency metrics
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseNameOptional().get()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseNameOptional().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseNameOptional().get()));

// Verify fetchPhase current metric
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseNameOptional().get()));
TimeUnit.MILLISECONDS.sleep(delay);

ExpandSearchPhase expandPhase = createExpandSearchPhase();
action.executeNextPhase(fetchPhase, expandPhase);
TimeUnit.MILLISECONDS.sleep(delay);

// Verify fetchPhase total, current and latency metrics
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseNameOptional().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseNameOptional().get()));

assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseNameOptional().get()));

action.executeNextPhase(expandPhase, fetchPhase);
action.onPhaseDone(); /* finish phase since we don't have reponse being sent */

assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseNameOptional().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseNameOptional().get()));
}

public void testOnPhaseListenersWithDfsType() throws InterruptedException {
Expand All @@ -772,7 +772,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {

FetchSearchPhase fetchPhase = createFetchSearchPhase();
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseNameOptional().get()));
TimeUnit.MILLISECONDS.sleep(delay);
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
Expand All @@ -786,9 +786,12 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
null
); /* finalizing the fetch phase since we do adhoc phase lifecycle calls */

assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertThat(
testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseNameOptional().get()),
greaterThanOrEqualTo(delay)
);
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseNameOptional().get()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseNameOptional().get()));
}

private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -30,18 +31,18 @@ public void testListenersAreExecuted() {

@Override
public void onPhaseStart(SearchPhaseContext context) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseNameOptional().get()).current.inc();
}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseNameOptional().get()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseNameOptional().get()).total.inc();
}

@Override
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseNameOptional().get()).current.dec();
}
};

Expand All @@ -61,7 +62,7 @@ public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(ctx.getCurrentPhase()).thenReturn(searchPhase);
when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(searchPhase.getSearchPhaseNameOptional()).thenReturn(Optional.of(searchPhaseName));
compositeListener.onPhaseStart(ctx);
assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count());
}
Expand Down
Loading

0 comments on commit afc4dcc

Please sign in to comment.