[Spark][Sharing] Adds deletion vector support for "delta format sharing"
- Extends PrepareDeltaScan with PrepareDeltaSharingScan, which converts a DeltaSharingFileIndex to a TahoeLogFileIndex.
- Updates DeltaSparkSessionExtension to register the PrepareDeltaSharingScan rule (see the sketch below).
- Adds unit tests in DeltaSharingDataSourceDeltaSuite.
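
For context, a minimal sketch of how such a planning rule can be registered; injectPreCBORule is a real SparkSessionExtensions hook, but the wiring below is illustrative rather than the actual DeltaSparkSessionExtension code:

import org.apache.spark.sql.SparkSessionExtensions

// Illustrative wiring only: the real DeltaSparkSessionExtension registers many more rules.
class ExampleDeltaSharingExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Run before cost-based optimization so the server RPC results can feed CBO statistics.
    extensions.injectPreCBORule { session => new PrepareDeltaSharingScan(session) }
  }
}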

Closes #2480

GitOrigin-RevId: 816ae9b4c9409f301690e205621ed252848cbb5b
linzhou-db authored and vkorukanti committed Jan 12, 2024
1 parent 11cd832 commit e69a4d3
Showing 8 changed files with 766 additions and 199 deletions.
@@ -72,20 +72,25 @@ case class DeltaSharingFileIndex(

override def partitionSchema: StructType = params.metadata.partitionSchema

// Returns the partition columns of the shared delta table based on the returned metadata.
def partitionColumns: Seq[String] = params.metadata.deltaMetadata.partitionColumns

override def rootPaths: Seq[Path] = params.path :: Nil

override def inputFiles: Array[String] = {
throw new UnsupportedOperationException("DeltaSharingFileIndex.inputFiles")
}

// A map from each queriedTableQueryId for which we've issued a delta sharing rpc to the
// DeltaLog constructed from the response.
// listFiles will be called twice or more in a spark query; with this map we can avoid
// duplicated work of making the expensive rpc and constructing the delta log.
private val queriedTableQueryIdToDeltaLog = scala.collection.mutable.Map[String, DeltaLog]()

def fetchFilesAndConstructDeltaLog(
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    overrideLimit: Option[Long]): DeltaLog = {
val jsonPredicateHints = convertToJsonPredicate(partitionFilters, dataFilters)
val queryParamsHashId = DeltaSharingUtils.getQueryParamsHashId(
params.options,
@@ -101,79 +106,109 @@
)
// listFiles will be called twice or more in a spark query; with this check we can avoid
// duplicated work of making the expensive rpc and constructing the delta log.
queriedTableQueryIdToDeltaLog.get(tablePathWithHashIdSuffix) match {
case Some(deltaLog) => deltaLog
case None =>
createDeltaLog(
jsonPredicateHints,
queryParamsHashId,
tablePathWithHashIdSuffix,
overrideLimit
)
}
}

private def createDeltaLog(
jsonPredicateHints: Option[String],
queryParamsHashId: String,
tablePathWithHashIdSuffix: String,
overrideLimit: Option[Long]): DeltaLog = {
// 1. Call client.getFiles.
val startTime = System.currentTimeMillis()
val deltaTableFiles = client.getFiles(
table = table,
predicates = Nil,
limit = overrideLimit.orElse(limitHint),
versionAsOf = params.options.versionAsOf,
timestampAsOf = params.options.timestampAsOf,
jsonPredicateHints = jsonPredicateHints,
refreshToken = None
)
logInfo(
s"Fetched ${deltaTableFiles.lines.size} lines for table $table with version " +
s"${deltaTableFiles.version} from delta sharing server, took " +
s"${(System.currentTimeMillis() - startTime) / 1000.0}s."
)

// 2. Prepare a DeltaLog.
val deltaLogMetadata =
DeltaSharingLogFileSystem.constructLocalDeltaLogAtVersionZero(
deltaTableFiles.lines,
tablePathWithHashIdSuffix
)

// 3. Register parquet file id to url mapping
CachedTableManager.INSTANCE.register(
// Using params.path instead of queryCustomTablePath because it will be customized
// within CachedTableManager.
tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(
params.path.toString,
queryParamsHashId
),
idToUrl = deltaLogMetadata.idToUrl,
refs = Seq(new WeakReference(this)),
profileProvider = client.getProfileProvider,
refresher = DeltaSharingUtils.getRefresherForGetFiles(
client = client,
table = table,
predicates = Nil,
limit = overrideLimit.orElse(limitHint),
versionAsOf = params.options.versionAsOf,
timestampAsOf = params.options.timestampAsOf,
jsonPredicateHints = jsonPredicateHints,
refreshToken = deltaTableFiles.refreshToken
),
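// Use the earliest pre-signed url expiration when it is valid; otherwise fall back to
// now plus the configured pre-signed url lifetime.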
expirationTimestamp =
if (CachedTableManager.INSTANCE
.isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) {
deltaLogMetadata.minUrlExpirationTimestamp.get
} else {
System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs
},
refreshToken = deltaTableFiles.refreshToken
)

// 4. Create a DeltaLog based on the constructed local delta log.
val deltaLog = DeltaLog.forTable(
  params.spark,
  DeltaSharingLogFileSystem.encode(tablePathWithHashIdSuffix)
)

// In theory there should only be one entry in this map since each query creates its own
// FileIndex class. It is purged together with the FileIndex class when the query finishes.
queriedTableQueryIdToDeltaLog.put(tablePathWithHashIdSuffix, deltaLog)

deltaLog
}

def asTahoeFileIndex(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): TahoeLogFileIndex = {
val deltaLog = fetchFilesAndConstructDeltaLog(partitionFilters, dataFilters, None)
new TahoeLogFileIndex(
params.spark,
deltaLog,
deltaLog.dataPath,
deltaLog.unsafeVolatileSnapshot
)
}

override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// NOTE: The server is not required to apply all filters, so we apply them client-side as well.
asTahoeFileIndex(partitionFilters, dataFilters).listFiles(partitionFilters, dataFilters)
}

// Converts the specified SQL expressions to a json predicate.
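For orientation, a hedged usage sketch of the read path that exercises the listFiles logic above; the profile path, share/schema/table names, and filter column are placeholders, and the responseFormat option is the delta sharing connector's switch for the delta format named in the commit title:

import org.apache.spark.sql.functions.col

// Illustrative usage only: reading a shared table with "delta format sharing".
val df = spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta") // request delta-format responses
  .load("/path/to/profile.share#myshare.myschema.mytable")
  .where(col("date") === "2024-01-01") // sent to the server as a jsonPredicateHint
df.show()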
@@ -0,0 +1,105 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.spark

import org.apache.spark.sql.delta.{DeltaTableUtils => SqlDeltaTableUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{PreparedDeltaFileIndex, PrepareDeltaScan}
import io.delta.sharing.client.util.ConfUtils
import io.delta.sharing.spark.DeltaSharingFileIndex

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Before query planning, we prepare any scans over delta sharing tables by pushing
 * filters and limits to the delta sharing server through RPC, allowing the server to
 * return only the needed files and letting us gather more accurate statistics for CBO
 * and metering.
 */
class PrepareDeltaSharingScan(override val spark: SparkSession) extends PrepareDeltaScan(spark) {

/**
* Prepares delta sharing scans sequentially.
*/
override protected def prepareDeltaScan(plan: LogicalPlan): LogicalPlan = {
transformWithSubqueries(plan) {
case scan @ DeltaSharingTableScan(_, filters, dsFileIndex, limit, _) =>
val partitionCols = dsFileIndex.partitionColumns
val (partitionFilters, dataFilters) = filters.partition { e =>
SqlDeltaTableUtils.isPredicatePartitionColumnsOnly(e, partitionCols, spark)
}
logInfo(s"Classified filters: partition: $partitionFilters, data: $dataFilters")
val deltaLog = dsFileIndex.fetchFilesAndConstructDeltaLog(
partitionFilters,
dataFilters,
limit.map(_.toLong)
)
val snapshot = deltaLog.snapshot
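// Push the limit into file selection when present, so the prepared scan covers only the needed files.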
val deltaScan = limit match {
case Some(limit) => snapshot.filesForScan(limit, filters)
case _ => snapshot.filesForScan(filters)
}
val preparedIndex = PreparedDeltaFileIndex(
spark,
deltaLog,
deltaLog.dataPath,
preparedScan = deltaScan,
versionScanned = Some(snapshot.version)
)
SqlDeltaTableUtils.replaceFileIndex(scan, preparedIndex)
}
}

// When statistics-based data skipping is disabled, fall back to partition pruning at
// planning time by converting delta sharing scans to normal tahoe scans.
// NOTE: File skipping is only disabled on the client, so we still pass filters to the server.
override protected def prepareDeltaScanWithoutFileSkipping(plan: LogicalPlan): LogicalPlan = {
plan.transformDown {
case scan @ DeltaSharingTableScan(_, filters, sharingIndex, _, _) =>
val partitionCols = sharingIndex.partitionColumns
val (partitionFilters, dataFilters) = filters.partition { e =>
SqlDeltaTableUtils.isPredicatePartitionColumnsOnly(e, partitionCols, spark)
}
logInfo(s"Classified filters: partition: $partitionFilters, data: $dataFilters")
val fileIndex = sharingIndex.asTahoeFileIndex(partitionFilters, dataFilters)
SqlDeltaTableUtils.replaceFileIndex(scan, fileIndex)
}
}

// TODO: Support metadata-only query optimization!
override def optimizeQueryWithMetadata(plan: LogicalPlan): LogicalPlan = plan

/**
* This is an extractor object. See https://docs.scala-lang.org/tour/extractor-objects.html.
*/
object DeltaSharingTableScan extends DeltaTableScan[DeltaSharingFileIndex] {
// Since the delta library is used to read data from the constructed delta log, this
// should also respect the spark config for delta limit pushdown.
override def limitPushdownEnabled(plan: LogicalPlan): Boolean =
ConfUtils.limitPushdownEnabled(plan.conf) &&
(spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED.key) == "true")

override def getPartitionColumns(fileIndex: DeltaSharingFileIndex): Seq[String] =
fileIndex.partitionColumns

override def getPartitionFilters(fileIndex: DeltaSharingFileIndex): Seq[Expression] =
Seq.empty[Expression]

}
}
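
As a footnote to the extractor-objects link above, a minimal self-contained illustration of the pattern that DeltaSharingTableScan relies on:

// An extractor object: `unapply` lets a `case Even(half)` pattern destructure a matched
// value, just as `case scan @ DeltaSharingTableScan(...)` destructures a scan plan node.
object Even {
  def unapply(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None
}

val desc = 42 match {
  case Even(half) => s"even, half = $half" // matches: 42 is even
  case _ => "odd"
}
// desc == "even, half = 21"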