Change delete index API to logical delete #191

Merged · 4 commits · Jan 19, 2024
Changes from 2 commits
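
This PR turns FlintSpark.deleteIndex into a logical delete: it stops the refresh job and marks the index "deleted" in the metadata log, but no longer removes the OpenSearch index itself. Physical cleanup is left to the separate vacuumIndex API, and the SQL grammar gains a VACUUM statement for materialized views. A minimal sketch of the resulting two-step lifecycle, assuming a FlintSpark instance named flint and the testFlintIndex fixture used in the suites below:

// Sketch of the new lifecycle, not a verbatim excerpt from this PR
flint.deleteIndex(testFlintIndex) // logical: state becomes "deleted", index data is retained
flint.vacuumIndex(testFlintIndex) // physical: index data and metadata log are removed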
@@ -96,6 +96,7 @@ materializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
+ | vacuumMaterializedViewStatement
;

createMaterializedViewStatement
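
The hunk above extends the SQL-extensions grammar with a vacuumMaterializedViewStatement alternative alongside the existing drop rule. The statements this enables are exercised by the suites below; a sketch using the suites' own sql(...) helper and fixtures:

sql(s"VACUUM SKIPPING INDEX ON $testTable")
sql(s"VACUUM INDEX $testIndex ON $testTable")
sql(s"VACUUM MATERIALIZED VIEW $testMvName")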
@@ -226,7 +226,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
// TODO: share same transaction for now
flintIndexMonitor.stopMonitor(indexName)
stopRefreshingJob(indexName)
- flintClient.deleteIndex(indexName)
true
})
} catch {
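
With flintClient.deleteIndex(indexName) removed, the transaction body of deleteIndex reduces to stopping the monitor and the refresh job; the OpenSearch index survives until vacuumed. Roughly, assembled from the context lines above (a sketch, not the full method):

// TODO: share same transaction for now
flintIndexMonitor.stopMonitor(indexName)
stopRefreshingJob(indexName)
true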
@@ -31,7 +31,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
- flint.deleteIndex(testFlintIndex)
+ deleteTestIndex(testFlintIndex)
}

test("create covering index with metadata successfully") {
@@ -126,6 +126,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.onTable(testTable)
.addIndexColumns("address")
.create()
- flint.deleteIndex(getFlintIndexName(newIndex, testTable))
+ deleteTestIndex(getFlintIndexName(newIndex, testTable))
}
}
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
- import org.scalatest.matchers.must.Matchers.{defined, have}
+ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
@@ -38,7 +38,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
- flint.deleteIndex(testFlintIndex)
+ deleteTestIndex(testFlintIndex)
}

test("create covering index with auto refresh") {
@@ -252,8 +252,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
val result = sql(s"SHOW INDEX ON $testTable")
checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))

- flint.deleteIndex(getFlintIndexName("idx_address", testTable))
- flint.deleteIndex(getSkippingIndexName(testTable))
+ deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable))
}

test("describe covering index") {
@@ -268,7 +267,7 @@
checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed")))
}

test("drop covering index") {
test("drop and vacuum covering index") {
flint
.coveringIndex()
.name(testIndex)
Expand All @@ -277,7 +276,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
.create()

sql(s"DROP INDEX $testIndex ON $testTable")

sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(testFlintIndex) shouldBe empty
}
}
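
Note the new assertion shape: DROP alone no longer makes describeIndex return empty, so the tests vacuum as well. The same pattern recurs for skipping indexes and materialized views below:

sql(s"DROP INDEX $testIndex ON $testTable") // logical delete only
sql(s"VACUUM INDEX $testIndex ON $testTable") // physical delete
flint.describeIndex(testFlintIndex) shouldBe empty // empty only after vacuum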
@@ -29,17 +29,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
}

override def afterEach(): Unit = {

- /**
-  * Todo, if state is not valid, will throw IllegalStateException. Should check flint
-  * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if
-  * failed, delete index itself.
-  */
- try {
-   flint.deleteIndex(testIndex)
- } catch {
-   case _: IllegalStateException => deleteIndex(testIndex)
- }
+ deleteTestIndex(testIndex)
super.afterEach()
}

@@ -117,7 +117,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
try {
test(new AssertionHelper(flintIndexName, checkpointDir))
} finally {
- flint.deleteIndex(flintIndexName)
+ deleteTestIndex(flintIndexName)
}
}
}
@@ -65,13 +65,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers
FlintSparkIndexMonitor.indexMonitorTracker.clear()

try {
-   flint.deleteIndex(testFlintIndex)
- } catch {
-   // Index maybe end up with failed state in some test
-   case _: IllegalStateException =>
-     openSearchClient
-       .indices()
-       .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+   deleteTestIndex(testFlintIndex)
} finally {
super.afterEach()
}
@@ -56,6 +56,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite {
indexData should have size 1

sql(s"DROP SKIPPING INDEX ON $testTable")
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}

@@ -76,6 +77,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite {
indexData should have size 1

sql(s"DROP INDEX $testIndex ON $testTable")
sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}
}
@@ -38,7 +38,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

override def afterEach(): Unit = {
super.afterEach()
- flint.deleteIndex(testFlintIndex)
+ deleteTestIndex(testFlintIndex)
}

test("create materialized view with metadata successfully") {
@@ -44,7 +44,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

override def afterEach(): Unit = {
super.afterEach()
- flint.deleteIndex(testFlintIndex)
+ deleteTestIndex(testFlintIndex)
}

test("create materialized view with auto refresh") {
@@ -255,15 +255,15 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq())
}

test("drop materialized view") {
test("drop and vacuum materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

sql(s"DROP MATERIALIZED VIEW $testMvName")

sql(s"VACUUM MATERIALIZED VIEW $testMvName")
flint.describeIndex(testFlintIndex) shouldBe empty
}

@@ -39,7 +39,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
- flint.deleteIndex(testIndex)
+ deleteTestIndex(testIndex)
}

test("create skipping index with metadata successfully") {
@@ -606,7 +606,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| }
|""".stripMargin)

- flint.deleteIndex(testIndex)
+ deleteTestIndex(testIndex)
}

test("can build skipping index for varchar and char and rewrite applicable query") {
@@ -650,7 +650,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
(isnull(col("char_col")) || col("char_col") === paddedChar)))

- flint.deleteIndex(testIndex)
+ deleteTestIndex(testIndex)
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
@@ -36,7 +36,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
protected override def afterEach(): Unit = {
super.afterEach()

- flint.deleteIndex(testIndex)
+ deleteTestIndex(testIndex)
}

test("create skipping index with auto refresh") {
@@ -249,15 +249,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
checkAnswer(result, Seq.empty)
}

test("drop skipping index") {
test("drop and vacuum skipping index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.create()

sql(s"DROP SKIPPING INDEX ON $testTable")

sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
}
}
@@ -12,6 +12,9 @@ import scala.concurrent.duration.TimeUnit
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
+ import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
+ import org.opensearch.client.RequestOptions
+ import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchSuite
import org.scalatestplus.mockito.MockitoSugar.mock

@@ -46,6 +49,29 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
FlintSparkIndexMonitor.executor = mockExecutor
}

+ protected def deleteTestIndex(testIndexNames: String*): Unit = {
+   testIndexNames.foreach(testIndex => {
+     /**
+      * Todo, if state is not valid, will throw IllegalStateException. Should check flint
+      * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2)
+      * if failed, delete index itself.
+      */
+     try {
+       flint.deleteIndex(testIndex)
+       flint.vacuumIndex(testIndex)
+     } catch {
+       case _: IllegalStateException =>
+         if (openSearchClient
+             .indices()
+             .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) {
+           openSearchClient
+             .indices()
+             .delete(new DeleteIndexRequest(testIndex), RequestOptions.DEFAULT)
+         }
+     }
+   })
+ }
+
protected def awaitStreamingComplete(jobId: String): Unit = {
val job = spark.streams.get(jobId)
failAfter(streamingTimeout) {
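The shared deleteTestIndex helper above replaces the per-suite try/catch cleanup: it attempts a logical delete plus vacuum, and on IllegalStateException (e.g., an index stuck in a failed state) falls back to deleting the raw OpenSearch index if it still exists. Because the parameter is a vararg, a suite can clean several indices in one call, as the covering-index suite does:

deleteTestIndex(testFlintIndex)
deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable))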
@@ -14,8 +14,6 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchTransactionSuite
- import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
- import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.should.Matchers
@@ -39,11 +37,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers
* .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if
* failed, delete index itself.
*/
- try {
-   flint.deleteIndex(testFlintIndex)
- } catch {
-   case _: IllegalStateException => deleteIndex(testFlintIndex)
- }
+ deleteTestIndex(testFlintIndex)
super.afterEach()
}

@@ -108,27 +102,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers
latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime
}

test("delete index") {
test("delete and vacuum index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
- flint.deleteIndex(testFlintIndex)

+ // Logical delete index
+ flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")
}

test("vacuum index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
deleteLogically(testLatestId)
flint.vacuumIndex(testFlintIndex)

+ // Both index data and metadata log should be vacuumed
+ flint.vacuumIndex(testFlintIndex)
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
@@ -137,25 +123,6 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers
RequestOptions.DEFAULT) shouldBe false
}

test("should recreate index if logical deleted") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

// Simulate that user deletes index data manually
flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")

// Simulate that user recreate the index
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("name")
.create()
}

test("should not recreate index if index data still exists") {
flint
.skippingIndex()
@@ -164,7 +131,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers
.create()

// Simulate that PPL plugin leaves index data as logical deleted
- deleteLogically(testLatestId)
+ flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")

// Simulate that user recreate the index but forgot to cleanup index data
Expand All @@ -176,16 +143,4 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.create()
} should have message s"Flint index $testFlintIndex already exists"
}

- private def deleteLogically(latestId: String): Unit = {
-   val response = openSearchClient
-     .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT)
-
-   val latest = new FlintMetadataLogEntry(
-     latestId,
-     response.getSeqNo,
-     response.getPrimaryTerm,
-     response.getSourceAsMap)
-   updateLatestLogEntry(latest, DELETED)
- }
}
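
Taken together, the transaction tests pin down the new index lifecycle. A comment-only summary of the transitions they assert (an interpretation of the assertions above, not an exhaustive specification):

// create()      -> index data and metadata log entry exist
// deleteIndex() -> state "deleted"; index data remains, so re-creating the
//                  index still fails with "Flint index ... already exists"
// vacuumIndex() -> index data and metadata log are both physically removed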