[Backport 0.6] Fix bug for not able to get sourceTables from metadata #894

Merged: 1 commit, Nov 13, 2024
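Root cause, as reflected in the diff below: metadata deserialized from OpenSearch JSON carries "sourceTables" as a java.util.ArrayList (Jackson's representation of a JSON array), while metadata built in memory stored a Scala Array, so the old ArrayList-only pattern match silently dropped the source tables on the in-memory path. A minimal REPL-style sketch of the mismatch (identifiers here are illustrative, not code from this PR):

    import java.util.{ArrayList => JArrayList}

    // Old-style reader: recognizes only the Jackson/ArrayList shape.
    def readSourceTables(value: AnyRef): Array[String] = value match {
      case list: JArrayList[_] => list.toArray.map(_.toString) // deserialized-from-JSON path
      case _ => Array.empty[String] // a Scala Array falls through here
    }

    val fromJson: AnyRef = new JArrayList[String](java.util.Arrays.asList("catalog.db.t1"))
    val inMemory: AnyRef = Array("catalog.db.t1") // what the MV builder used to store

    readSourceTables(fromJson) // Array("catalog.db.t1")
    readSourceTables(inMemory) // Array() -- the reported bug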
@@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
@@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging {
   }
 
   private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
-    val sourceTables = getArrayString(metadata.properties, "sourceTables")
+    val sourceTables = getSourceTablesFromMetadata(metadata)
     if (sourceTables.isEmpty) {
-      FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
+      FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source)
     } else {
       sourceTables
     }
@@ -161,12 +161,4 @@
       Some(value.asInstanceOf[String])
     }
   }
-
-  private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
-    map.get(key) match {
-      case list: java.util.ArrayList[_] =>
-        list.toArray.map(_.toString)
-      case _ => Array.empty[String]
-    }
-  }
 }
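Note on the factory change above: getMvSourceTables now reads the property through the shared getSourceTablesFromMetadata helper, and when that returns nothing (for example, metadata that never recorded the property) it falls back to re-deriving the tables by parsing the stored MV query with extractSourceTablesFromQuery.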
@@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.spark.FlintSparkIndexOptions
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
 
 /**
@@ -61,12 +61,7 @@ object FlintMetadataCache {
       None
     }
     val sourceTables = metadata.kind match {
-      case MV_INDEX_TYPE =>
-        metadata.properties.get("sourceTables") match {
-          case list: java.util.ArrayList[_] =>
-            list.toArray.map(_.toString)
-          case _ => Array.empty[String]
-        }
+      case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
       case _ => Array(metadata.source)
     }
     val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
@@ -38,13 +38,15 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     .isInstanceOf[FlintOpenSearchIndexMetadataService]
 
   override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
-    logInfo(s"Updating metadata cache for $indexName");
+    logInfo(s"Updating metadata cache for $indexName with $metadata");
     val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
     var client: IRestHighLevelClient = null
     try {
       client = OpenSearchClientUtils.createClient(options)
       val request = new PutMappingRequest(osIndexName)
-      request.source(serialize(metadata), XContentType.JSON)
+      val serialized = serialize(metadata)
+      logInfo(s"Serialized: $serialized")
+      request.source(serialized, XContentType.JSON)
       client.updateIndexMapping(request, RequestOptions.DEFAULT)
     } catch {
       case e: Exception =>
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.mv
 
 import java.util.Locale
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`map AsScala`
 
 import org.opensearch.flint.common.metadata.FlintMetadata
@@ -18,6 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
@@ -64,10 +65,14 @@ case class FlintSparkMaterializedView(
     }.toArray
     val schema = generateSchema(outputSchema).asJava
 
+    // Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing.
+    // OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists.
+    val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava)
+
     metadataBuilder(this)
       .name(mvName)
       .source(query)
-      .addProperty("sourceTables", sourceTables)
+      .addProperty("sourceTables", sourceTablesProperty)
       .indexedColumns(indexColumnMaps)
       .schema(schema)
       .build()
@@ -147,7 +152,7 @@
   }
 }
 
-object FlintSparkMaterializedView {
+object FlintSparkMaterializedView extends Logging {
 
   /** MV index type name */
   val MV_INDEX_TYPE = "mv"
@@ -179,13 +184,40 @@
    * @return
    *   source table names
    */
-  def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = {
-    spark.sessionState.sqlParser
+  def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = {
+    logInfo(s"Extracting source tables from query $query")
+    val sourceTables = spark.sessionState.sqlParser
       .parsePlan(query)
       .collect { case relation: UnresolvedRelation =>
         qualifyTableName(spark, relation.tableName)
       }
       .toArray
+    logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]")
+    sourceTables
   }
+
+  /**
+   * Get source tables from Flint metadata properties field.
+   *
+   * @param metadata
+   *   Flint metadata
+   * @return
+   *   source table names
+   */
+  def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = {
+    logInfo(s"Getting source tables from metadata $metadata")
+    val sourceTables = metadata.properties.get("sourceTables")
+    sourceTables match {
+      case list: java.util.ArrayList[_] =>
+        logInfo(s"sourceTables is [${list.asScala.mkString(", ")}]")
+        list.toArray.map(_.toString)
+      case null =>
+        logInfo("sourceTables property does not exist")
+        Array.empty[String]
+      case _ =>
+        logInfo(s"sourceTables has unexpected type: ${sourceTables.getClass.getName}")
+        Array.empty[String]
+    }
+  }
 
   /** Builder class for MV build */
@@ -217,7 +249,7 @@
      */
     def query(query: String): Builder = {
       this.query = query
-      this.sourceTables = extractSourceTableNames(flint.spark, query)
+      this.sourceTables = extractSourceTablesFromQuery(flint.spark, query)
       this
     }
 
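Taken together, the two halves of the change make the property's runtime type uniform: the builder now stores a java.util.ArrayList (the same shape Jackson produces on deserialization), and getSourceTablesFromMetadata reads exactly that shape, with explicit fallbacks for a missing property or an unexpected type. A REPL-style sketch of the now-consistent round trip (illustrative names, not the PR's API):

    import scala.collection.JavaConverters._
    import java.util.{ArrayList => JArrayList}

    // Mirrors the three cases the new getSourceTablesFromMetadata distinguishes.
    def readSourceTables(value: AnyRef): Array[String] = value match {
      case list: JArrayList[_] => list.toArray.map(_.toString) // in-memory or deserialized
      case null => Array.empty[String] // property missing
      case _ => Array.empty[String] // unexpected type
    }

    // In-memory construction now stores the ArrayList shape up front:
    val stored = new JArrayList[String](Array("catalog.db.t1").toSeq.asJava)
    readSourceTables(stored) // Array("catalog.db.t1") -- previously empty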
@@ -64,7 +64,9 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     metadata.kind shouldBe MV_INDEX_TYPE
     metadata.source shouldBe "SELECT 1"
     metadata.properties should contain key "sourceTables"
-    metadata.properties.get("sourceTables").asInstanceOf[Array[String]] should have size 0
+    metadata.properties
+      .get("sourceTables")
+      .asInstanceOf[java.util.ArrayList[String]] should have size 0
     metadata.indexedColumns shouldBe Array(
       Map("columnName" -> "test_col", "columnType" -> "integer").asJava)
     metadata.schema shouldBe Map("test_col" -> Map("type" -> "integer").asJava).asJava
@@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName}
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName, getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -65,14 +65,76 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         | FROM spark_catalog.default.`table/3`
         | INNER JOIN spark_catalog.default.`table.4`
         |""".stripMargin
-    extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs
+    extractSourceTablesFromQuery(flint.spark, testComplexQuery) should contain theSameElementsAs
       Array(
         "spark_catalog.default.table1",
         "spark_catalog.default.table2",
         "spark_catalog.default.`table/3`",
         "spark_catalog.default.`table.4`")
 
-    extractSourceTableNames(flint.spark, "SELECT 1") should have size 0
+    extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0
   }
 
+  test("get source table names from index metadata successfully") {
+    val mv = FlintSparkMaterializedView(
+      "spark_catalog.default.mv",
+      s"SELECT 1 FROM $testTable",
+      Array(testTable),
+      Map("1" -> "integer"))
+    val metadata = mv.metadata()
+    getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
+  }
+
+  test("get source table names from deserialized metadata successfully") {
+    val metadata = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": {
+        |       "sourceTables": [
+        |         "$testTable"
+        |       ]
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+    getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
+  }
+
+  test("get empty source tables from invalid field in metadata") {
+    val metadataWrongType = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": {
+        |       "sourceTables": "$testTable"
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+    val metadataMissingField = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": { }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+
+    getSourceTablesFromMetadata(metadataWrongType) shouldBe empty
+    getSourceTablesFromMetadata(metadataMissingField) shouldBe empty
+  }
+
   test("create materialized view with metadata successfully") {
@@ -18,6 +18,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
 import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 import org.scalatest.Entry
@@ -161,12 +162,29 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
       val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
       properties
         .get("sourceTables")
-        .asInstanceOf[List[String]]
-        .toArray should contain theSameElementsAs Array(testTable)
+        .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
+          testTable)
     }
   }
 
-  test(s"write metadata cache to materialized view index mappings with source tables") {
+  test("write metadata cache with source tables from index metadata") {
+    val mv = FlintSparkMaterializedView(
+      "spark_catalog.default.mv",
+      s"SELECT 1 FROM $testTable",
+      Array(testTable),
+      Map("1" -> "integer"))
+    val metadata = mv.metadata().copy(latestLogEntry = Some(flintMetadataLogEntry))
+
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties
+      .get("sourceTables")
+      .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(testTable)
+  }
+
+  test("write metadata cache with source tables from deserialized metadata") {
     val testTable2 = "spark_catalog.default.metadatacache_test2"
     val content =
       s""" {
@@ -194,8 +212,9 @@
     val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
     properties
       .get("sourceTables")
-      .asInstanceOf[List[String]]
-      .toArray should contain theSameElementsAs Array(testTable, testTable2)
+      .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
+        testTable,
+        testTable2)
   }
 
   test("write metadata cache to index mappings with refresh interval") {