Skip to content

Commit

Permalink
[Search Pipelines] Add stats for search pipelines (opensearch-project…
Browse files Browse the repository at this point in the history
…#8053)

* [Search Pipelines] Add stats for search pipelines

This adds statistics on executions and time spent on search pipeline
operations, similar to the stats that are available for ingest
pipelines.

Signed-off-by: Michael Froh <froh@amazon.com>

* Compare parsed JSON structure, not exact JSON string

As @lukas-vlcek pointed out, asserting equality with an exact JSON
string is sensitive to formatting, which makes the test brittle.

Instead, we can parse the expected JSON and compare as Maps.

Signed-off-by: Michael Froh <froh@amazon.com>

* Refactor to common stats/metrics classes

Search pipelines and ingest pipelines had identical functionality for
tracking metrics around operations and converting those to immutable
"stats" objects.

That approach isn't even really specific to pipelines, but can be used
to track metrics on any repeated operation, so I moved that common
logic to the common.metrics package.

Signed-off-by: Michael Froh <froh@amazon.com>

* Split pipeline metrics tracking into its own class

Thanks @saratvemulapalli for the suggestion! This lets the Pipeline
class focus on transforming requests / responses, while the subclass
focuses on tracking and managing metrics.

Signed-off-by: Michael Froh <froh@amazon.com>

---------

Signed-off-by: Michael Froh <froh@amazon.com>
(cherry picked from commit 46c9a21)
  • Loading branch information
msfroh authored and mingshl committed Jul 5, 2023
1 parent 9b9718f commit 9bd4069
Show file tree
Hide file tree
Showing 33 changed files with 1,398 additions and 457 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053))
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testFailureInConditionalProcessor() {
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
assertThat(st.getStats().getCurrent(), greaterThanOrEqualTo(0L));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.pipeline.SearchPipelineStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;
Expand Down Expand Up @@ -139,6 +140,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private TaskCancellationStats taskCancellationStats;

@Nullable
private SearchPipelineStats searchPipelineStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -202,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
taskCancellationStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO Update to 2_9_0 when we backport to 2.x
searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new);
} else {
searchPipelineStats = null;
}
}

public NodeStats(
Expand All @@ -227,7 +236,8 @@ public NodeStats(
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -252,6 +262,7 @@ public NodeStats(
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -384,6 +395,11 @@ public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;
}

@Nullable
public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update to 2_9_0 once we backport to 2.x
out.writeOptionalWriteable(searchPipelineStats);
}
}

@Override
Expand Down Expand Up @@ -517,6 +536,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public enum Metric {
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.Strings;
import org.opensearch.common.metrics.OperationStats;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -800,18 +801,18 @@ static class IngestStats implements ToXContentFragment {
pipelineIds.add(processorStats.getKey());
for (org.opensearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.opensearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
OperationStats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis() };
nodeIngestStats.getCount(),
nodeIngestStats.getFailedCount(),
nodeIngestStats.getCurrent(),
nodeIngestStats.getTotalTimeInMillis() };
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
v[0] += nodeIngestStats.getCount();
v[1] += nodeIngestStats.getFailedCount();
v[2] += nodeIngestStats.getCurrent();
v[3] += nodeIngestStats.getTotalTimeInMillis();
return v;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void inc(long n) {
sum.add(n);
}

public void add(MeanMetric other) {
counter.add(other.counter.sum());
sum.add(other.sum.sum());
}

public void dec(long n) {
counter.decrement();
sum.add(-n);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import java.util.concurrent.atomic.AtomicLong;

/**
* Mutable tracker of a repeated operation.
*
* @opensearch.internal
*/
public class OperationMetrics {
/**
* The mean time it takes to complete the measured item.
*/
private final MeanMetric time = new MeanMetric();
/**
* The current count of things being measured.
* Useful when aggregating multiple metrics to see how many things are in flight.
*/
private final AtomicLong current = new AtomicLong();
/**
* The non-decreasing count of failures
*/
private final CounterMetric failed = new CounterMetric();

/**
* Invoked before the given operation begins.
*/
public void before() {
current.incrementAndGet();
}

/**
* Invoked upon completion (success or failure) of the given operation
* @param currentTime elapsed time of the operation
*/
public void after(long currentTime) {
current.decrementAndGet();
time.inc(currentTime);
}

/**
* Invoked upon failure of the operation.
*/
public void failed() {
failed.inc();
}

public void add(OperationMetrics other) {
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
failed.inc(other.failed.count());
time.add(other.time);
}

/**
* @return an immutable snapshot of the current metric values.
*/
public OperationStats createStats() {
return new OperationStats(time.count(), time.sum(), current.get(), failed.count());
}
}
107 changes: 107 additions & 0 deletions server/src/main/java/org/opensearch/common/metrics/OperationStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* An immutable representation of a {@link OperationMetrics}
*/
public class OperationStats implements Writeable, ToXContentFragment {
private final long count;
private final long totalTimeInMillis;
private final long current;
private final long failedCount;

public OperationStats(long count, long totalTimeInMillis, long current, long failedCount) {
this.count = count;
this.totalTimeInMillis = totalTimeInMillis;
this.current = current;
this.failedCount = failedCount;
}

/**
* Read from a stream.
*/
public OperationStats(StreamInput in) throws IOException {
count = in.readVLong();
totalTimeInMillis = in.readVLong();
current = in.readVLong();
failedCount = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(totalTimeInMillis);
out.writeVLong(current);
out.writeVLong(failedCount);
}

/**
* @return The total number of executed operations.
*/
public long getCount() {
return count;
}

/**
* @return The total time spent of in millis.
*/
public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

/**
* @return The total number of operations currently executing.
*/
public long getCurrent() {
return current;
}

/**
* @return The total number of operations that have failed.
*/
public long getFailedCount() {
return failedCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field("count", count)
.humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS))
.field("current", current)
.field("failed", failedCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OperationStats that = (OperationStats) o;
return Objects.equals(count, that.count)
&& Objects.equals(totalTimeInMillis, that.totalTimeInMillis)
&& Objects.equals(failedCount, that.failedCount)
&& Objects.equals(current, that.current);
}

@Override
public int hashCode() {
return Objects.hash(count, totalTimeInMillis, failedCount, current);
}
}
Loading

0 comments on commit 9bd4069

Please sign in to comment.