CREATE EXTERNAL TABLE without OPTIONS #248

Merged · 15 commits · Dec 18, 2023
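For context, here is a minimal sketch of the workflow this PR enables. All paths, table names, and column names below are illustrative placeholders, not taken from the PR itself.

import org.apache.spark.sql.SparkSession

// Sketch only: the path and the columnsToIndex value are illustrative.
val spark = SparkSession.builder().appName("qbeast-sketch").getOrCreate()

// A Qbeast table written earlier with explicit index options.
spark.range(1000).toDF("id").write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .save("/tmp/qbeast_table")

// With this change, an external table can be declared on top of that path
// without repeating the OPTIONS clause: columnsToIndex and cubeSize are
// recovered from the existing Qbeast metadata.
spark.sql(
  """CREATE EXTERNAL TABLE qbeast_external
    |USING qbeast
    |LOCATION '/tmp/qbeast_table'""".stripMargin)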
8 changes: 4 additions & 4 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -89,17 +89,17 @@ object QbeastOptions {
QbeastOptions(columnsToIndex, desiredCubeSize, stats)
}

def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
new QTableID(
parameters.getOrElse(
options.getOrElse(
PATH, {
throw AnalysisExceptionFactory.create("'path' is not specified")
}))
}

def checkQbeastProperties(parameters: Map[String, String]): Unit = {
def checkQbeastOptions(options: Map[String, String]): Unit = {
require(
parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
options.contains("columnsToIndex") || options.contains("columnstoindex"),
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
}

@@ -33,11 +33,11 @@ object QbeastBaseRelation {
*/
def createRelation(
sqlContext: SQLContext,
table: IndexedTable,
indexedTable: IndexedTable,
options: Map[String, String]): BaseRelation = {

val spark = SparkSession.active
val tableID = table.tableID
val tableID = indexedTable.tableID
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
if (snapshot.isInitial) {
@@ -52,22 +52,19 @@
new ParquetFileFormat(),
options)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, options, append = !overwrite)
indexedTable.save(data, options, append = !overwrite)
}
}
} else {
// If the table contains data, initialize it
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()

// Verify and merge the options with the existing index properties
val parameters = indexedTable.verifyAndMergeProperties(options)

new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
@@ -76,7 +73,7 @@
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
indexedTable.save(data, parameters, append = !overwrite)
}
}
}
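Continuing the hypothetical table from the first sketch, inserts through the relation should now carry the merged parameters, so no index options need to be restated:

// Hypothetical usage, continuing the earlier sketch; the table name and
// query are illustrative, not from this PR.
spark.sql("INSERT INTO qbeast_external SELECT id FROM range(100)")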
@@ -4,7 +4,6 @@
package io.qbeast.spark.internal.sources

import io.qbeast.context.QbeastContext
import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.IndexedTableFactory
@@ -54,15 +53,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
val tableId = QbeastOptions.loadTableIDFromParameters(properties.asScala.toMap)
val tableId = QbeastOptions.loadTableIDFromOptions(properties.asScala.toMap)
val indexedTable = tableFactory.getIndexedTable(tableId)
if (indexedTable.exists) {
// If the table exists, we make sure to pass all the properties to QbeastTableImpl
val currentRevision = metadataManager.loadSnapshot(tableId).loadLatestRevision
val indexProperties = Map(
"columnsToIndex" -> currentRevision.columnTransformers.map(_.columnName).mkString(","),
"cubeSize" -> currentRevision.desiredCubeSize.toString)
val tableProperties = properties.asScala.toMap ++ indexProperties
val tableProperties = indexedTable.verifyAndMergeProperties(properties.asScala.toMap)
new QbeastTableImpl(
TableIdentifier(tableId.id),
new Path(tableId.id),
@@ -100,7 +95,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
parameters.contains("columnsToIndex") || mode == SaveMode.Append,
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))

val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
val tableId = QbeastOptions.loadTableIDFromOptions(parameters)
val table = tableFactory.getIndexedTable(tableId)
mode match {
case SaveMode.Append => table.save(data, parameters, append = true)
@@ -116,7 +111,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
val tableID = QbeastOptions.loadTableIDFromOptions(parameters)
val table = tableFactory.getIndexedTable(tableID)
if (table.exists) {
table.load()
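A rough illustration of the read and append paths that rely on this merging behaviour, reusing the Spark session and placeholder path from the first sketch (a sketch, not the definitive API surface of this PR):

// Reading an existing Qbeast table requires no index options.
val existing = spark.read.format("qbeast").load("/tmp/qbeast_table")

// Appending also works without repeating columnsToIndex or cubeSize:
// both are filled in from the latest revision's metadata.
existing.limit(10).write
  .format("qbeast")
  .mode("append")
  .save("/tmp/qbeast_table")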
@@ -4,7 +4,6 @@
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext
import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -110,7 +109,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
properties: util.Map[String, String]): Table = {

if (QbeastCatalogUtils.isQbeastProvider(properties)) {
checkQbeastProperties(properties.asScala.toMap)
// Create the table
QbeastCatalogUtils.createQbeastTable(
ident,
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.context.QbeastContext.{metadataManager}
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.IndexedTableFactory
@@ -155,13 +155,14 @@ object QbeastCatalogUtils {
tableFactory: IndexedTableFactory,
existingSessionCatalog: SessionCatalog): Unit = {

val isPathTable = QbeastCatalogUtils.isPathTable(ident)
val isPathTable = this.isPathTable(ident)
val properties = allTableProperties.asScala.toMap

// Get table location
val location = if (isPathTable) {
Option(ident.name())
} else {
Option(allTableProperties.get("location"))
properties.get("location")
}

// Define the table type.
@@ -176,6 +177,10 @@
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(existingSessionCatalog.defaultTablePath(id))

// Process the parameters/options/configuration sent to the table
val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
val allProperties = indexedTable.verifyAndMergeProperties(properties)

// Initialize the path option
val storage = DataSource
.buildStorageFormatFromOptions(writeOptions)
@@ -198,7 +203,7 @@
provider = Some("qbeast"),
partitionColumnNames = Seq.empty,
bucketSpec = None,
properties = allTableProperties.asScala.toMap,
properties = allProperties,
comment = commentOpt)

// Verify the schema if it's an external table
@@ -210,9 +215,7 @@
// Write data, if any
val append = tableCreationMode.saveMode == SaveMode.Append
dataFrame.map { df =>
tableFactory
.getIndexedTable(QTableID(loc.toString))
.save(df, allTableProperties.asScala.toMap, append)
indexedTable.save(df, allProperties, append)
}

// Update the existing session catalog with the Qbeast table information
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.internal.sources.v2

import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import io.qbeast.spark.internal.QbeastOptions.checkQbeastOptions
import io.qbeast.spark.internal.sources.catalog.{CreationMode, QbeastCatalogUtils}
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -70,7 +70,7 @@ private[sources] class QbeastStagedTableImpl(
writeOptions.foreach { case (k, v) => props.put(k, v) }

// Check all the Qbeast properties are correctly specified
checkQbeastProperties(props.asScala.toMap)
checkQbeastOptions(props.asScala.toMap)

// Creates the corresponding table on the Catalog and executes
// the writing of the dataFrame (if any)
@@ -62,8 +62,6 @@ class QbeastWriteBuilder(
// Passing the options in the query plan plus the properties
// because columnsToIndex needs to be included in the contract
val writeOptions = info.options().toMap ++ properties
// scalastyle:off
println("data schema " + data.schema)
indexedTable.save(data, writeOptions, append)

// TODO: Push this to Apache Spark
102 changes: 68 additions & 34 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,7 +8,7 @@ import io.qbeast.core.model._
import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
import io.qbeast.spark.internal.QbeastOptions._
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
import org.apache.spark.sql.delta.actions.FileAction
@@ -30,13 +30,26 @@ trait IndexedTable {
*/
def exists: Boolean

/**
* Returns whether the table contains Qbeast metadata
* @return true if the table contains Qbeast metadata, false otherwise
*/
def hasQbeastMetadata: Boolean

/**
* Returns the table id which identifies the table.
*
* @return the table id
*/
def tableID: QTableID

/**
* Verifies the given properties and merges them with the table's current index properties
* @param properties the properties to verify and merge
* @return the merged properties
*/
def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String]

/**
* Saves given data in the table and updates the index. The specified columns are
* used to define the index when the table is created or overwritten. The append
@@ -139,11 +152,61 @@ private[table] class IndexedTableImpl(
with StagingUtils {
private var snapshotCache: Option[QbeastSnapshot] = None

/**
* The latest Revision available for the table.
*
* @return the latest Revision
*/
private def latestRevision: Revision = snapshot.loadLatestRevision

override def exists: Boolean = !snapshot.isInitial

private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
override def hasQbeastMetadata: Boolean = try {
snapshot.loadLatestRevision
true
} catch {
case _: Exception => false
}

override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
if (!exists) {
// If the table does not exist, only validate the new properties
checkQbeastOptions(properties)
properties
} else if (hasQbeastMetadata) {
// If the table has Qbeast metadata, merge the new properties with the current ones
val currentColumnsIndexed =
latestRevision.columnTransformers.map(_.columnName).mkString(",")
val currentCubeSize = latestRevision.desiredCubeSize.toString
val finalProperties = {
(properties.contains(COLUMNS_TO_INDEX), properties.contains(CUBE_SIZE)) match {
case (true, true) => properties
case (false, false) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed, CUBE_SIZE -> currentCubeSize)
case (true, false) => properties + (CUBE_SIZE -> currentCubeSize)
case (false, true) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed)
}
}
finalProperties
} else {
throw AnalysisExceptionFactory.create(
s"Table ${tableID.id} exists but does not contain Qbeast metadata. " +
s"Please use ConvertToQbeastCommand to convert the table to Qbeast.")
}
}

private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = {

// TODO feature: columnsToIndex may change between revisions
checkColumnsToMatchSchema(latestRevision)
val columnsToIndex = qbeastOptions.columnsToIndex
val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName)
val isNewColumns = !latestRevision.matchColumns(columnsToIndex)
if (isNewColumns) {
throw AnalysisExceptionFactory.create(
s"Columns to index '${columnsToIndex.mkString(",")}' do not match " +
s"existing index ${currentColumnsToIndex.mkString(",")}.")
}
// Checks if the desiredCubeSize is different from the existing one
val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
// Checks if the user-provided column boundaries would trigger the creation of
@@ -169,26 +232,6 @@

}

/**
* Add the required indexing parameters when the SaveMode is Append.
* The user-provided parameters are respected.
* @param latestRevision the latest revision
* @param parameters the parameters required for indexing
*/
private def addRequiredParams(
latestRevision: Revision,
parameters: Map[String, String]): Map[String, String] = {
val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
val desiredCubeSize = latestRevision.desiredCubeSize.toString
(parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match {
case (true, true) => parameters
case (false, false) =>
parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize)
case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize)
case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex)
}
}

override def save(
data: DataFrame,
parameters: Map[String, String],
@@ -197,12 +240,11 @@
if (exists && append) {
// If the table exists and we are appending new data
// 1. Load existing IndexStatus
val latestRevision = snapshot.loadLatestRevision
val updatedParameters = addRequiredParams(latestRevision, parameters)
val updatedParameters = verifyAndMergeProperties(parameters)
if (isStaging(latestRevision)) { // If the existing Revision is Staging
IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
} else {
if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
if (isNewRevision(QbeastOptions(updatedParameters))) {
// If the new parameters generate a new revision, we need to create another one
val newPotentialRevision = revisionBuilder
.createNewRevision(tableID, data.schema, updatedParameters)
@@ -255,14 +297,6 @@ private[table] class IndexedTableImpl(
snapshotCache = None
}

private def checkColumnsToMatchSchema(revision: Revision): Unit = {
val columnsToIndex = revision.columnTransformers.map(_.columnName)
if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) {
throw AnalysisExceptionFactory.create(
s"Columns to index '$columnsToIndex' do not match existing index.")
}
}

/**
* Creates a QbeastBaseRelation for the given table.
* @return the QbeastBaseRelation
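The decision table implemented by verifyAndMergeProperties above can be summarised with a small standalone sketch; this is a simplification with plain strings in place of the Revision model, not the project's actual API:

// Simplified sketch of the merge rules: user-supplied options win, and
// anything missing is filled in from the latest revision's metadata.
def mergeWithRevision(
    userOptions: Map[String, String],
    revisionColumns: String, // e.g. "a,b" from the latest revision
    revisionCubeSize: String): Map[String, String] = {
  (userOptions.contains("columnsToIndex"), userOptions.contains("cubeSize")) match {
    case (true, true) => userOptions
    case (true, false) => userOptions + ("cubeSize" -> revisionCubeSize)
    case (false, true) => userOptions + ("columnsToIndex" -> revisionColumns)
    case (false, false) =>
      userOptions + ("columnsToIndex" -> revisionColumns, "cubeSize" -> revisionCubeSize)
  }
}

// Example: only cubeSize is supplied, so columnsToIndex is recovered.
// mergeWithRevision(Map("cubeSize" -> "5000"), "user_id,timestamp", "10000")
//   == Map("cubeSize" -> "5000", "columnsToIndex" -> "user_id,timestamp")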
@@ -4,7 +4,6 @@ import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
import org.apache.spark.sql.connector.catalog.{
CatalogExtension,
CatalogPlugin,
@@ -248,7 +247,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite

"QbeastCatalogUtils" should "throw an error when trying to replace a non-existing table" in
withQbeastContextSparkAndTmpWarehouse((spark, _) => {
an[CannotReplaceMissingTableException] shouldBe thrownBy(
an[AnalysisException] shouldBe thrownBy(
QbeastCatalogUtils.createQbeastTable(
Identifier.of(defaultNamespace, "students"),
schema,