Update metadata in MetadataManager
Jiaweihu08 committed Jan 27, 2023
1 parent 57cacd4 commit 1332bc9
Showing 4 changed files with 58 additions and 35 deletions.
core/src/main/scala/io/qbeast/core/model/MetadataManager.scala (11 changes: 11 additions & 0 deletions)
@@ -8,6 +8,7 @@ import io.qbeast.IISeq
  * @tparam FileDescriptor type of file descriptor
  */
 trait MetadataManager[DataSchema, FileDescriptor] {
+  type Configuration = Map[String, String]

   /**
    * Gets the Snapshot for a given table
@@ -33,6 +34,16 @@ trait MetadataManager[DataSchema, FileDescriptor] {
   def updateWithTransaction(tableID: QTableID, schema: DataSchema, append: Boolean)(
       writer: => (TableChanges, IISeq[FileDescriptor])): Unit

+  /**
+   * Updates the table metadata by overwriting the metadata configurations
+   * with the provided key-value pairs.
+   * @param tableID QTableID
+   * @param schema table schema
+   * @param update configurations used to overwrite the existing metadata
+   */
+  def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)(
+      update: => Configuration): Unit
+
   /**
    * Updates the Revision with the given RevisionChanges
    * @param tableID the QTableID
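Callers pass the update block by name, so it is evaluated only once the implementation has opened its transaction. A minimal sketch of a generic caller, with a hypothetical helper name and configuration key:

```scala
import io.qbeast.core.model.{MetadataManager, QTableID}

// Hypothetical helper: works for any implementation of the trait,
// leaving DataSchema and FileDescriptor abstract.
def setTableProperties[S, F](
    manager: MetadataManager[S, F],
    tableID: QTableID,
    schema: S): Unit =
  manager.updateMetadataWithTransaction(tableID, schema) {
    // Evaluated by name inside the transaction; the returned pairs
    // overwrite matching keys in the existing metadata configuration.
    Map("qbeast.exampleKey" -> "exampleValue") // illustrative key
  }
```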
src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala (27 changes: 20 additions & 7 deletions)
@@ -5,14 +5,9 @@ package io.qbeast.spark.delta

 import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
 import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
+import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
 import io.qbeast.spark.utils.TagColumns
-import org.apache.spark.sql.delta.actions.{
-  Action,
-  AddFile,
-  FileAction,
-  RemoveFile,
-  SetTransaction
-}
+import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.DeltaCommand
 import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
 import org.apache.spark.sql.execution.datasources.{
@@ -84,6 +79,24 @@ private[delta] case class DeltaMetadataWriter(
     }
   }

+  def updateMetadataWithTransaction(update: => Configuration): Unit = {
+    deltaLog.withNewTransaction { txn =>
+      if (txn.metadata.partitionColumns.nonEmpty) {
+        throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg)
+      }
+
+      val config = update
+      val updatedConfig = config.foldLeft(txn.metadata.configuration) { case (accConf, (k, v)) =>
+        accConf.updated(k, v)
+      }
+      val updatedMetadata = txn.metadata.copy(configuration = updatedConfig)
+
+      val op = DeltaOperations.SetTableProperties(config)
+      txn.updateMetadata(updatedMetadata)
+      txn.commit(Seq.empty, op)
+    }
+  }
+
   private def updateReplicatedFiles(tableChanges: TableChanges): Seq[Action] = {

     val revision = tableChanges.updatedRevision
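The foldLeft above gives the update overwrite semantics: pairs from the update replace existing keys, new keys are appended, and untouched entries survive. A minimal, Delta-free sketch of the same merge:

```scala
// Plain-Scala illustration of the merge applied to txn.metadata.configuration.
val existing = Map("a" -> "1", "b" -> "2")
val update = Map("b" -> "20", "c" -> "3")

// Each (k, v) pair in update either overwrites or extends the accumulator.
val merged = update.foldLeft(existing) { case (acc, (k, v)) => acc.updated(k, v) }

assert(merged == Map("a" -> "1", "b" -> "20", "c" -> "3"))
```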
src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala (18 changes: 15 additions & 3 deletions)
@@ -4,11 +4,11 @@
 package io.qbeast.spark.delta

 import io.qbeast.IISeq
-import io.qbeast.core.model.{MetadataManager, _}
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
+import io.qbeast.core.model._
 import org.apache.spark.sql.delta.actions.FileAction
+import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SaveMode, SparkSession}

 /**
  * Spark+Delta implementation of the MetadataManager interface
@@ -26,6 +26,18 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction] {
     metadataWriter.writeWithTransaction(writer)
   }

+  override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
+      update: => Configuration): Unit = {
+    val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
+    val options =
+      new DeltaOptions(Map("path" -> tableID.id), SparkSession.active.sessionState.conf)
+
+    val metadataWriter =
+      DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, options, schema)
+
+    metadataWriter.updateMetadataWithTransaction(update)
+  }
+
   override def loadSnapshot(tableID: QTableID): DeltaQbeastSnapshot = {
     DeltaQbeastSnapshot(loadDeltaQbeastLog(tableID).deltaLog.update())
   }
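As a caller-side sketch: the override resolves the DeltaLog from the QTableID and delegates to DeltaMetadataWriter, so a caller supplies only the table identity, its schema, and the configuration block. The table path and property key below are hypothetical, and an active SparkSession is required:

```scala
import io.qbeast.core.model.QTableID
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val tableID = QTableID("/tmp/qbeast/table") // hypothetical table path
val schema = StructType(Seq(StructField("id", LongType)))

// The by-name block is evaluated once, inside a single optimistic
// Delta transaction opened by DeltaMetadataWriter.
SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
  Map("qbeast.example.property" -> "value") // illustrative key-value pair
}
```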
src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala (37 changes: 12 additions & 25 deletions)
@@ -4,7 +4,7 @@
 package io.qbeast.spark.internal.commands

 import io.qbeast.core.model._
-import io.qbeast.spark.delta.DeltaQbeastSnapshot
+import io.qbeast.spark.delta.{DeltaQbeastSnapshot, SparkDeltaMetadataManager}
 import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision}
 import io.qbeast.spark.utils.QbeastExceptionMessages.{
   incorrectIdentifierFormat,
@@ -16,7 +16,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.delta.DeltaOperations.Convert
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, Row, SparkSession}
@@ -76,33 +75,21 @@ case class ConvertToQbeastCommand(
       case _ => throw AnalysisExceptionFactory.create(unsupportedFormatExceptionMsg(fileFormat))
     }

-    // Convert delta to qbeast
-    deltaLog.update()
-
-    val txn = deltaLog.startTransaction()
-
-    // Converting a partitioned delta table is not supported, for qbeast files
-    // are not partitioned.
-    val isPartitionedDelta = txn.metadata.partitionColumns.nonEmpty
-    if (isPartitionedDelta) {
-      throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg)
-    }
-
-    val convRevision = Revision.emptyRevision(QTableID(tableId.table), cubeSize, columnsToIndex)
-    val revisionID = convRevision.revisionID
-
-    // Update revision map
-    val updatedConf =
-      txn.metadata.configuration
-        .updated(lastRevisionID, revisionID.toString)
-        .updated(s"$revision.$revisionID", mapper.writeValueAsString(convRevision))
-
-    val newMetadata =
-      txn.metadata.copy(configuration = updatedConf)
-
-    txn.updateMetadata(newMetadata)
-    txn.commit(Seq.empty, Convert(0, Seq.empty, collectStats = false, None))
+    // Convert delta to qbeast through metadata modification
+    val tableID = QTableID(tableId.table)
+    val schema = deltaLog.snapshot.schema
+
+    SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
+      val convRevision = Revision.emptyRevision(tableID, cubeSize, columnsToIndex)
+      val revisionID = convRevision.revisionID
+
+      // Add staging revision to Revision Map, set it as the latestRevision
+      Map(
+        lastRevisionID -> revisionID.toString,
+        s"$revision.$revisionID" -> mapper.writeValueAsString(convRevision))
+    }

     Seq.empty[Row]
   }
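For illustration, the update block above resolves to two configuration entries: a pointer to the latest revision and the serialized revision itself. Assuming the usual MetadataConfig key prefixes and a staging revision ID of 0 (both assumptions, not taken from the diff), the committed table properties would look roughly like this, with the revision JSON abridged:

```scala
// Hypothetical shape of the configuration written by the conversion;
// the key prefixes and revision ID are assumptions, and the JSON that
// mapper.writeValueAsString(convRevision) produces is abridged here.
Map(
  "qbeast.lastRevisionID" -> "0",
  "qbeast.revision.0" -> """{"revisionID":0,"tableID":"...","desiredCubeSize":...}""")
```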
