diff --git a/CHANGELOG.md b/CHANGELOG.md index 05d89358b993d..8b6170bfdf212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,7 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change `com.amazonaws.sdk.ec2MetadataServiceEndpointOverride` to `aws.ec2MetadataServiceEndpoint` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/)) - Change `com.amazonaws.sdk.stsEndpointOverride` to `aws.stsEndpointOverride` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/)) - Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730)) -- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025)) +- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025)) ### Deprecated @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) - Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653)) - Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) +- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053)) - Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index c183562e2e85a..6b8e06594acb7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -59,6 +59,7 @@ import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; +import org.opensearch.search.pipeline.SearchPipelineStats; import org.opensearch.tasks.TaskCancellationStats; import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.transport.TransportStats; @@ -138,6 +139,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private TaskCancellationStats taskCancellationStats; + @Nullable + private SearchPipelineStats searchPipelineStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -189,6 +193,11 @@ public NodeStats(StreamInput in) throws IOException { } else { taskCancellationStats = null; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO Update to 2_9_0 when we backport to 2.x + searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new); + } else { + searchPipelineStats = null; + } } public NodeStats( @@ -214,7 +223,8 @@ public NodeStats( @Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats, @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, - @Nullable TaskCancellationStats taskCancellationStats + @Nullable TaskCancellationStats taskCancellationStats, + @Nullable SearchPipelineStats searchPipelineStats ) { super(node); this.timestamp = timestamp; @@ -239,6 +249,7 @@ public NodeStats( this.weightedRoutingStats = weightedRoutingStats; this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; + this.searchPipelineStats = searchPipelineStats; } public long getTimestamp() { @@ -371,6 +382,11 @@ public TaskCancellationStats getTaskCancellationStats() { return taskCancellationStats; } + @Nullable + public SearchPipelineStats getSearchPipelineStats() { + return searchPipelineStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -411,6 +427,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(taskCancellationStats); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update to 2_9_0 once we backport to 2.x + out.writeOptionalWriteable(searchPipelineStats); + } } @Override @@ -498,6 +517,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getTaskCancellationStats() != null) { getTaskCancellationStats().toXContent(builder, params); } + if (getSearchPipelineStats() != null) { + getSearchPipelineStats().toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 68f391b91507c..f37a837c6f0ef 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -211,7 +211,8 @@ public enum Metric { CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"), WEIGHTED_ROUTING_STATS("weighted_routing"), FILE_CACHE_STATS("file_cache"), - TASK_CANCELLATION("task_cancellation"); + TASK_CANCELLATION("task_cancellation"), + SEARCH_PIPELINE("search_pipeline"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 6aadf546d30f7..660142f05bab2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics), NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics), NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), - NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics) + NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), + NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 726f8a0de19ae..aee6dfddd203e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java index 79c04d431e97b..33f12c8cb42d3 100644 --- a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java @@ -49,6 +49,11 @@ public void inc(long n) { sum.add(n); } + public void add(MeanMetric other) { + counter.add(other.counter.sum()); + sum.add(other.sum.sum()); + } + public void dec(long n) { counter.decrement(); sum.add(-n); diff --git a/server/src/main/java/org/opensearch/ingest/IngestMetric.java b/server/src/main/java/org/opensearch/ingest/IngestMetric.java index 2d4a1dc9cfdee..e6f2606dfbef2 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestMetric.java +++ b/server/src/main/java/org/opensearch/ingest/IngestMetric.java @@ -40,9 +40,9 @@ /** *

Metrics to measure ingest actions. *

This counts measure documents and timings for a given scope. - * The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline, + * The scope is determined by the calling code. For example, you can use this class to count all documents across all pipelines, * or you can use this class to count documents for a given pipeline or a specific processor. - * This class does not make assumptions about it's given scope. + * This class does not make assumptions about its given scope. * * @opensearch.internal */ @@ -57,10 +57,6 @@ class IngestMetric { * Useful when aggregating multiple metrics to see how many things are in flight. */ private final AtomicLong ingestCurrent = new AtomicLong(); - /** - * The ever increasing count of things being measured - */ - private final CounterMetric ingestCount = new CounterMetric(); /** * The only increasing count of failures */ @@ -80,7 +76,6 @@ void preIngest() { void postIngest(long ingestTimeInMillis) { ingestCurrent.decrementAndGet(); ingestTime.inc(ingestTimeInMillis); - ingestCount.inc(); } /** @@ -98,8 +93,7 @@ void ingestFailed() { * @param metrics The metric to add. */ void add(IngestMetric metrics) { - ingestCount.inc(metrics.ingestCount.count()); - ingestTime.inc(metrics.ingestTime.sum()); + ingestTime.add(metrics.ingestTime); ingestFailed.inc(metrics.ingestFailed.count()); } @@ -107,6 +101,6 @@ void add(IngestMetric metrics) { * Creates a serializable representation for these metrics. */ IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count()); + return new IngestStats.Stats(ingestTime.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count()); } } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 9382746081c18..6f4fe1e083ad7 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -216,7 +216,8 @@ public NodeStats stats( boolean clusterManagerThrottling, boolean weightedRoutingStats, boolean fileCacheStats, - boolean taskCancellation + boolean taskCancellation, + boolean searchPipelineStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -243,7 +244,8 @@ public NodeStats stats( clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null, weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, - taskCancellation ? this.taskCancellationMonitoringService.stats() : null + taskCancellation ? this.taskCancellationMonitoringService.stats() : null, + searchPipelineStats ? this.searchPipelineService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c9a5f865d507e..13f50e192c192 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -21,8 +21,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; import static org.opensearch.ingest.ConfigurationUtils.TAG_KEY; import static org.opensearch.ingest.Pipeline.DESCRIPTION_KEY; @@ -41,10 +45,24 @@ class Pipeline { // TODO: Refactor org.opensearch.ingest.CompoundProcessor to implement our generic Processor interface // Then these can be CompoundProcessors instead of lists. - private final List searchRequestProcessors; - private final List searchResponseProcessors; + private final List> searchRequestProcessors; + private final List> searchResponseProcessors; private final NamedWriteableRegistry namedWriteableRegistry; + private final SearchPipelineMetrics totalRequestMetrics; + private final SearchPipelineMetrics totalResponseMetrics; + private final SearchPipelineMetrics pipelineRequestMetrics = new SearchPipelineMetrics(); + private final SearchPipelineMetrics pipelineResponseMetrics = new SearchPipelineMetrics(); + private final LongSupplier relativeTimeSupplier; + + private static class ProcessorWithMetrics { + private final T processor; + private final SearchPipelineMetrics metrics = new SearchPipelineMetrics(); + + public ProcessorWithMetrics(T processor) { + this.processor = processor; + } + } private Pipeline( String id, @@ -52,14 +70,20 @@ private Pipeline( @Nullable Integer version, List requestProcessors, List responseProcessors, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + SearchPipelineMetrics totalRequestMetrics, + SearchPipelineMetrics totalResponseMetrics, + LongSupplier relativeTimeSupplier ) { this.id = id; this.description = description; this.version = version; - this.searchRequestProcessors = requestProcessors; - this.searchResponseProcessors = responseProcessors; + this.searchRequestProcessors = requestProcessors.stream().map(ProcessorWithMetrics::new).collect(Collectors.toList()); + this.searchResponseProcessors = responseProcessors.stream().map(ProcessorWithMetrics::new).collect(Collectors.toList()); this.namedWriteableRegistry = namedWriteableRegistry; + this.totalRequestMetrics = totalRequestMetrics; + this.totalResponseMetrics = totalResponseMetrics; + this.relativeTimeSupplier = relativeTimeSupplier; } static Pipeline create( @@ -67,7 +91,9 @@ static Pipeline create( Map config, Map> requestProcessorFactories, Map> responseProcessorFactories, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + SearchPipelineMetrics totalRequestProcessingMetrics, + SearchPipelineMetrics totalResponseProcessingMetrics ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); @@ -88,7 +114,17 @@ static Pipeline create( + Arrays.toString(config.keySet().toArray()) ); } - return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); + return new Pipeline( + id, + description, + version, + requestProcessors, + responseProcessors, + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics, + System::nanoTime + ); } private static List readProcessors( @@ -127,39 +163,83 @@ Integer getVersion() { } List getSearchRequestProcessors() { - return searchRequestProcessors; + return searchRequestProcessors.stream().map(p -> p.processor).collect(Collectors.toList()); } List getSearchResponseProcessors() { - return searchResponseProcessors; + return searchResponseProcessors.stream().map(p -> p.processor).collect(Collectors.toList()); } - SearchRequest transformRequest(SearchRequest request) throws Exception { + SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException { if (searchRequestProcessors.isEmpty() == false) { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - request.writeTo(bytesStreamOutput); - try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { - try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { - request = new SearchRequest(input); + long pipelineStart = relativeTimeSupplier.getAsLong(); + totalRequestMetrics.before(); + pipelineRequestMetrics.before(); + try { + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + request.writeTo(bytesStreamOutput); + try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { + try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { + request = new SearchRequest(input); + } } } - } - for (SearchRequestProcessor searchRequestProcessor : searchRequestProcessors) { - request = searchRequestProcessor.processRequest(request); + for (ProcessorWithMetrics processorWithMetrics : searchRequestProcessors) { + processorWithMetrics.metrics.before(); + long start = relativeTimeSupplier.getAsLong(); + try { + request = processorWithMetrics.processor.processRequest(request); + } catch (Exception e) { + processorWithMetrics.metrics.failed(); + throw e; + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); + processorWithMetrics.metrics.after(took); + } + } + } catch (Exception e) { + totalRequestMetrics.failed(); + pipelineRequestMetrics.failed(); + throw new SearchPipelineProcessingException(e); + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + totalRequestMetrics.after(took); + pipelineRequestMetrics.after(took); } } return request; } SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException { - try { - for (SearchResponseProcessor responseProcessor : searchResponseProcessors) { - response = responseProcessor.processResponse(request, response); + if (searchResponseProcessors.isEmpty() == false) { + long pipelineStart = relativeTimeSupplier.getAsLong(); + totalResponseMetrics.before(); + pipelineResponseMetrics.before(); + try { + for (ProcessorWithMetrics processorWithMetrics : searchResponseProcessors) { + processorWithMetrics.metrics.before(); + long start = relativeTimeSupplier.getAsLong(); + try { + response = processorWithMetrics.processor.processResponse(request, response); + } catch (Exception e) { + processorWithMetrics.metrics.failed(); + throw e; + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); + processorWithMetrics.metrics.after(took); + } + } + } catch (Exception e) { + totalResponseMetrics.failed(); + pipelineResponseMetrics.failed(); + throw new SearchPipelineProcessingException(e); + } finally { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + totalResponseMetrics.after(took); + pipelineResponseMetrics.after(took); } - return response; - } catch (Exception e) { - throw new SearchPipelineProcessingException(e); } + return response; } static final Pipeline NO_OP_PIPELINE = new Pipeline( @@ -168,6 +248,62 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response) 0, Collections.emptyList(), Collections.emptyList(), - null + null, + new SearchPipelineMetrics(), + new SearchPipelineMetrics(), + () -> 0L ); + + void copyMetrics(Pipeline oldPipeline) { + pipelineRequestMetrics.add(oldPipeline.pipelineRequestMetrics); + pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics); + copyProcessorMetrics(searchRequestProcessors, oldPipeline.searchRequestProcessors); + copyProcessorMetrics(searchResponseProcessors, oldPipeline.searchResponseProcessors); + } + + private static void copyProcessorMetrics( + List> newProcessorsWithMetrics, + List> oldProcessorsWithMetrics + ) { + Map> requestProcessorsByKey = new HashMap<>(); + for (ProcessorWithMetrics processorWithMetrics : newProcessorsWithMetrics) { + requestProcessorsByKey.putIfAbsent(getProcessorKey(processorWithMetrics.processor), processorWithMetrics); + } + for (ProcessorWithMetrics oldProcessorWithMetrics : oldProcessorsWithMetrics) { + ProcessorWithMetrics newProcessor = requestProcessorsByKey.get(getProcessorKey(oldProcessorWithMetrics.processor)); + if (newProcessor != null) { + newProcessor.metrics.add(oldProcessorWithMetrics.metrics); + } + } + } + + private static String getProcessorKey(Processor processor) { + String key = processor.getType(); + if (processor.getTag() != null) { + return key + ":" + processor.getTag(); + } + return key; + } + + void populateStats(SearchPipelineStats.Builder statsBuilder) { + statsBuilder.addPipelineStats(getId(), pipelineRequestMetrics, pipelineResponseMetrics); + for (ProcessorWithMetrics requestProcessorWithMetrics : searchRequestProcessors) { + Processor processor = requestProcessorWithMetrics.processor; + statsBuilder.addRequestProcessorStats( + getId(), + getProcessorKey(processor), + processor.getType(), + requestProcessorWithMetrics.metrics + ); + } + for (ProcessorWithMetrics responseProcessorWithMetrics : searchResponseProcessors) { + Processor processor = responseProcessorWithMetrics.processor; + statsBuilder.addResponseProcessorStats( + getId(), + getProcessorKey(processor), + processor.getType(), + responseProcessorWithMetrics.metrics + ); + } + } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetrics.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetrics.java new file mode 100644 index 0000000000000..f9cefd802e526 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineMetrics.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.metrics.MeanMetric; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Mutable tracker of search pipeline processing operations. + */ +class SearchPipelineMetrics { + /** + * The mean time it takes to complete the measured item. + */ + private final MeanMetric time = new MeanMetric(); + /** + * The current count of things being measured. + * Useful when aggregating multiple metrics to see how many things are in flight. + */ + private final AtomicLong current = new AtomicLong(); + /** + * The non-decreasing count of failures + */ + private final CounterMetric failed = new CounterMetric(); + + public void before() { + current.incrementAndGet(); + } + + public void after(long currentTime) { + current.decrementAndGet(); + time.inc(currentTime); + } + + public void failed() { + failed.inc(); + } + + public void add(SearchPipelineMetrics other) { + // Don't try copying over current, since in-flight requests will be linked to the existing metrics instance. + failed.inc(other.failed.count()); + time.add(other.time); + } + + SearchPipelineStats.Stats createStats() { + return new SearchPipelineStats.Stats(time.count(), time.sum(), current.get(), failed.count()); + } +} diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 87c09bd971284..e0c8ed31f6cf7 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -80,6 +80,9 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private final NamedWriteableRegistry namedWriteableRegistry; private volatile ClusterState state; + private final SearchPipelineMetrics totalRequestProcessingMetrics = new SearchPipelineMetrics(); + private final SearchPipelineMetrics totalResponseProcessingMetrics = new SearchPipelineMetrics(); + private final boolean isEnabled; public SearchPipelineService( @@ -177,21 +180,21 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { newConfiguration.getConfigAsMap(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); - if (previous == null) { - continue; + if (previous != null) { + newPipeline.copyMetrics(previous.pipeline); } - // TODO -- once we add in pipeline metrics (like in ingest pipelines), we will need to deep-copy - // the old pipeline's metrics into the new pipeline. } catch (Exception e) { OpenSearchParseException parseException = new OpenSearchParseException( "Error updating pipeline with id [" + newConfiguration.getId() + "]", e ); - // TODO -- replace pipeline with one that throws an exception when we try to use it + // TODO -- replace pipeline with one that throws this exception when we try to use it if (exceptions == null) { exceptions = new ArrayList<>(); } @@ -276,7 +279,9 @@ void validatePipeline(Map searchPipelineInfos pipelineConfig, requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + new SearchPipelineMetrics(), // Use ephemeral metrics for validation + new SearchPipelineMetrics() ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -372,7 +377,9 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce searchRequest.source().searchPipelineSource(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + totalRequestProcessingMetrics, + totalResponseProcessingMetrics ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); @@ -400,12 +407,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = pipelineHolder.pipeline; } } - try { - SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); - return new PipelinedRequest(pipeline, transformedRequest); - } catch (Exception e) { - throw new SearchPipelineProcessingException(e); - } + SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); + return new PipelinedRequest(pipeline, transformedRequest); } Map> getRequestProcessorFactories() { @@ -431,6 +434,16 @@ public SearchPipelineInfo info() { ); } + public SearchPipelineStats stats() { + SearchPipelineStats.Builder builder = new SearchPipelineStats.Builder(); + builder.withTotalStats(totalRequestProcessingMetrics, totalResponseProcessingMetrics); + for (PipelineHolder pipelineHolder : pipelines.values()) { + Pipeline pipeline = pipelineHolder.pipeline; + pipeline.populateStats(builder); + } + return builder.build(); + } + public static List getPipelines(ClusterState clusterState, String... ids) { SearchPipelineMetadata metadata = clusterState.getMetadata().custom(SearchPipelineMetadata.TYPE); return innerGetPipelines(metadata, ids); diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java new file mode 100644 index 0000000000000..f0f26536a1a4f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineStats.java @@ -0,0 +1,461 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; + +/** + * Serializable, immutable search pipeline statistics to be returned via stats APIs. + * + * @opensearch.internal + */ +public class SearchPipelineStats implements Writeable, ToXContentFragment { + + private final Stats totalRequestStats; + private final Stats totalResponseStats; + private final List pipelineStats; + private final Map perPipelineProcessorStats; + + public SearchPipelineStats( + Stats totalRequestStats, + Stats totalResponseStats, + List pipelineStats, + Map perPipelineProcessorStats + ) { + this.totalRequestStats = totalRequestStats; + this.totalResponseStats = totalResponseStats; + this.pipelineStats = pipelineStats; + this.perPipelineProcessorStats = perPipelineProcessorStats; + } + + public SearchPipelineStats(StreamInput in) throws IOException { + this.totalRequestStats = new Stats(in); + this.totalResponseStats = new Stats(in); + int size = in.readVInt(); + List pipelineStats = new ArrayList<>(size); + Map pipelineDetailStatsMap = new TreeMap<>(); + for (int i = 0; i < size; i++) { + String pipelineId = in.readString(); + Stats pipelineRequestStats = new Stats(in); + Stats pipelineResponseStats = new Stats(in); + pipelineStats.add(new PipelineStats(pipelineId, pipelineRequestStats, pipelineResponseStats)); + int numRequestProcessors = in.readVInt(); + List requestProcessorStats = new ArrayList<>(numRequestProcessors); + for (int j = 0; j < numRequestProcessors; j++) { + String processorName = in.readString(); + String processorType = in.readString(); + Stats processorStats = new Stats(in); + requestProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats)); + } + int numResponseProcessors = in.readVInt(); + List responseProcessorStats = new ArrayList<>(numResponseProcessors); + for (int j = 0; j < numResponseProcessors; j++) { + String processorName = in.readString(); + String processorType = in.readString(); + Stats processorStats = new Stats(in); + responseProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats)); + } + pipelineDetailStatsMap.put( + pipelineId, + new PipelineDetailStats(unmodifiableList(requestProcessorStats), unmodifiableList(responseProcessorStats)) + ); + } + this.pipelineStats = unmodifiableList(pipelineStats); + this.perPipelineProcessorStats = unmodifiableMap(pipelineDetailStatsMap); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("search_pipeline"); + builder.startObject("total_request"); + totalRequestStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("total_response"); + totalResponseStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("pipelines"); + for (PipelineStats pipelineStat : pipelineStats) { + builder.startObject(pipelineStat.pipelineId); + builder.startObject("request"); + pipelineStat.requestStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("response"); + pipelineStat.responseStats.toXContent(builder, params); + builder.endObject(); + + PipelineDetailStats pipelineDetailStats = perPipelineProcessorStats.get(pipelineStat.pipelineId); + builder.startArray("request_processors"); + for (ProcessorStats processorStats : pipelineDetailStats.requestProcessorStats) { + builder.startObject(); + processorStats.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.startArray("response_processors"); + for (ProcessorStats processorStats : pipelineDetailStats.responseProcessorStats) { + builder.startObject(); + processorStats.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + totalRequestStats.writeTo(out); + totalResponseStats.writeTo(out); + out.writeVInt(pipelineStats.size()); + for (PipelineStats pipelineStat : pipelineStats) { + out.writeString(pipelineStat.pipelineId); + pipelineStat.requestStats.writeTo(out); + pipelineStat.responseStats.writeTo(out); + PipelineDetailStats pipelineDetailStats = perPipelineProcessorStats.get(pipelineStat.pipelineId); + out.writeVInt(pipelineDetailStats.requestProcessorStats.size()); + for (ProcessorStats processorStats : pipelineDetailStats.requestProcessorStats) { + out.writeString(processorStats.processorName); + out.writeString(processorStats.processorType); + processorStats.stats.writeTo(out); + } + out.writeVInt(pipelineDetailStats.responseProcessorStats.size()); + for (ProcessorStats processorStats : pipelineDetailStats.responseProcessorStats) { + out.writeString(processorStats.processorName); + out.writeString(processorStats.processorType); + processorStats.stats.writeTo(out); + } + } + } + + static class Builder { + private Stats totalRequestStats; + private Stats totalResponseStats; + private final List pipelineStats = new ArrayList<>(); + private final Map> requestProcessorStatsPerPipeline = new HashMap<>(); + private final Map> responseProcessorStatsPerPipeline = new HashMap<>(); + + Builder withTotalStats(SearchPipelineMetrics totalRequestMetrics, SearchPipelineMetrics totalResponseMetrics) { + this.totalRequestStats = totalRequestMetrics.createStats(); + this.totalResponseStats = totalResponseMetrics.createStats(); + return this; + } + + Builder addPipelineStats( + String pipelineId, + SearchPipelineMetrics pipelineRequestMetrics, + SearchPipelineMetrics pipelineResponseMetrics + ) { + this.pipelineStats.add( + new PipelineStats(pipelineId, pipelineRequestMetrics.createStats(), pipelineResponseMetrics.createStats()) + ); + return this; + } + + Builder addRequestProcessorStats( + String pipelineId, + String processorName, + String processorType, + SearchPipelineMetrics processorMetrics + ) { + this.requestProcessorStatsPerPipeline.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStats(processorName, processorType, processorMetrics.createStats())); + return this; + } + + Builder addResponseProcessorStats( + String pipelineId, + String processorName, + String processorType, + SearchPipelineMetrics processorMetrics + ) { + this.responseProcessorStatsPerPipeline.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStats(processorName, processorType, processorMetrics.createStats())); + return this; + } + + SearchPipelineStats build() { + Map pipelineDetailStatsMap = new TreeMap<>(); + for (PipelineStats pipelineStat : pipelineStats) { + List requestProcessorStats = requestProcessorStatsPerPipeline.getOrDefault( + pipelineStat.pipelineId, + emptyList() + ); + List responseProcessorStats = responseProcessorStatsPerPipeline.getOrDefault( + pipelineStat.pipelineId, + emptyList() + ); + PipelineDetailStats pipelineDetailStats = new PipelineDetailStats( + unmodifiableList(requestProcessorStats), + unmodifiableList(responseProcessorStats) + ); + pipelineDetailStatsMap.put(pipelineStat.pipelineId, pipelineDetailStats); + } + return new SearchPipelineStats( + totalRequestStats, + totalResponseStats, + unmodifiableList(pipelineStats), + unmodifiableMap(pipelineDetailStatsMap) + ); + } + } + + static class PipelineStats { + private final String pipelineId; + private final Stats requestStats; + private final Stats responseStats; + + public PipelineStats(String pipelineId, Stats requestStats, Stats responseStats) { + this.pipelineId = pipelineId; + this.requestStats = requestStats; + this.responseStats = responseStats; + } + + public String getPipelineId() { + return pipelineId; + } + + public Stats getRequestStats() { + return requestStats; + } + + public Stats getResponseStats() { + return responseStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PipelineStats that = (PipelineStats) o; + return pipelineId.equals(that.pipelineId) && requestStats.equals(that.requestStats) && responseStats.equals(that.responseStats); + } + + @Override + public int hashCode() { + return Objects.hash(pipelineId, requestStats, responseStats); + } + } + + static class PipelineDetailStats { + private final List requestProcessorStats; + private final List responseProcessorStats; + + public PipelineDetailStats(List requestProcessorStats, List responseProcessorStats) { + this.requestProcessorStats = requestProcessorStats; + this.responseProcessorStats = responseProcessorStats; + } + + public List requestProcessorStats() { + return requestProcessorStats; + } + + public List responseProcessorStats() { + return responseProcessorStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PipelineDetailStats that = (PipelineDetailStats) o; + return requestProcessorStats.equals(that.requestProcessorStats) && responseProcessorStats.equals(that.responseProcessorStats); + } + + @Override + public int hashCode() { + return Objects.hash(requestProcessorStats, responseProcessorStats); + } + } + + static class ProcessorStats implements ToXContentFragment { + private final String processorName; // type:tag + private final String processorType; + private final Stats stats; + + public ProcessorStats(String processorName, String processorType, Stats stats) { + this.processorName = processorName; + this.processorType = processorType; + this.stats = stats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ProcessorStats that = (ProcessorStats) o; + return processorName.equals(that.processorName) && processorType.equals(that.processorType) && stats.equals(that.stats); + } + + @Override + public int hashCode() { + return Objects.hash(processorName, processorType, stats); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(processorName); + builder.field("type", processorType); + builder.startObject("stats"); + stats.toXContent(builder, params); + builder.endObject(); + builder.endObject(); + return builder; + } + + String getProcessorName() { + return processorName; + } + + String getProcessorType() { + return processorType; + } + + Stats getStats() { + return stats; + } + } + + static class Stats implements Writeable, ToXContentFragment { + + private final long count; + private final long totalTimeInMillis; + private final long current; + private final long failedCount; + + public Stats(long count, long totalTimeInMillis, long current, long failedCount) { + this.count = count; + this.totalTimeInMillis = totalTimeInMillis; + this.current = current; + this.failedCount = failedCount; + } + + /** + * Read from a stream. + */ + public Stats(StreamInput in) throws IOException { + count = in.readVLong(); + totalTimeInMillis = in.readVLong(); + current = in.readVLong(); + failedCount = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(count); + out.writeVLong(totalTimeInMillis); + out.writeVLong(current); + out.writeVLong(failedCount); + } + + /** + * @return The total number of executed operations. + */ + public long getCount() { + return count; + } + + /** + * @return The total time spent of in millis. + */ + public long getTotalTimeInMillis() { + return totalTimeInMillis; + } + + /** + * @return The total number of operations currently executing. + */ + public long getCurrent() { + return current; + } + + /** + * @return The total number of operations that have failed. + */ + public long getFailedCount() { + return failedCount; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.field("count", count) + .humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS)) + .field("current", current) + .field("failed", failedCount); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchPipelineStats.Stats that = (SearchPipelineStats.Stats) o; + return Objects.equals(count, that.count) + && Objects.equals(totalTimeInMillis, that.totalTimeInMillis) + && Objects.equals(failedCount, that.failedCount) + && Objects.equals(current, that.current); + } + + @Override + public int hashCode() { + return Objects.hash(count, totalTimeInMillis, failedCount, current); + } + } + + Stats getTotalRequestStats() { + return totalRequestStats; + } + + Stats getTotalResponseStats() { + return totalResponseStats; + } + + List getPipelineStats() { + return pipelineStats; + } + + Map getPerPipelineProcessorStats() { + return perPipelineProcessorStats; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchPipelineStats stats = (SearchPipelineStats) o; + return totalRequestStats.equals(stats.totalRequestStats) + && totalResponseStats.equals(stats.totalResponseStats) + && pipelineStats.equals(stats.pipelineStats) + && perPipelineProcessorStats.equals(stats.perPipelineProcessorStats); + } + + @Override + public int hashCode() { + return Objects.hash(totalRequestStats, totalResponseStats, pipelineStats, perPipelineProcessorStats); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index d99b93b780140..0eb9e282e5122 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -750,6 +750,7 @@ public static NodeStats createNodeStats() { clusterManagerThrottlingStats, weightedRoutingStats, null, + null, null ); } diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 73349d45bd5c7..e5833ea619774 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -189,6 +189,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -214,6 +215,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -239,6 +241,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -295,6 +298,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -320,6 +324,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -345,6 +350,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index d49d9fd41031c..9b61cff9d9a27 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -786,4 +786,125 @@ public void testExceptionOnResponseProcessing() throws Exception { // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response)); } + + public void testStats() throws Exception { + SearchRequestProcessor throwingRequestProcessor = new FakeRequestProcessor("throwing_request", "1", null, r -> { + throw new RuntimeException(); + }); + Map> requestProcessors = Map.of( + "successful_request", + (pf, t, f, c) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), + "throwing_request", + (pf, t, f, c) -> throwingRequestProcessor + ); + SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", "3", null, r -> { + throw new RuntimeException(); + }); + Map> responseProcessors = Map.of( + "successful_response", + (pf, t, f, c) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), + "throwing_response", + (pf, t, f, c) -> throwingResponseProcessor + ); + SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "good_response_pipeline", + new PipelineConfiguration( + "good_response_pipeline", + new BytesArray("{\"response_processors\" : [ { \"successful_response\": {} } ] }"), + XContentType.JSON + ), + "bad_response_pipeline", + new PipelineConfiguration( + "bad_response_pipeline", + new BytesArray("{\"response_processors\" : [ { \"throwing_response\": {} } ] }"), + XContentType.JSON + ), + "good_request_pipeline", + new PipelineConfiguration( + "good_request_pipeline", + new BytesArray("{\"request_processors\" : [ { \"successful_request\": {} } ] }"), + XContentType.JSON + ), + "bad_request_pipeline", + new PipelineConfiguration( + "bad_request_pipeline", + new BytesArray("{\"request_processors\" : [ { \"throwing_request\": {} } ] }"), + XContentType.JSON + ) + ) + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); + + SearchRequest request = new SearchRequest(); + SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")).transformResponse(response); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")).transformResponse(response) + ); + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")).transformResponse(response); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")).transformResponse(response) + ); + + SearchPipelineStats stats = searchPipelineService.stats(); + assertPipelineStats(stats.getTotalRequestStats(), 2, 1); + assertPipelineStats(stats.getTotalResponseStats(), 2, 1); + for (SearchPipelineStats.PipelineStats pipelineStats : stats.getPipelineStats()) { + SearchPipelineStats.PipelineDetailStats detailStats = stats.getPerPipelineProcessorStats().get(pipelineStats.getPipelineId()); + switch (pipelineStats.getPipelineId()) { + case "good_request_pipeline": + assertPipelineStats(pipelineStats.getRequestStats(), 1, 0); + assertPipelineStats(pipelineStats.getResponseStats(), 0, 0); + assertEquals(1, detailStats.requestProcessorStats().size()); + assertEquals(0, detailStats.responseProcessorStats().size()); + assertEquals("successful_request:2", detailStats.requestProcessorStats().get(0).getProcessorName()); + assertEquals("successful_request", detailStats.requestProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.requestProcessorStats().get(0).getStats(), 1, 0); + break; + case "bad_request_pipeline": + assertPipelineStats(pipelineStats.getRequestStats(), 1, 1); + assertPipelineStats(pipelineStats.getResponseStats(), 0, 0); + assertEquals(1, detailStats.requestProcessorStats().size()); + assertEquals(0, detailStats.responseProcessorStats().size()); + assertEquals("throwing_request:1", detailStats.requestProcessorStats().get(0).getProcessorName()); + assertEquals("throwing_request", detailStats.requestProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.requestProcessorStats().get(0).getStats(), 1, 1); + break; + case "good_response_pipeline": + assertPipelineStats(pipelineStats.getRequestStats(), 0, 0); + assertPipelineStats(pipelineStats.getResponseStats(), 1, 0); + assertEquals(0, detailStats.requestProcessorStats().size()); + assertEquals(1, detailStats.responseProcessorStats().size()); + assertEquals("successful_response:4", detailStats.responseProcessorStats().get(0).getProcessorName()); + assertEquals("successful_response", detailStats.responseProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.responseProcessorStats().get(0).getStats(), 1, 0); + break; + case "bad_response_pipeline": + assertPipelineStats(pipelineStats.getRequestStats(), 0, 0); + assertPipelineStats(pipelineStats.getResponseStats(), 1, 1); + assertEquals(0, detailStats.requestProcessorStats().size()); + assertEquals(1, detailStats.responseProcessorStats().size()); + assertEquals("throwing_response:3", detailStats.responseProcessorStats().get(0).getProcessorName()); + assertEquals("throwing_response", detailStats.responseProcessorStats().get(0).getProcessorType()); + assertPipelineStats(detailStats.responseProcessorStats().get(0).getStats(), 1, 1); + break; + } + } + } + + private static void assertPipelineStats(SearchPipelineStats.Stats stats, long count, long failed) { + assertEquals(stats.getCount(), count); + assertEquals(stats.getFailedCount(), failed); + } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java new file mode 100644 index 0000000000000..0107445e51f10 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineStatsTests.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +public class SearchPipelineStatsTests extends OpenSearchTestCase { + public void testSerializationRoundtrip() throws IOException { + SearchPipelineStats stats = createStats(); + SearchPipelineStats deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + stats.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + deserialized = new SearchPipelineStats(bytesStreamInput); + } + } + assertEquals(stats, deserialized); + } + + private static SearchPipelineStats createStats() { + return new SearchPipelineStats( + new SearchPipelineStats.Stats(1, 2, 3, 4), + new SearchPipelineStats.Stats(5, 6, 7, 8), + List.of( + new SearchPipelineStats.PipelineStats( + "p1", + new SearchPipelineStats.Stats(9, 10, 11, 12), + new SearchPipelineStats.Stats(13, 14, 15, 16) + ), + new SearchPipelineStats.PipelineStats( + "p2", + new SearchPipelineStats.Stats(17, 18, 19, 20), + new SearchPipelineStats.Stats(21, 22, 23, 24) + ) + + ), + Map.of( + "p1", + new SearchPipelineStats.PipelineDetailStats( + List.of(new SearchPipelineStats.ProcessorStats("req1:a", "req1", new SearchPipelineStats.Stats(25, 26, 27, 28))), + List.of(new SearchPipelineStats.ProcessorStats("rsp1:a", "rsp1", new SearchPipelineStats.Stats(29, 30, 31, 32))) + ), + "p2", + new SearchPipelineStats.PipelineDetailStats( + List.of( + new SearchPipelineStats.ProcessorStats("req1:a", "req1", new SearchPipelineStats.Stats(33, 34, 35, 36)), + new SearchPipelineStats.ProcessorStats("req2", "req2", new SearchPipelineStats.Stats(37, 38, 39, 40)) + ), + List.of() + ) + ) + ); + } + + public void testToXContent() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + XContentBuilder xContentBuilder = new XContentBuilder(JsonXContent.jsonXContent, bos); + xContentBuilder.prettyPrint(); + xContentBuilder.startObject(); + createStats().toXContent(xContentBuilder, null); + xContentBuilder.endObject(); + xContentBuilder.close(); + + String jsonContent = bos.toString(StandardCharsets.UTF_8); + + assertEquals( + "{\n" + + " \"search_pipeline\" : {\n" + + " \"total_request\" : {\n" + + " \"count\" : 1,\n" + + " \"time_in_millis\" : 2,\n" + + " \"current\" : 3,\n" + + " \"failed\" : 4\n" + + " },\n" + + " \"total_response\" : {\n" + + " \"count\" : 5,\n" + + " \"time_in_millis\" : 6,\n" + + " \"current\" : 7,\n" + + " \"failed\" : 8\n" + + " },\n" + + " \"pipelines\" : {\n" + + " \"p1\" : {\n" + + " \"request\" : {\n" + + " \"count\" : 9,\n" + + " \"time_in_millis\" : 10,\n" + + " \"current\" : 11,\n" + + " \"failed\" : 12\n" + + " },\n" + + " \"response\" : {\n" + + " \"count\" : 13,\n" + + " \"time_in_millis\" : 14,\n" + + " \"current\" : 15,\n" + + " \"failed\" : 16\n" + + " },\n" + + " \"request_processors\" : [\n" + + " {\n" + + " \"req1:a\" : {\n" + + " \"type\" : \"req1\",\n" + + " \"stats\" : {\n" + + " \"count\" : 25,\n" + + " \"time_in_millis\" : 26,\n" + + " \"current\" : 27,\n" + + " \"failed\" : 28\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"response_processors\" : [\n" + + " {\n" + + " \"rsp1:a\" : {\n" + + " \"type\" : \"rsp1\",\n" + + " \"stats\" : {\n" + + " \"count\" : 29,\n" + + " \"time_in_millis\" : 30,\n" + + " \"current\" : 31,\n" + + " \"failed\" : 32\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"p2\" : {\n" + + " \"request\" : {\n" + + " \"count\" : 17,\n" + + " \"time_in_millis\" : 18,\n" + + " \"current\" : 19,\n" + + " \"failed\" : 20\n" + + " },\n" + + " \"response\" : {\n" + + " \"count\" : 21,\n" + + " \"time_in_millis\" : 22,\n" + + " \"current\" : 23,\n" + + " \"failed\" : 24\n" + + " },\n" + + " \"request_processors\" : [\n" + + " {\n" + + " \"req1:a\" : {\n" + + " \"type\" : \"req1\",\n" + + " \"stats\" : {\n" + + " \"count\" : 33,\n" + + " \"time_in_millis\" : 34,\n" + + " \"current\" : 35,\n" + + " \"failed\" : 36\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"req2\" : {\n" + + " \"type\" : \"req2\",\n" + + " \"stats\" : {\n" + + " \"count\" : 37,\n" + + " \"time_in_millis\" : 38,\n" + + " \"current\" : 39,\n" + + " \"failed\" : 40\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"response_processors\" : [ ]\n" + + " }\n" + + " }\n" + + " }\n" + + "}", + jsonContent + ); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index cf5f6613c3ea1..6634d1b4dbafc 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -119,7 +119,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getClusterManagerThrottlingStats(), nodeStats.getWeightedRoutingStats(), nodeStats.getFileCacheStats(), - nodeStats.getTaskCancellationStats() + nodeStats.getTaskCancellationStats(), + nodeStats.getSearchPipelineStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index a3612167f16c3..9987c54e04c9f 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2685,6 +2685,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(