From c07deb2d0ad8f20f4e768b39a72adb8f33a91787 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 12 Sep 2025 11:52:00 +0200 Subject: [PATCH 1/4] Delay SearchResponseActionListener wrapping to a single location --- .../action/search/TransportOpenPointInTimeAction.java | 1 + .../elasticsearch/action/search/TransportSearchAction.java | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index ac23731c38b84..0cbb53d3ebf4d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -121,6 +121,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen .source(new SearchSourceBuilder().query(request.indexFilter())); searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests()); searchRequest.setCcsMinimizeRoundtrips(false); + transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> { assert r.pointInTimeId() != null : r; return new OpenPointInTimeResponse( diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 154e2fcc8d539..82d6c2df520ba 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -333,7 +333,7 @@ public long buildTookInMillis() { @Override protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { - executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new); + executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new); } void executeRequest( @@ -372,6 +372,7 @@ void executeRequest( frozenIndexCheck(resolvedIndices); } + listener = new SearchResponseActionListener(listener); ActionListener rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { if (ccsCheckCompatibility) { checkCCSVersionCompatibility(rewritten); @@ -526,7 +527,7 @@ void executeRequest( // We set the keep alive to -1 to indicate that we don't need the pit id in the response. // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); - var pitListener = new SearchResponseActionListener(delegate) { + var pitListener = new ActionListener() { @Override public void onResponse(SearchResponse response) { // we need to close the PIT first so we delay the release of the response to after the closing From a44bb231ae705e4d195836b4621b04695f014c9e Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 12 Sep 2025 17:18:23 +0200 Subject: [PATCH 2/4] iter --- .../TransportOpenPointInTimeAction.java | 2 +- .../action/search/TransportSearchAction.java | 184 +++++++++--------- 2 files changed, 93 insertions(+), 93 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 0cbb53d3ebf4d..1038308bc6bf3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -122,7 +122,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests()); searchRequest.setCcsMinimizeRoundtrips(false); - transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> { + transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> { assert r.pointInTimeId() != null : r; return new OpenPointInTimeResponse( r.pointInTimeId(), diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 82d6c2df520ba..91ea50b29a66d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction listener) { - executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new); + executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true); } - void executeRequest( + void executeOpenPit( SearchTask task, SearchRequest original, - ActionListener listener, + ActionListener originalListener, Function, SearchPhaseProvider> searchPhaseProvider + ) { + executeRequest(task, original, originalListener, searchPhaseProvider, false); + } + + private void executeRequest( + SearchTask task, + SearchRequest original, + ActionListener originalListener, + Function, SearchPhaseProvider> searchPhaseProvider, + boolean collectSearchTelemetry ) { final long relativeStartNanos = System.nanoTime(); final SearchTimeProvider timeProvider = new SearchTimeProvider( @@ -372,12 +382,16 @@ void executeRequest( frozenIndexCheck(resolvedIndices); } - listener = new SearchResponseActionListener(listener); - ActionListener rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { + ActionListener rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> { if (ccsCheckCompatibility) { checkCCSVersionCompatibility(rewritten); } + final CCSUsage.Builder ccsUsageBuilder = new CCSUsage.Builder(); + final ActionListener searchResponseActionListener = collectSearchTelemetry + ? new SearchTelemetryListener(delegate, ccsUsageBuilder, searchResponseMetrics, usageService, collectCCSTelemetry) + : delegate; + if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { executeLocalSearch( task, @@ -386,35 +400,33 @@ void executeRequest( resolvedIndices, projectState, SearchResponse.Clusters.EMPTY, - searchPhaseProvider.apply(delegate) + searchPhaseProvider.apply(searchResponseActionListener) ); } else { - if (delegate instanceof TelemetryListener tl) { - tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size()); - if (task.isAsync()) { - tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); - } - if (original.pointInTimeBuilder() != null) { - tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); - } - tl.setClient(task); - // Check if any of the index patterns are wildcard patterns - var localIndices = resolvedIndices.getLocalIndices(); - if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { - tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } - if (resolvedIndices.getRemoteClusterIndices() - .values() - .stream() - .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { - tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } + ccsUsageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size()); + if (task.isAsync()) { + ccsUsageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); } + if (original.pointInTimeBuilder() != null) { + ccsUsageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE); + } + ccsUsageBuilder.setClientFromTask(task); + // Check if any of the index patterns are wildcard patterns + var localIndices = resolvedIndices.getLocalIndices(); + if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { + ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + if (resolvedIndices.getRemoteClusterIndices() + .values() + .stream() + .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { + ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); if (shouldMinimizeRoundtrips(rewritten)) { - if (delegate instanceof TelemetryListener tl) { - tl.setFeature(CCSUsageTelemetry.MRT_FEATURE); - } + ccsUsageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE); + final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null && rewritten.source().aggregations() != null ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) @@ -440,7 +452,7 @@ void executeRequest( aggregationReduceContextBuilder, remoteClusterService, threadPool, - delegate, + searchResponseActionListener, (r, l) -> executeLocalSearch( task, timeProvider, @@ -474,7 +486,7 @@ void executeRequest( clusters, timeProvider, transportService, - delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { + searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { final BiFunction clusterNodeLookup = getRemoteClusterNodeLookup( searchShardsResponses ); @@ -523,29 +535,34 @@ void executeRequest( if (shouldOpenPIT(source)) { // disabling shard reordering for request original.setPreFilterShardSize(Integer.MAX_VALUE); - openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> { - // We set the keep alive to -1 to indicate that we don't need the pit id in the response. - // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. - source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); - var pitListener = new ActionListener() { - @Override - public void onResponse(SearchResponse response) { - // we need to close the PIT first so we delay the release of the response to after the closing - response.incRef(); - closePIT( - client, - original.source().pointInTimeBuilder(), - () -> ActionListener.respondAndRelease(delegate, response) - ); - } + openPIT( + client, + original, + searchService.getDefaultKeepAliveInMillis(), + originalListener.delegateFailureAndWrap((delegate, resp) -> { + // We set the keep alive to -1 to indicate that we don't need the pit id in the response. + // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. + source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); + var pitListener = new ActionListener() { + @Override + public void onResponse(SearchResponse response) { + // we need to close the PIT first so we delay the release of the response to after the closing + response.incRef(); + closePIT( + client, + original.source().pointInTimeBuilder(), + () -> ActionListener.respondAndRelease(delegate, response) + ); + } - @Override - public void onFailure(Exception e) { - closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); - } - }; - executeRequest(task, original, pitListener, searchPhaseProvider); - })); + @Override + public void onFailure(Exception e) { + closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); + } + }; + executeRequest(task, original, pitListener, searchPhaseProvider, true); + }) + ); } else { Rewriteable.rewriteAndFetch( original, @@ -2002,49 +2019,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret .toArray(String[]::new); } return concreteIndices; - } - private interface TelemetryListener { - void setRemotes(int count); - - void setFeature(String feature); - - void setClient(Task task); } - private class SearchResponseActionListener extends DelegatingActionListener - implements - TelemetryListener { + private static class SearchTelemetryListener extends DelegatingActionListener { private final CCSUsage.Builder usageBuilder; - - SearchResponseActionListener(ActionListener listener) { + private final SearchResponseMetrics searchResponseMetrics; + private final UsageService usageService; + private final boolean collectCCSTelemetry; + + SearchTelemetryListener( + ActionListener listener, + CCSUsage.Builder usageBuilder, + SearchResponseMetrics searchResponseMetrics, + UsageService usageService, + boolean collectCCSTelemetry + ) { super(listener); - if (listener instanceof SearchResponseActionListener srListener) { - usageBuilder = srListener.usageBuilder; - } else { - usageBuilder = new CCSUsage.Builder(); - } + this.usageBuilder = usageBuilder; + this.searchResponseMetrics = searchResponseMetrics; + this.usageService = usageService; + this.collectCCSTelemetry = collectCCSTelemetry; } /** * Should we collect telemetry for this search? */ - private boolean collectTelemetry() { - return collectTelemetry && usageBuilder.getRemotesCount() > 0; - } - - public void setRemotes(int count) { - usageBuilder.setRemotesCount(count); - } - - @Override - public void setFeature(String feature) { - usageBuilder.setFeature(feature); - } - - @Override - public void setClient(Task task) { - usageBuilder.setClientFromTask(task); + private boolean collectCCSTelemetry() { + return collectCCSTelemetry && usageBuilder.getRemotesCount() > 0; } @Override @@ -2070,7 +2072,7 @@ public void onResponse(SearchResponse searchResponse) { } searchResponseMetrics.incrementResponseCount(responseCountTotalStatus); - if (collectTelemetry()) { + if (collectCCSTelemetry()) { extractCCSTelemetry(searchResponse); recordTelemetry(); } @@ -2085,7 +2087,7 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE); - if (collectTelemetry()) { + if (collectCCSTelemetry()) { usageBuilder.setFailure(e); recordTelemetry(); } @@ -2110,8 +2112,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) { usageBuilder.perClusterUsage(clusterAlias, cluster.getTook()); } } - } - } } From 0b9936f68212c92135c2a1605757a71d6e115cf8 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 15 Sep 2025 11:36:08 +0200 Subject: [PATCH 3/4] iter --- .../action/admin/cluster/stats/CCSUsage.java | 5 + .../action/search/TransportSearchAction.java | 176 +++++++++--------- .../SearchTookTimeTelemetryTests.java | 28 +++ 3 files changed, 125 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java index 29a7dcb5d07d8..e4e16e8d14128 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java @@ -81,6 +81,11 @@ public Builder setFeature(String feature) { return this; } + public Builder setFeatures(Set features) { + this.features.addAll(features); + return this; + } + public Builder setClient(String client) { this.client = client; return this; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 91ea50b29a66d..8c5e861c199b6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -382,15 +382,79 @@ private void executeRequest( frozenIndexCheck(resolvedIndices); } + final SearchSourceBuilder source = original.source(); + if (shouldOpenPIT(source)) { + // disabling shard reordering for request + original.setPreFilterShardSize(Integer.MAX_VALUE); + openPIT( + client, + original, + searchService.getDefaultKeepAliveInMillis(), + originalListener.delegateFailureAndWrap((delegate, resp) -> { + // We set the keep alive to -1 to indicate that we don't need the pit id in the response. + // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. + source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); + var pitListener = new ActionListener() { + @Override + public void onResponse(SearchResponse response) { + // we need to close the PIT first so we delay the release of the response to after the closing + response.incRef(); + closePIT( + client, + original.source().pointInTimeBuilder(), + () -> ActionListener.respondAndRelease(delegate, response) + ); + } + + @Override + public void onFailure(Exception e) { + closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); + } + }; + executeRequest(task, original, pitListener, searchPhaseProvider, true); + }) + ); + return; + } + ActionListener rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> { if (ccsCheckCompatibility) { checkCCSVersionCompatibility(rewritten); } - final CCSUsage.Builder ccsUsageBuilder = new CCSUsage.Builder(); - final ActionListener searchResponseActionListener = collectSearchTelemetry - ? new SearchTelemetryListener(delegate, ccsUsageBuilder, searchResponseMetrics, usageService, collectCCSTelemetry) - : delegate; + final ActionListener searchResponseActionListener; + if (collectSearchTelemetry) { + if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) { + searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics); + } else { + CCSUsage.Builder usageBuilder = new CCSUsage.Builder(); + usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size()); + usageBuilder.setClientFromTask(task); + if (task.isAsync()) { + usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); + } + if (original.pointInTimeBuilder() != null) { + usageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE); + } + // Check if any of the index patterns are wildcard patterns + var localIndices = resolvedIndices.getLocalIndices(); + if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { + usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + if (resolvedIndices.getRemoteClusterIndices() + .values() + .stream() + .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { + usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); + } + if (shouldMinimizeRoundtrips(rewritten)) { + usageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE); + } + searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, usageService, usageBuilder); + } + } else { + searchResponseActionListener = delegate; + } if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { executeLocalSearch( @@ -403,30 +467,8 @@ private void executeRequest( searchPhaseProvider.apply(searchResponseActionListener) ); } else { - ccsUsageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size()); - if (task.isAsync()) { - ccsUsageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); - } - if (original.pointInTimeBuilder() != null) { - ccsUsageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE); - } - ccsUsageBuilder.setClientFromTask(task); - // Check if any of the index patterns are wildcard patterns - var localIndices = resolvedIndices.getLocalIndices(); - if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { - ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } - if (resolvedIndices.getRemoteClusterIndices() - .values() - .stream() - .anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { - ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); - } - final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); if (shouldMinimizeRoundtrips(rewritten)) { - ccsUsageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE); - final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null && rewritten.source().aggregations() != null ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) @@ -530,54 +572,20 @@ private void executeRequest( } }); - final SearchSourceBuilder source = original.source(); final boolean isExplain = source != null && source.explain() != null && source.explain(); - if (shouldOpenPIT(source)) { - // disabling shard reordering for request - original.setPreFilterShardSize(Integer.MAX_VALUE); - openPIT( - client, - original, - searchService.getDefaultKeepAliveInMillis(), - originalListener.delegateFailureAndWrap((delegate, resp) -> { - // We set the keep alive to -1 to indicate that we don't need the pit id in the response. - // This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. - source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); - var pitListener = new ActionListener() { - @Override - public void onResponse(SearchResponse response) { - // we need to close the PIT first so we delay the release of the response to after the closing - response.incRef(); - closePIT( - client, - original.source().pointInTimeBuilder(), - () -> ActionListener.respondAndRelease(delegate, response) - ); - } - - @Override - public void onFailure(Exception e) { - closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); - } - }; - executeRequest(task, original, pitListener, searchPhaseProvider, true); - }) - ); - } else { - Rewriteable.rewriteAndFetch( - original, - searchService.getRewriteContext( - timeProvider::absoluteStartMillis, - clusterState.getMinTransportVersion(), - original.getLocalClusterAlias(), - resolvedIndices, - original.pointInTimeBuilder(), - shouldMinimizeRoundtrips(original), - isExplain - ), - rewriteListener - ); - } + Rewriteable.rewriteAndFetch( + original, + searchService.getRewriteContext( + timeProvider::absoluteStartMillis, + clusterState.getMinTransportVersion(), + original.getLocalClusterAlias(), + resolvedIndices, + original.pointInTimeBuilder(), + shouldMinimizeRoundtrips(original), + isExplain + ), + rewriteListener + ); } /** @@ -2030,23 +2038,23 @@ private static class SearchTelemetryListener extends DelegatingActionListener listener, - CCSUsage.Builder usageBuilder, SearchResponseMetrics searchResponseMetrics, UsageService usageService, - boolean collectCCSTelemetry + CCSUsage.Builder usageBuilder ) { super(listener); - this.usageBuilder = usageBuilder; this.searchResponseMetrics = searchResponseMetrics; + this.collectCCSTelemetry = true; this.usageService = usageService; - this.collectCCSTelemetry = collectCCSTelemetry; + this.usageBuilder = usageBuilder; } - /** - * Should we collect telemetry for this search? - */ - private boolean collectCCSTelemetry() { - return collectCCSTelemetry && usageBuilder.getRemotesCount() > 0; + SearchTelemetryListener(ActionListener listener, SearchResponseMetrics searchResponseMetrics) { + super(listener); + this.searchResponseMetrics = searchResponseMetrics; + this.collectCCSTelemetry = false; + this.usageService = null; + this.usageBuilder = null; } @Override @@ -2072,7 +2080,7 @@ public void onResponse(SearchResponse searchResponse) { } searchResponseMetrics.incrementResponseCount(responseCountTotalStatus); - if (collectCCSTelemetry()) { + if (collectCCSTelemetry) { extractCCSTelemetry(searchResponse); recordTelemetry(); } @@ -2087,7 +2095,7 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE); - if (collectCCSTelemetry()) { + if (collectCCSTelemetry) { usageBuilder.setFailure(e); recordTelemetry(); } diff --git a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTookTimeTelemetryTests.java b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTookTimeTelemetryTests.java index 2c000314c7dad..3d9797a5f5f48 100644 --- a/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTookTimeTelemetryTests.java +++ b/server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTookTimeTelemetryTests.java @@ -16,12 +16,17 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.rescore.QueryRescorerBuilder; +import org.elasticsearch.search.retriever.RescorerRetrieverBuilder; +import org.elasticsearch.search.retriever.StandardRetrieverBuilder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -85,6 +90,29 @@ public void testSimpleQuery() { assertEquals(searchResponse.getTook().millis(), measurements.getFirst().getLong()); } + public void testCompoundRetriever() { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.retriever( + new RescorerRetrieverBuilder( + new StandardRetrieverBuilder(new MatchAllQueryBuilder()), + List.of(new QueryRescorerBuilder(new MatchAllQueryBuilder())) + ) + ); + SearchResponse searchResponse = client().prepareSearch(indexName).setSource(searchSourceBuilder).get(); + try { + assertNoFailures(searchResponse); + assertSearchHits(searchResponse, "1", "2"); + } finally { + searchResponse.decRef(); + } + + List measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(TOOK_DURATION_TOTAL_HISTOGRAM_NAME); + // compound retriever does its own search as an async action, whose took time is recorded separately + assertEquals(2, measurements.size()); + assertThat(measurements.getFirst().getLong(), Matchers.lessThan(searchResponse.getTook().millis())); + assertEquals(searchResponse.getTook().millis(), measurements.getLast().getLong()); + } + public void testMultiSearch() { MultiSearchRequestBuilder multiSearchRequestBuilder = client().prepareMultiSearch(); int numSearchRequests = randomIntBetween(3, 10); From 176d7bf30eaa7c77be7e373d259619edf714acfd Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 15 Sep 2025 11:38:38 +0200 Subject: [PATCH 4/4] iter --- .../elasticsearch/action/admin/cluster/stats/CCSUsage.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java index e4e16e8d14128..29a7dcb5d07d8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java @@ -81,11 +81,6 @@ public Builder setFeature(String feature) { return this; } - public Builder setFeatures(Set features) { - this.features.addAll(features); - return this; - } - public Builder setClient(String client) { this.client = client; return this;