Skip to content

Commit

Permalink
Add more flint metrics (#255)
Browse files Browse the repository at this point in the history
* Refactor and add metrics for streaming job

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add metrics for request / result index

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add java doc for OpenSearchUpdater class

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Address comment from Peng

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger authored Feb 20, 2024
1 parent 9c34a1d commit be8cb32
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.core;

import org.opensearch.OpenSearchException;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -52,11 +54,62 @@ public interface IRestHighLevelClient extends Closeable {

IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;

Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;
Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;

SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;


/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
static void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".2xx.count";
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
static void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private static OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.OpenSearchException;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.IOException;

Expand Down Expand Up @@ -91,7 +89,7 @@ public IndexResponse index(IndexRequest indexRequest, RequestOptions options) th
}

@Override
public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
}

Expand Down Expand Up @@ -122,64 +120,14 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
try {
T result = operation.call();
recordOperationSuccess(metricNamePrefix);
IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
return result;
} catch (Exception e) {
recordOperationFailure(metricNamePrefix, e);
IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
throw e;
}
}

/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
private void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".2xx.count";
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
private void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}

/**
* Functional interface for operations that can throw IOException.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public class MetricConstants {
public final class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
Expand Down Expand Up @@ -47,6 +47,26 @@ public class MetricConstants {
*/
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";

/**
* Prefix for metrics related to the request metadata read operations.
*/
public static final String REQUEST_METADATA_READ_METRIC_PREFIX = "request.metadata.read";

/**
* Prefix for metrics related to the request metadata write operations.
*/
public static final String REQUEST_METADATA_WRITE_METRIC_PREFIX = "request.metadata.write";

/**
* Metric name for counting failed heartbeat operations on request metadata.
*/
public static final String REQUEST_METADATA_HEARTBEAT_FAILED_METRIC = "request.metadata.heartbeat.failed.count";

/**
* Prefix for metrics related to the result metadata write operations.
*/
public static final String RESULT_METADATA_WRITE_METRIC_PREFIX = "result.metadata.write";

/**
* Metric name for counting the number of statements currently running.
*/
Expand All @@ -65,5 +85,29 @@ public class MetricConstants {
/**
* Metric name for tracking the processing time of statements.
*/
public static final String STATEMENT_PROCESSING_TIME_METRIC = "STATEMENT.processingTime";
public static final String STATEMENT_PROCESSING_TIME_METRIC = "statement.processingTime";

/**
* Metric for tracking the count of currently running streaming jobs.
*/
public static final String STREAMING_RUNNING_METRIC = "streaming.running.count";

/**
* Metric for tracking the count of streaming jobs that have failed.
*/
public static final String STREAMING_FAILED_METRIC = "streaming.failed.count";

/**
* Metric for tracking the count of streaming jobs that have completed successfully.
*/
public static final String STREAMING_SUCCESS_METRIC = "streaming.success.count";

/**
* Metric for tracking the count of failed heartbeat signals in streaming jobs.
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
Expand All @@ -21,8 +24,8 @@ public final class MetricsUtil {

private static final Logger LOG = Logger.getLogger(MetricsUtil.class.getName());

// Private constructor to prevent instantiation
private MetricsUtil() {
// Private constructor to prevent instantiation
}

/**
Expand Down Expand Up @@ -60,10 +63,7 @@ public static void decrementCounter(String metricName) {
*/
public static Timer.Context getTimerContext(String metricName) {
Timer timer = getOrCreateTimer(metricName);
if (timer != null) {
return timer.time();
}
return null;
return timer != null ? timer.time() : null;
}

/**
Expand All @@ -74,42 +74,47 @@ public static Timer.Context getTimerContext(String metricName) {
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
*/
public static Long stopTimer(Timer.Context context) {
if (context != null) {
return context.stop();
return context != null ? context.stop() : null;
}

/**
* Registers a gauge metric with the provided name and value.
* The gauge will reflect the current value of the AtomicInteger provided.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
*/
public static void registerGauge(String metricName, final AtomicInteger value) {
MetricRegistry metricRegistry = getMetricRegistry();
if (metricRegistry == null) {
LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
return;
}
return null;
metricRegistry.register(metricName, (Gauge<Integer>) value::get);
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
return counter;
MetricRegistry metricRegistry = getMetricRegistry();
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
return metricRegistry != null ? metricRegistry.timer(metricName) : null;
}

// Retrieves the MetricRegistry from the current Spark environment.
private static MetricRegistry getMetricRegistry() {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
Timer timer = flintMetricSource.metricRegistry().getTimers().get(metricName);
if (timer == null) {
timer = flintMetricSource.metricRegistry().timer(metricName);
}
return timer;
return flintMetricSource.metricRegistry();
}

// Gets or initializes the FlintMetricSource
Expand Down
Loading

0 comments on commit be8cb32

Please sign in to comment.