Improve default option update logic
Signed-off-by: Louis Chu <clingzhi@amazon.com>
noCharger committed Oct 8, 2024
1 parent 01d76a8 commit fdbf5e7
Showing 5 changed files with 234 additions and 81 deletions.
FlintSparkIndexOptions.scala
@@ -10,6 +10,7 @@ import java.util.{Collections, UUID}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.logging.CustomLogging.logInfo
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
@@ -236,26 +237,36 @@ object FlintSparkIndexOptions {
IntervalSchedulerParser.parse(flintSparkConf.externalSchedulerIntervalThreshold())
val currentInterval = options.refreshInterval().map(IntervalSchedulerParser.parse)

logInfo(s"updateOptionsWithDefaults - before updatedOptions: ${updatedOptions}")
logInfo(s"currentInterval.isDefined: ${currentInterval.isDefined}")
logInfo(s"${updatedOptions.get(SCHEDULER_MODE.toString).equals(Some("external"))}")
(
externalSchedulerEnabled,
currentInterval,
currentInterval.isDefined,
updatedOptions.get(SCHEDULER_MODE.toString)) match {
case (true, Some(interval), _) if interval.getInterval >= thresholdInterval.getInterval =>
case (true, true, None | Some("external"))
if currentInterval.get.getInterval >= thresholdInterval.getInterval =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.EXTERNAL.toString)
case (true, None, Some("external")) =>
case (true, true, Some("external"))
if currentInterval.get.getInterval < thresholdInterval.getInterval =>
throw new IllegalArgumentException(
s"Input refresh_interval is ${options.refreshInterval().get}, required above the interval threshold of external scheduler: ${flintSparkConf
.externalSchedulerIntervalThreshold()}")
case (true, false, Some("external")) =>
updatedOptions += (REFRESH_INTERVAL.toString -> flintSparkConf
.externalSchedulerIntervalThreshold())
case (true, None, None) =>
case (true, false, None) =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.EXTERNAL.toString)
updatedOptions += (REFRESH_INTERVAL.toString -> flintSparkConf
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"External scheduler mode spark conf is not enabled but refresh interval is set to external scheduler mode")
case _ =>
logInfo("Debug only")
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}

logInfo(s"updateOptionsWithDefaults - updatedOptions: ${updatedOptions}")
FlintSparkIndexOptions(updatedOptions.toMap)
}
}
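
For readers tracing the new match above: what follows is a minimal, self-contained sketch of the same precedence rules over (externalSchedulerEnabled, currentInterval.isDefined, scheduler_mode), with Flint types replaced by plain stand-ins and intervals reduced to minutes. SchedulerDefaultsSketch, resolve, and Decision are illustrative names, not part of this commit.

object SchedulerDefaultsSketch {
  sealed trait Decision
  case object External extends Decision
  case object Internal extends Decision
  final case class Invalid(reason: String) extends Decision

  // Mirrors the shape of the commit's match: enabled flag, whether an
  // interval was supplied, and the user-requested scheduler mode.
  def resolve(
      externalEnabled: Boolean,
      intervalMinutes: Option[Int],
      mode: Option[String],
      thresholdMinutes: Int): Decision =
    (externalEnabled, intervalMinutes.isDefined, mode) match {
      case (true, true, None | Some("external"))
          if intervalMinutes.get >= thresholdMinutes =>
        External
      case (true, true, Some("external"))
          if intervalMinutes.get < thresholdMinutes =>
        Invalid(s"refresh_interval ${intervalMinutes.get}m is below the ${thresholdMinutes}m threshold")
      case (true, false, Some("external")) =>
        External // the threshold becomes the default refresh_interval
      case (true, false, None) =>
        External // both scheduler_mode and refresh_interval are defaulted
      case (false, _, Some("external")) =>
        Invalid("external scheduler mode requested but disabled in Spark conf")
      case _ =>
        Internal
    }

  def main(args: Array[String]): Unit = {
    println(resolve(true, Some(10), None, 5))             // External
    println(resolve(true, Some(1), Some("external"), 5))  // Invalid(...)
    println(resolve(false, None, Some("external"), 5))    // Invalid(...)
    println(resolve(false, Some(10), None, 5))            // Internal
  }
}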
RefreshQueryGenerator.scala
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.scheduler.util

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -23,13 +24,14 @@ object RefreshQueryGenerator {
* if the provided parameters are invalid
*/
def generateRefreshQuery(index: FlintSparkIndex): String = {
val tableName = quotedTableName(index.metadata().source)
index match {
case skippingIndex: FlintSparkSkippingIndex =>
s"REFRESH SKIPPING INDEX ON ${skippingIndex.tableName}"
case _: FlintSparkSkippingIndex =>
s"REFRESH SKIPPING INDEX ON ${tableName}"
case coveringIndex: FlintSparkCoveringIndex =>
s"REFRESH INDEX ${coveringIndex.indexName} ON ${coveringIndex.tableName}"
case materializedView: FlintSparkMaterializedView =>
s"REFRESH MATERIALIZED VIEW ${materializedView.mvName}"
s"REFRESH INDEX ${coveringIndex.indexName} ON ${tableName}"
case _: FlintSparkMaterializedView =>
s"REFRESH MATERIALIZED VIEW ${tableName}"
case _ =>
throw new IllegalArgumentException(
s"Unsupported index type: ${index.getClass.getSimpleName}")
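
The switch from index.tableName to quotedTableName(index.metadata().source) backtick-quotes the table portion of the fully qualified name used in the generated SQL. Below is a rough, illustrative stand-in for that behavior; quoteLastPart is a hypothetical helper, while the real quotedTableName lives in FlintSparkIndex and also handles table names that themselves contain dots.

object QuotedTableNameSketch {
  // Quote only the final segment of a catalog.database.table name.
  def quoteLastPart(fullName: String): String = {
    val parts = fullName.split('.')
    (parts.init :+ s"`${parts.last}`").mkString(".")
  }

  def main(args: Array[String]): Unit = {
    // Matches the expectation in the updated tests further below:
    println(quoteLastPart("dummy.default.testTable")) // dummy.default.`testTable`
  }
}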
FlintSparkIndexBuilderSuite.scala
@@ -7,13 +7,20 @@ package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{CHECKPOINT_LOCATION, REFRESH_INTERVAL, SCHEDULER_MODE}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.scalatest.Inspectors.forAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.not.include
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec

import org.apache.spark.FlintSuite
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkIndexBuilderSuite extends FlintSuite {
class FlintSparkIndexBuilderSuite
extends FlintSuite
with Matchers
with TableDrivenPropertyChecks {

val indexName: String = "test_index"
val testCheckpointLocation = "/test/checkpoints/"
@@ -143,71 +150,148 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
}
}

test(
"updateOptionsWithDefaults should set internal scheduler mode when auto refresh is false") {
val options = FlintSparkIndexOptions(Map("auto_refresh" -> "false"))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.options.get(SCHEDULER_MODE.toString) shouldBe None
}

test(
"updateOptionsWithDefaults should set internal scheduler mode when external scheduler is disabled") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, false)
val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.options(SCHEDULER_MODE.toString) shouldBe SchedulerMode.INTERNAL.toString
}

test(
"updateOptionsWithDefaults should set external scheduler mode when interval is above threshold") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true)
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD, "5 minutes")
val options =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.options(SCHEDULER_MODE.toString) shouldBe SchedulerMode.EXTERNAL.toString
}

test(
"updateOptionsWithDefaults should set external scheduler mode and default interval when no interval is provided") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true)
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD, "5 minutes")
val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.options(SCHEDULER_MODE.toString) shouldBe SchedulerMode.EXTERNAL.toString
updatedOptions.options(REFRESH_INTERVAL.toString) shouldBe "5 minutes"
}

test("updateOptionsWithDefaults should set external scheduler mode when explicitly specified") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true)
val options =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
val builder = new FakeFlintSparkIndexBuilder

val updatedOptions = builder.options(options, indexName).testOptions
updatedOptions.options(SCHEDULER_MODE.toString) shouldBe SchedulerMode.EXTERNAL.toString
}

test(
"updateOptionsWithDefaults should throw exception when external scheduler is disabled but mode is external") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, false)
val options =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
val builder = new FakeFlintSparkIndexBuilder

val exception = intercept[IllegalArgumentException] {
builder.options(options, indexName)
test("updateOptionsWithDefaults scenarios") {
val scenarios = Table(
(
"testName",
"externalSchedulerEnabled",
"thresholdInterval",
"inputOptions",
"expectedMode",
"expectedInterval",
"expectedException"),
(
"set internal mode when auto refresh is false",
false,
"5 minutes",
Map("auto_refresh" -> "false"),
None,
None,
None),
(
"set internal mode when external scheduler is disabled",
false,
"5 minutes",
Map("auto_refresh" -> "true"),
Some(SchedulerMode.INTERNAL.toString),
None,
None),
(
"set external mode when interval is above threshold",
true,
"5 minutes",
Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"),
Some(SchedulerMode.EXTERNAL.toString),
Some("10 minutes"),
None),
(
"set external mode and default interval when no interval provided",
true,
"5 minutes",
Map("auto_refresh" -> "true"),
Some(SchedulerMode.EXTERNAL.toString),
Some("5 minutes"),
None),
(
"set external mode when explicitly specified",
true,
"5 minutes",
Map("auto_refresh" -> "true", "scheduler_mode" -> "external"),
Some(SchedulerMode.EXTERNAL.toString),
None,
None),
(
"throw exception when external scheduler disabled but mode is external",
false,
"5 minutes",
Map("auto_refresh" -> "true", "scheduler_mode" -> "external"),
None,
None,
Some(
"External scheduler mode spark conf is not enabled but refresh interval is set to external schedule")),
(
"set external mode when interval above threshold and no mode specified",
true,
"5 minutes",
Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"),
Some(SchedulerMode.EXTERNAL.toString),
Some("10 minutes"),
None),
(
"throw exception when interval below threshold but mode is external",
true,
"5 minutes",
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "1 minute",
"scheduler_mode" -> "external"),
None,
None,
Some("Input refresh_interval is 1 minute, required above the interval threshold")),
(
"set external mode when interval above threshold and mode specified",
true,
"5 minutes",
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "10 minute",
"scheduler_mode" -> "external"),
Some(SchedulerMode.EXTERNAL.toString),
None,
None),
(
"set default interval when mode is external but no interval provided",
true,
"5 minutes",
Map("auto_refresh" -> "true", "scheduler_mode" -> "external"),
Some(SchedulerMode.EXTERNAL.toString),
Some("5 minutes"),
None),
(
"set external mode when external scheduler enabled but no mode or interval specified",
true,
"5 minutes",
Map("auto_refresh" -> "true"),
Some(SchedulerMode.EXTERNAL.toString),
None,
None))

forAll(scenarios) {
(
testName,
externalSchedulerEnabled,
thresholdInterval,
inputOptions,
expectedMode,
expectedInterval,
expectedException) =>
withClue(s"Scenario: $testName - ") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled)
setFlintSparkConf(
FlintSparkConf.EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD,
thresholdInterval)

val options = FlintSparkIndexOptions(inputOptions)
val builder = new FakeFlintSparkIndexBuilder

expectedException match {
case Some(exceptionMessage) =>
val exception = intercept[IllegalArgumentException] {
builder.options(options, indexName)
}
exception.getMessage should include(exceptionMessage)

case None =>
val updatedOptions = builder.options(options, indexName).testOptions
expectedMode.foreach { mode =>
updatedOptions.options(SCHEDULER_MODE.toString) shouldBe mode
}
expectedInterval.foreach { interval =>
updatedOptions.options(REFRESH_INTERVAL.toString) shouldBe interval
}
}
}
}
exception.getMessage should include(
"External scheduler mode is not enabled in the configuration")
}

override def afterEach(): Unit = {
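
The new test above collapses the individual updateOptionsWithDefaults tests into a single ScalaTest table. A minimal standalone example of the same TableDrivenPropertyChecks pattern follows; the suite name and scenario values are illustrative only.

import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks

class TableDrivenExampleSuite
    extends AnyFunSuite
    with Matchers
    with TableDrivenPropertyChecks {

  test("interval threshold comparisons") {
    val scenarios = Table(
      ("testName", "intervalMinutes", "thresholdMinutes", "expectExternal"),
      ("above threshold", 10, 5, true),
      ("below threshold", 1, 5, false))

    // forAll runs the assertion once per row; withClue names the failing row.
    forAll(scenarios) { (testName, intervalMinutes, thresholdMinutes, expectExternal) =>
      withClue(s"Scenario: $testName - ") {
        (intervalMinutes >= thresholdMinutes) shouldBe expectExternal
      }
    }
  }
}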
RefreshQueryGeneratorTest.scala
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.scheduler.util;

import org.mockito.Mockito._
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
@@ -16,33 +17,43 @@ import org.apache.spark.SparkFunSuite

class RefreshQueryGeneratorTest extends SparkFunSuite with Matchers {

val testTable = "dummy.default.testTable"
val expectedTableName = "dummy.default.`testTable`"

val mockMetadata = mock(classOf[FlintMetadata])

test("generateRefreshQuery should return correct query for FlintSparkSkippingIndex") {
val mockIndex = mock(classOf[FlintSparkSkippingIndex])
when(mockIndex.tableName).thenReturn("testTable")
when(mockIndex.metadata()).thenReturn(mockMetadata)
when(mockIndex.metadata().source).thenReturn(testTable)

val result = RefreshQueryGenerator.generateRefreshQuery(mockIndex)
result shouldBe "REFRESH SKIPPING INDEX ON testTable"
result shouldBe s"REFRESH SKIPPING INDEX ON ${expectedTableName}"
}

test("generateRefreshQuery should return correct query for FlintSparkCoveringIndex") {
val mockIndex = mock(classOf[FlintSparkCoveringIndex])
when(mockIndex.indexName).thenReturn("testIndex")
when(mockIndex.tableName).thenReturn("testTable")
when(mockIndex.metadata()).thenReturn(mockMetadata)
when(mockIndex.metadata().source).thenReturn(testTable)

val result = RefreshQueryGenerator.generateRefreshQuery(mockIndex)
result shouldBe "REFRESH INDEX testIndex ON testTable"
result shouldBe s"REFRESH INDEX testIndex ON ${expectedTableName}"
}

test("generateRefreshQuery should return correct query for FlintSparkMaterializedView") {
val mockIndex = mock(classOf[FlintSparkMaterializedView])
when(mockIndex.mvName).thenReturn("testMV")
when(mockIndex.metadata()).thenReturn(mockMetadata)
when(mockIndex.metadata().source).thenReturn(testTable)

val result = RefreshQueryGenerator.generateRefreshQuery(mockIndex)
result shouldBe "REFRESH MATERIALIZED VIEW testMV"
result shouldBe s"REFRESH MATERIALIZED VIEW ${expectedTableName}"
}

test("generateRefreshQuery should throw IllegalArgumentException for unsupported index type") {
val mockIndex = mock(classOf[FlintSparkIndex])
when(mockIndex.metadata()).thenReturn(mockMetadata)
when(mockIndex.metadata().source).thenReturn(testTable)

val exception = intercept[IllegalArgumentException] {
RefreshQueryGenerator.generateRefreshQuery(mockIndex)
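
The stubbing in these tests relies on Mockito call chaining: once metadata() is stubbed to return mockMetadata, the expression inside when(mockIndex.metadata().source) goes through the already-stubbed mock, so it actually stubs mockMetadata.source. A minimal sketch of that behavior, assuming mockito-core on the classpath; Meta and Index are stand-in traits, not Flint types.

import org.mockito.Mockito.{mock, when}

trait Meta { def source: String }
trait Index { def metadata(): Meta }

object ChainedStubbingSketch {
  def main(args: Array[String]): Unit = {
    val meta = mock(classOf[Meta])
    val index = mock(classOf[Index])
    when(index.metadata()).thenReturn(meta)
    // metadata() now returns the meta mock, so this line stubs meta.source:
    when(index.metadata().source).thenReturn("dummy.default.testTable")
    println(index.metadata().source) // dummy.default.testTable
  }
}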
