Skip to content

Commit

Permalink
Refactor to common stats/metrics classes
Browse files Browse the repository at this point in the history
Search pipelines and ingest pipelines had identical functionality for
tracking metrics around operations and converting those to immutable
"stats" objects.

That approach isn't even really specific to pipelines, but can be used
to track metrics on any repeated operation, so I moved that common
logic to the common.metrics package.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jun 21, 2023
1 parent 48ea4ca commit 88074fd
Show file tree
Hide file tree
Showing 22 changed files with 367 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testFailureInConditionalProcessor() {
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
assertThat(st.getStats().getCurrent(), greaterThanOrEqualTo(0L));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.metrics.OperationStats;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -800,18 +801,18 @@ static class IngestStats implements ToXContentFragment {
pipelineIds.add(processorStats.getKey());
for (org.opensearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.opensearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
OperationStats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis() };
nodeIngestStats.getCount(),
nodeIngestStats.getFailedCount(),
nodeIngestStats.getCurrent(),
nodeIngestStats.getTotalTimeInMillis() };
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
v[0] += nodeIngestStats.getCount();
v[1] += nodeIngestStats.getFailedCount();
v[2] += nodeIngestStats.getCurrent();
v[3] += nodeIngestStats.getTotalTimeInMillis();
return v;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@
* compatible open source license.
*/

package org.opensearch.search.pipeline;

import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
package org.opensearch.common.metrics;

import java.util.concurrent.atomic.AtomicLong;

/**
* Mutable tracker of search pipeline processing operations.
* Mutable tracker of a repeated operation.
*
* @opensearch.internal
*/
class SearchPipelineMetrics {
public class OperationMetrics {
/**
* The mean time it takes to complete the measured item.
*/
Expand All @@ -31,26 +30,39 @@ class SearchPipelineMetrics {
*/
private final CounterMetric failed = new CounterMetric();

/**
* Invoked before the given operation begins.
*/
public void before() {
current.incrementAndGet();
}

/**
* Invoked upon completion (success or failure) of the given operation
* @param currentTime elapsed time of the operation
*/
public void after(long currentTime) {
current.decrementAndGet();
time.inc(currentTime);
}

/**
* Invoked upon failure of the operation.
*/
public void failed() {
failed.inc();
}

public void add(SearchPipelineMetrics other) {
public void add(OperationMetrics other) {
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
failed.inc(other.failed.count());
time.add(other.time);
}

SearchPipelineStats.Stats createStats() {
return new SearchPipelineStats.Stats(time.count(), time.sum(), current.get(), failed.count());
/**
* @return an immutable snapshot of the current metric values.
*/
public OperationStats createStats() {
return new OperationStats(time.count(), time.sum(), current.get(), failed.count());
}
}
107 changes: 107 additions & 0 deletions server/src/main/java/org/opensearch/common/metrics/OperationStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* An immutable representation of a {@link OperationMetrics}
*/
public class OperationStats implements Writeable, ToXContentFragment {
private final long count;
private final long totalTimeInMillis;
private final long current;
private final long failedCount;

public OperationStats(long count, long totalTimeInMillis, long current, long failedCount) {
this.count = count;
this.totalTimeInMillis = totalTimeInMillis;
this.current = current;
this.failedCount = failedCount;
}

/**
* Read from a stream.
*/
public OperationStats(StreamInput in) throws IOException {
count = in.readVLong();
totalTimeInMillis = in.readVLong();
current = in.readVLong();
failedCount = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(totalTimeInMillis);
out.writeVLong(current);
out.writeVLong(failedCount);
}

/**
* @return The total number of executed operations.
*/
public long getCount() {
return count;
}

/**
* @return The total time spent of in millis.
*/
public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

/**
* @return The total number of operations currently executing.
*/
public long getCurrent() {
return current;
}

/**
* @return The total number of operations that have failed.
*/
public long getFailedCount() {
return failedCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field("count", count)
.humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS))
.field("current", current)
.field("failed", failedCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OperationStats that = (OperationStats) o;
return Objects.equals(count, that.count)
&& Objects.equals(totalTimeInMillis, that.totalTimeInMillis)
&& Objects.equals(failedCount, that.failedCount)
&& Objects.equals(current, that.current);
}

@Override
public int hashCode() {
return Objects.hash(count, totalTimeInMillis, failedCount, current);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.metrics.OperationMetrics;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -60,7 +61,7 @@ public class CompoundProcessor implements Processor {
private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final List<Tuple<Processor, OperationMetrics>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;

CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
Expand All @@ -87,10 +88,10 @@ public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List
this.onFailureProcessors = onFailureProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new OperationMetrics())));
}

List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
List<Tuple<Processor, OperationMetrics>> getProcessorsWithMetrics() {
return processorsWithMetrics;
}

Expand Down Expand Up @@ -155,17 +156,17 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
return;
}

Tuple<Processor, IngestMetric> processorWithMetric = processorsWithMetrics.get(currentProcessor);
Tuple<Processor, OperationMetrics> processorWithMetric = processorsWithMetrics.get(currentProcessor);
final Processor processor = processorWithMetric.v1();
final IngestMetric metric = processorWithMetric.v2();
final OperationMetrics metric = processorWithMetric.v2();
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
metric.before();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
metric.after(ingestTimeInMillis);

if (e != null) {
metric.ingestFailed();
metric.failed();
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.ingest;

import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.script.IngestConditionalScript;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptException;
Expand Down Expand Up @@ -66,7 +67,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
private final Script condition;
private final ScriptService scriptService;
private final Processor processor;
private final IngestMetric metric;
private final OperationMetrics metric;
private final LongSupplier relativeTimeProvider;
private final IngestConditionalScript precompiledConditionScript;

Expand All @@ -86,7 +87,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
this.condition = script;
this.scriptService = scriptService;
this.processor = processor;
this.metric = new IngestMetric();
this.metric = new OperationMetrics();
this.relativeTimeProvider = relativeTimeProvider;

try {
Expand Down Expand Up @@ -114,12 +115,12 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

if (matches) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
metric.before();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
metric.after(ingestTimeInMillis);
if (e != null) {
metric.ingestFailed();
metric.failed();
handler.accept(null, e);
} else {
handler.accept(result, null);
Expand Down Expand Up @@ -148,7 +149,7 @@ public Processor getInnerProcessor() {
return processor;
}

IngestMetric getMetric() {
OperationMetrics getMetric() {
return metric;
}

Expand Down
Loading

0 comments on commit 88074fd

Please sign in to comment.