Skip to content

Commit

Permalink
Support recovery for index with external scheduler (#717) (#776)
Browse files Browse the repository at this point in the history
* Support recovery for index with external scheduler



* Improve default option update logic



* Resolve comments



* Add index metrics



* Remove debugging log and refactor updateSchedulerMode



* Refactor metrics with AOP



* Add more integration tests (IT)



---------


(cherry picked from commit a345373)

Signed-off-by: Louis Chu <clingzhi@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 288f88a commit 6dc5726
Show file tree
Hide file tree
Showing 28 changed files with 959 additions and 286 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ lazy val flintCommons = (project in file("flint-commons"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"org.projectlombok" % "lombok" % "1.18.30" % "provided",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

Expand All @@ -33,10 +34,20 @@ private MetricsUtil() {
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* This name is used to retrieve or create the counter.
*/
public static void incrementCounter(String metricName) {
Counter counter = getOrCreateCounter(metricName);
incrementCounter(metricName, false);
}

/**
* Increments the Counter metric associated with the given metric name.
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void incrementCounter(String metricName, boolean isIndexMetric) {
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
if (counter != null) {
counter.inc();
}
Expand All @@ -48,29 +59,48 @@ public static void incrementCounter(String metricName) {
* @param metricName The name of the metric counter to be decremented.
*/
public static void decrementCounter(String metricName) {
Counter counter = getOrCreateCounter(metricName);
decrementCounter(metricName, false);
}

/**
* Decrements the value of the specified metric counter by one, if the counter exists and its current count is greater than zero.
*
* @param metricName The name of the metric counter to be decremented.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void decrementCounter(String metricName, boolean isIndexMetric) {
    // Look up (or create) the counter in the registry selected by isIndexMetric.
    final Counter target = getOrCreateCounter(metricName, isIndexMetric);
    if (target == null) {
        // No counter could be obtained, so there is nothing to decrement.
        return;
    }
    // Only decrement while the count is positive, so this call never drives it below zero.
    if (target.getCount() > 0) {
        target.dec();
    }
}

/**
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
* This context can be used to measure the duration of a particular operation or event.
*
* @param metricName The name of the metric timer to retrieve the context for.
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
*/
public static Timer.Context getTimerContext(String metricName) {
Timer timer = getOrCreateTimer(metricName);
return getTimerContext(metricName, false);
}

/**
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
*
* @param metricName The name of the metric timer to retrieve the context for.
* @param isIndexMetric Whether this metric is an index-specific metric.
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
*/
public static Timer.Context getTimerContext(String metricName, boolean isIndexMetric) {
    final Timer resolved = getOrCreateTimer(metricName, isIndexMetric);
    if (resolved == null) {
        // No timer could be obtained; the caller receives null instead of a context.
        return null;
    }
    // Starting the timer yields the context that measures the elapsed time.
    return resolved.time();
}

/**
* Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started
* and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}.
* Stops the timer associated with the given {@link Timer.Context}.
*
* @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}.
* @param context The {@link Timer.Context} to stop. May be {@code null}.
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
*/
public static Long stopTimer(Timer.Context context) {
Expand All @@ -79,53 +109,61 @@ public static Long stopTimer(Timer.Context context) {

/**
* Registers a gauge metric with the provided name and value.
* The gauge will reflect the current value of the AtomicInteger provided.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
*/
public static void registerGauge(String metricName, final AtomicInteger value) {
MetricRegistry metricRegistry = getMetricRegistry();
registerGauge(metricName, value, false);
}

/**
* Registers a gauge metric with the provided name and value.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void registerGauge(String metricName, final AtomicInteger value, boolean isIndexMetric) {
    final MetricRegistry registry = getMetricRegistry(isIndexMetric);
    if (registry != null) {
        // The gauge delegates to value.get(), so it reports the AtomicInteger's
        // current value whenever it is read.
        // NOTE(review): MetricRegistry.register is documented to reject a name that is
        // already registered -- confirm callers register each gauge name only once.
        final Gauge<Integer> liveValue = value::get;
        registry.register(metricName, liveValue);
    } else {
        LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
    }
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) {
    final MetricRegistry registry = getMetricRegistry(isIndexMetric);
    if (registry == null) {
        // Without a registry there is no counter to hand back.
        return null;
    }
    return registry.counter(metricName);
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Timer getOrCreateTimer(String metricName, boolean isIndexMetric) {
    final MetricRegistry registry = getMetricRegistry(isIndexMetric);
    if (registry == null) {
        // Without a registry there is no timer to hand back.
        return null;
    }
    return registry.timer(metricName);
}

// Retrieves the MetricRegistry from the current Spark environment.
private static MetricRegistry getMetricRegistry() {
private static MetricRegistry getMetricRegistry(boolean isIndexMetric) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
return flintMetricSource.metricRegistry();
Source metricSource = isIndexMetric ?
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(), FlintIndexMetricSource::new) :
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_METRIC_SOURCE_NAME(), FlintMetricSource::new);
return metricSource.metricRegistry();
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
private static Source getOrInitMetricSource(SparkEnv sparkEnv, String sourceName, java.util.function.Supplier<Source> sourceSupplier) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(sourceName);

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
Source metricSource = sourceSupplier.get();
sparkEnv.metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
return metricSourceSeq.head();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

class FlintMetricSource() extends Source {
/**
* Metric source for general Flint metrics.
*/
class FlintMetricSource extends Source {

  // Source contract: the name this source is registered under, taken from the companion object.
  override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME

  // Source contract: every instance owns its own, initially empty, registry.
  override val metricRegistry: MetricRegistry = new MetricRegistry()
}

/**
* Metric source for Flint index-specific metrics.
*/
class FlintIndexMetricSource extends Source {

  // Registered under the index-specific name so it is looked up separately from FlintMetricSource.
  override val sourceName: String = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME

  // Every instance owns its own, initially empty, registry.
  override val metricRegistry: MetricRegistry = new MetricRegistry()
}

/**
 * Names under which the Flint metric sources are registered.
 *
 * The member names are kept in upper snake case because callers reference them by
 * these exact identifiers; renaming them would break those call sites.
 */
object FlintMetricSource {

  /** Source name for general Flint metrics (the default source). */
  val FLINT_METRIC_SOURCE_NAME: String = "Flint"

  /** Source name for index-specific Flint metrics. */
  val FLINT_INDEX_METRIC_SOURCE_NAME: String = "FlintIndex"
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_SUPPORT_SHARD = "true";

private static final String UNKNOWN = "UNKNOWN";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

public FlintOptions(Map<String, String> options) {
this.options = options;
Expand Down Expand Up @@ -185,9 +188,9 @@ public String getDataSourceName() {
* @return the AWS accountId
*/
public String getAWSAccountId() {
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", "");
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
String[] parts = clusterName.split(":");
return parts.length == 2 ? parts[0] : "";
return parts.length == 2 ? parts[0] : UNKNOWN;
}

public String getSystemIndexName() {
Expand Down
Loading

0 comments on commit 6dc5726

Please sign in to comment.