diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index 54597b15d..60218f8e7 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -89,17 +89,17 @@ object QbeastOptions {
     QbeastOptions(columnsToIndex, desiredCubeSize, stats)
   }
 
-  def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
+  def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
     new QTableID(
-      parameters.getOrElse(
+      options.getOrElse(
         PATH, {
           throw AnalysisExceptionFactory.create("'path' is not specified")
         }))
   }
 
-  def checkQbeastProperties(parameters: Map[String, String]): Unit = {
+  def checkQbeastOptions(options: Map[String, String]): Unit = {
     require(
-      parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
+      options.contains("columnsToIndex") || options.contains("columnstoindex"),
       throw AnalysisExceptionFactory.create("'columnsToIndex is not specified"))
   }
 
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala
index 9127b8572..5a45cdcc1 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala
@@ -33,11 +33,11 @@ object QbeastBaseRelation {
    */
   def createRelation(
       sqlContext: SQLContext,
-      table: IndexedTable,
+      indexedTable: IndexedTable,
       options: Map[String, String]): BaseRelation = {
 
     val spark = SparkSession.active
-    val tableID = table.tableID
+    val tableID = indexedTable.tableID
     val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
     val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
     if (snapshot.isInitial) {
@@ -52,22 +52,19 @@ object QbeastBaseRelation {
           new ParquetFileFormat(),
           options)(spark) with InsertableRelation {
         def insert(data: DataFrame, overwrite: Boolean): Unit = {
-          table.save(data, options, append = !overwrite)
+          indexedTable.save(data, options, append = !overwrite)
         }
       }
     } else {
       // If the table contains data, initialize it
-      val revision = snapshot.loadLatestRevision
-      val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
-      val cubeSize = revision.desiredCubeSize
-      val parameters =
-        Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())
-
       val path = new Path(tableID.id)
       val fileIndex = OTreeIndex(spark, path)
       val bucketSpec: Option[BucketSpec] = None
       val file = new ParquetFileFormat()
 
+      // Verify the options and merge them with the existing index properties
+      val parameters = indexedTable.verifyAndMergeProperties(options)
+
       new HadoopFsRelation(
         fileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
@@ -76,7 +73,7 @@ object QbeastBaseRelation {
         file,
         parameters)(spark) with InsertableRelation {
         def insert(data: DataFrame, overwrite: Boolean): Unit = {
-          table.save(data, parameters, append = !overwrite)
+          indexedTable.save(data, parameters, append = !overwrite)
         }
       }
     }
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
index 06e9a1e32..56906b6e6 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
@@ -4,7 +4,6 @@
 package io.qbeast.spark.internal.sources
 
 import io.qbeast.context.QbeastContext
-import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.spark.internal.QbeastOptions
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
 import io.qbeast.spark.table.IndexedTableFactory
@@ -54,15 +53,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    val tableId = QbeastOptions.loadTableIDFromParameters(properties.asScala.toMap)
+    val tableId = QbeastOptions.loadTableIDFromOptions(properties.asScala.toMap)
     val indexedTable = tableFactory.getIndexedTable(tableId)
     if (indexedTable.exists) {
       // If the table exists, we make sure to pass all the properties to QbeastTableImpl
-      val currentRevision = metadataManager.loadSnapshot(tableId).loadLatestRevision
-      val indexProperties = Map(
-        "columnsToIndex" -> currentRevision.columnTransformers.map(_.columnName).mkString(","),
-        "cubeSize" -> currentRevision.desiredCubeSize.toString)
-      val tableProperties = properties.asScala.toMap ++ indexProperties
+      val tableProperties = indexedTable.verifyAndMergeProperties(properties.asScala.toMap)
       new QbeastTableImpl(
         TableIdentifier(tableId.id),
         new Path(tableId.id),
@@ -100,7 +95,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
       parameters.contains("columnsToIndex") || mode == SaveMode.Append,
       throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
 
-    val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableId = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableId)
     mode match {
       case SaveMode.Append => table.save(data, parameters, append = true)
@@ -116,7 +111,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableID = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableID)
     if (table.exists) {
       table.load()
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index 89e70629c..0cd2bd3d0 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -4,7 +4,6 @@
 package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -110,7 +109,6 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       properties: util.Map[String, String]): Table = {
 
     if (QbeastCatalogUtils.isQbeastProvider(properties)) {
-      checkQbeastProperties(properties.asScala.toMap)
       // Create the table
       QbeastCatalogUtils.createQbeastTable(
         ident,
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index fda04dd32..2d2e7e7d8 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -3,7 +3,7 @@
  */
 package io.qbeast.spark.internal.sources.catalog
 
-import io.qbeast.context.QbeastContext.metadataManager
+import io.qbeast.context.QbeastContext.{metadataManager}
 import io.qbeast.core.model.QTableID
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
 import io.qbeast.spark.table.IndexedTableFactory
@@ -155,13 +155,14 @@
       tableFactory: IndexedTableFactory,
       existingSessionCatalog: SessionCatalog): Unit = {
 
-    val isPathTable = QbeastCatalogUtils.isPathTable(ident)
+    val isPathTable = this.isPathTable(ident)
+    val properties = allTableProperties.asScala.toMap
 
     // Get table location
     val location = if (isPathTable) {
       Option(ident.name())
     } else {
-      Option(allTableProperties.get("location"))
+      properties.get("location")
     }
 
     // Define the table type.
@@ -176,6 +177,10 @@
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
       .getOrElse(existingSessionCatalog.defaultTablePath(id))
 
+    // Process the parameters/options/configuration sent to the table
+    val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
+    val allProperties = indexedTable.verifyAndMergeProperties(properties)
+
     // Initialize the path option
     val storage = DataSource
       .buildStorageFormatFromOptions(writeOptions)
@@ -198,7 +203,7 @@
       provider = Some("qbeast"),
       partitionColumnNames = Seq.empty,
       bucketSpec = None,
-      properties = allTableProperties.asScala.toMap,
+      properties = allProperties,
       comment = commentOpt)
 
     // Verify the schema if it's an external table
@@ -210,9 +215,7 @@
     // Write data, if any
     val append = tableCreationMode.saveMode == SaveMode.Append
     dataFrame.map { df =>
-      tableFactory
-        .getIndexedTable(QTableID(loc.toString))
-        .save(df, allTableProperties.asScala.toMap, append)
+      indexedTable.save(df, allProperties, append)
     }
 
     // Update the existing session catalog with the Qbeast table information
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
index 8c07b5632..79933c1ed 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
@@ -3,7 +3,7 @@
  */
 package io.qbeast.spark.internal.sources.v2
 
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
+import io.qbeast.spark.internal.QbeastOptions.checkQbeastOptions
 import io.qbeast.spark.internal.sources.catalog.{CreationMode, QbeastCatalogUtils}
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -70,7 +70,7 @@ private[sources] class QbeastStagedTableImpl(
       writeOptions.foreach { case (k, v) => props.put(k, v) }
 
       // Check all the Qbeast properties are correctly specified
-      checkQbeastProperties(props.asScala.toMap)
+      checkQbeastOptions(props.asScala.toMap)
 
       // Creates the corresponding table on the Catalog and executes
       // the writing of the dataFrame (if any)
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
index 47aa164e9..61ec278ae 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
@@ -62,8 +62,6 @@ class QbeastWriteBuilder(
     // Passing the options in the query plan plus the properties
     // because columnsToIndex needs to be included in the contract
     val writeOptions = info.options().toMap ++ properties
-    // scalastyle:off
-    println("data schema " + data.schema)
     indexedTable.save(data, writeOptions, append)
 
     // TODO: Push this to Apache Spark
diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index 4e24065fa..05e69abaa 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,7 +8,7 @@ import io.qbeast.core.model._
 import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
 import io.qbeast.spark.index.QbeastColumns
 import io.qbeast.spark.internal.QbeastOptions
-import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
 import org.apache.spark.sql.delta.actions.FileAction
@@ -30,6 +30,12 @@ trait IndexedTable {
    */
   def exists: Boolean
 
+  /**
+   * Returns whether the table contains Qbeast metadata
+   * @return true if the table contains Qbeast metadata, false otherwise
+   */
+  def hasQbeastMetadata: Boolean
+
   /**
    * Returns the table id which identifies the table.
    *
@@ -37,6 +43,13 @@ trait IndexedTable {
    */
   def tableID: QTableID
 
+  /**
+   * Verifies the new properties and merges them with the current index properties
+   * @param properties the new properties to merge
+   * @return the merged properties
+   */
+  def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String]
+
   /**
    * Saves given data in the table and updates the index. The specified columns are
    * used to define the index when the table is created or overwritten. The append
@@ -139,11 +152,61 @@ private[table] class IndexedTableImpl(
     with StagingUtils {
   private var snapshotCache: Option[QbeastSnapshot] = None
 
+  /**
+   * Returns the latest available Revision
+   *
+   * @return the latest Revision
+   */
+  private def latestRevision: Revision = snapshot.loadLatestRevision
+
   override def exists: Boolean = !snapshot.isInitial
 
-  private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
+  override def hasQbeastMetadata: Boolean = try {
+    snapshot.loadLatestRevision
+    true
+  } catch {
+    case _: Exception => false
+  }
+
+  override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
+    if (!exists) {
+      // If the table does not exist, only the new properties need to be checked
+      checkQbeastOptions(properties)
+      properties
+    } else if (hasQbeastMetadata) {
+      // If the table has Qbeast metadata, merge the new properties with the current ones
+      val currentColumnsIndexed =
+        latestRevision.columnTransformers.map(_.columnName).mkString(",")
+      val currentCubeSize = latestRevision.desiredCubeSize.toString
+      val finalProperties = {
+        (properties.contains(COLUMNS_TO_INDEX), properties.contains(CUBE_SIZE)) match {
+          case (true, true) => properties
+          case (false, false) =>
+            properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed, CUBE_SIZE -> currentCubeSize)
+          case (true, false) => properties + (CUBE_SIZE -> currentCubeSize)
+          case (false, true) =>
+            properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed)
+        }
+      }
+      finalProperties
+    } else {
+      throw AnalysisExceptionFactory.create(
+        s"Table ${tableID.id} exists but does not contain Qbeast metadata. " +
+          s"Please use ConvertToQbeastCommand to convert the table to Qbeast.")
+    }
+  }
+
+  private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = {
+
     // TODO feature: columnsToIndex may change between revisions
-    checkColumnsToMatchSchema(latestRevision)
+    val columnsToIndex = qbeastOptions.columnsToIndex
+    val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName)
+    val isNewColumns = !latestRevision.matchColumns(columnsToIndex)
+    if (isNewColumns) {
+      throw AnalysisExceptionFactory.create(
+        s"Columns to index '${columnsToIndex.mkString(",")}' do not match " +
+          s"existing index ${currentColumnsToIndex.mkString(",")}.")
+    }
     // Checks if the desiredCubeSize is different from the existing one
     val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
     // Checks if the user-provided column boundaries would trigger the creation of
@@ -169,26 +232,6 @@ private[table] class IndexedTableImpl(
 
   }
 
-  /**
-   * Add the required indexing parameters when the SaveMode is Append.
-   * The user-provided parameters are respected.
-   * @param latestRevision the latest revision
-   * @param parameters the parameters required for indexing
-   */
-  private def addRequiredParams(
-      latestRevision: Revision,
-      parameters: Map[String, String]): Map[String, String] = {
-    val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
-    val desiredCubeSize = latestRevision.desiredCubeSize.toString
-    (parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match {
-      case (true, true) => parameters
-      case (false, false) =>
-        parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize)
-      case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize)
-      case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex)
-    }
-  }
-
   override def save(
       data: DataFrame,
       parameters: Map[String, String],
@@ -197,12 +240,11 @@ private[table] class IndexedTableImpl(
     if (exists && append) {
       // If the table exists and we are appending new data
      // 1. Load existing IndexStatus
-      val latestRevision = snapshot.loadLatestRevision
-      val updatedParameters = addRequiredParams(latestRevision, parameters)
+      val updatedParameters = verifyAndMergeProperties(parameters)
       if (isStaging(latestRevision)) {
         // If the existing Revision is Staging
         IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
       } else {
-        if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
+        if (isNewRevision(QbeastOptions(updatedParameters))) {
           // If the new parameters generate a new revision, we need to create another one
           val newPotentialRevision = revisionBuilder
             .createNewRevision(tableID, data.schema, updatedParameters)
@@ -255,14 +297,6 @@ private[table] class IndexedTableImpl(
     snapshotCache = None
   }
 
-  private def checkColumnsToMatchSchema(revision: Revision): Unit = {
-    val columnsToIndex = revision.columnTransformers.map(_.columnName)
-    if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) {
-      throw AnalysisExceptionFactory.create(
-        s"Columns to index '$columnsToIndex' do not match existing index.")
-    }
-  }
-
   /**
    * Creates a QbeastBaseRelation for the given table.
   * @return the QbeastBaseRelation
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 4ab5d8c3e..b6fffa504 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -4,7 +4,6 @@ import io.qbeast.spark.QbeastIntegrationTestSpec
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.connector.catalog.{
   CatalogExtension,
   CatalogPlugin,
@@ -248,7 +247,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
 
   "QbeastCatalogUtils" should "throw an error when trying to replace a non-existing table" in
     withQbeastContextSparkAndTmpWarehouse((spark, _) => {
-      an[CannotReplaceMissingTableException] shouldBe thrownBy(
+      an[AnalysisException] shouldBe thrownBy(
        QbeastCatalogUtils.createQbeastTable(
          Identifier.of(defaultNamespace, "students"),
          schema,
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
new file mode 100644
index 000000000..7c5d55f73
--- /dev/null
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
@@ -0,0 +1,128 @@
+package io.qbeast.spark.utils
+
+import io.qbeast.TestClasses.Student
+import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.AnalysisException
+
+import scala.util.Random
+
+class QbeastCreateTableSQLTest extends QbeastIntegrationTestSpec {
+
+  private val students = 1.to(10).map(i => Student(i, i.toString, Random.nextInt()))
+
+  private def createStudentsTestData(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    students.toDF()
+  }
+
+  "Qbeast SQL" should "create EXTERNAL existing indexedTable WITHOUT options" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id,name").save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"LOCATION '$location'")
+
+    })
+
+  it should "throw an error if the indexedTable is NOT qbeast" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("delta").save(location)
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            s"LOCATION '$location'"))
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            "OPTIONS ('columnsToIndex'='id') " +
+            s"LOCATION '$location'"))
+
+    })
+
+  it should "NOT overwrite existing columnsToIndex if specified" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      // WRITE INITIAL DATA WITH QBEAST
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id").save(location)
+
+      // COLUMNS TO INDEX ARE CHANGED
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student_column_change (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('columnsToIndex'='id,name') " +
+          s"LOCATION '$location'")
+
+      // COLUMNS TO INDEX CANNOT BE CHANGED
+      an[AnalysisException] shouldBe thrownBy(data.writeTo("student_column_change").append())
+    })
+
+  it should "overwrite existing CUBE SIZE options if specified" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      // WRITE INITIAL DATA WITH QBEAST
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write
+        .format("qbeast")
+        .option("columnsToIndex", "id")
+        .option("cubeSize", "100")
+        .save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student_cube_change (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('cubeSize'='50') " +
+          s"LOCATION '$location'")
+
+      val qbeastTable = QbeastTable.forPath(spark, location)
+      qbeastTable.cubeSize() shouldBe 100
+
+      data.writeTo("student_cube_change").append()
+
+      spark.sql("SELECT * FROM student_cube_change").count() shouldBe data.count() * 2
+      qbeastTable.cubeSize() shouldBe 50
+
+    })
+
+  it should "create indexedTable even if location is not populated" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('columnsToIndex'='id,name') " +
+          s"LOCATION '$location'")
+
+      // SELECT FROM
+      spark.sql("SELECT * FROM student").count() shouldBe 0
+
+      // WRITE TO
+      val data = createStudentsTestData(spark)
+      data.writeTo("student").append()
+
+      // SELECT FROM
+      spark.read.format("qbeast").load(location).count() shouldBe data.count()
+      spark.sql("SELECT * FROM student").count() shouldBe data.count()
+      val qbeastTable = QbeastTable.forPath(spark, location)
+      qbeastTable.indexedColumns() shouldBe Seq("id", "name")
+
+    })
+
+}