Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark][Sharing] Add CDF support for "delta format sharing" #2457

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.spark

import java.lang.ref.WeakReference
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import com.google.common.hash.Hashing
import io.delta.sharing.client.DeltaSharingClient
import io.delta.sharing.client.model.{Table => DeltaSharingTable}
import org.apache.hadoop.fs.Path

import org.apache.spark.delta.sharing.CachedTableManager
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation

object DeltaSharingCDFUtils extends Logging {

  /** Returns the elapsed time in seconds since `start` (a millisecond epoch timestamp). */
  private def getDuration(start: Long): Double = {
    (System.currentTimeMillis() - start) / 1000.0
  }

  /**
   * Prepares the BaseRelation for cdf queries on a delta sharing table. Since there's no limit
   * pushdown or filter pushdown involved, it will firstly fetch all the files from the delta
   * sharing server, prepare the local delta log, and leverage DeltaTableV2 to produce the relation.
   *
   * @param sqlContext the active SQLContext, used to reach the SparkSession for DeltaTableV2.
   * @param options    delta sharing options; must contain a "path" entry, and its cdfOptions
   *                   drive the getCDFFiles rpc.
   * @param table      the delta sharing table to query.
   * @param client     the client used to talk to the delta sharing server.
   * @return a BaseRelation backed by a locally constructed delta log for the cdf query.
   */
  private[sharing] def prepareCDFRelation(
      sqlContext: SQLContext,
      options: DeltaSharingOptions,
      table: DeltaSharingTable,
      client: DeltaSharingClient): BaseRelation = {
    val startTime = System.currentTimeMillis()
    // 1. Get all files with DeltaSharingClient.
    // includeHistoricalMetadata is always set to true, to get the metadata at the startingVersion
    // and also any metadata changes between [startingVersion, endingVersion], to put them in the
    // delta log. This is to allow delta library to check the metadata change and handle it
    // properly -- currently it throws error for column mapping changes.
    val deltaTableFiles =
      client.getCDFFiles(table, options.cdfOptions, includeHistoricalMetadata = true)
    logInfo(
      s"Fetched ${deltaTableFiles.lines.size} lines with cdf options ${options.cdfOptions} " +
        s"for table ${table} from delta sharing server, took ${getDuration(startTime)}s."
    )

    val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
    // 2. Prepare local delta log
    val queryCustomTablePath = client.getProfileProvider.getCustomTablePath(path)
    val queryParamsHashId = DeltaSharingUtils.getQueryParamsHashId(options.cdfOptions)
    val tablePathWithHashIdSuffix =
      DeltaSharingUtils.getTablePathWithIdSuffix(queryCustomTablePath, queryParamsHashId)
    val deltaLogMetadata = DeltaSharingLogFileSystem.constructLocalDeltaLogAcrossVersions(
      lines = deltaTableFiles.lines,
      customTablePath = tablePathWithHashIdSuffix,
      startingVersionOpt = None,
      endingVersionOpt = None
    )

    // 3. Register parquet file id to url mapping
    CachedTableManager.INSTANCE.register(
      // Using path instead of queryCustomTablePath because it will be customized within
      // CachedTableManager.
      tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(path, queryParamsHashId),
      idToUrl = deltaLogMetadata.idToUrl,
      // A weak reference is needed by the CachedTableManager to decide whether the query is done
      // and it's ok to clean up the id to url mapping for this table.
      refs = Seq(new WeakReference(this)),
      profileProvider = client.getProfileProvider,
      refresher = DeltaSharingUtils.getRefresherForGetCDFFiles(
        client = client,
        table = table,
        cdfOptions = options.cdfOptions
      ),
      // Use the server-provided minimum expiration when it looks valid; otherwise fall back to
      // now + the configured pre-signed url lifetime.
      expirationTimestamp =
        if (CachedTableManager.INSTANCE
            .isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) {
          deltaLogMetadata.minUrlExpirationTimestamp.get
        } else {
          System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs
        },
      refreshToken = None
    )

    // 4. Return the BaseRelation produced by DeltaTableV2 over the local delta log, scoped to
    // the [minVersion, maxVersion] range of the constructed log.
    val localDeltaCdfOptions = Map(
      DeltaSharingOptions.CDF_START_VERSION -> deltaLogMetadata.minVersion.toString,
      DeltaSharingOptions.CDF_END_VERSION -> deltaLogMetadata.maxVersion.toString,
      DeltaSharingOptions.CDF_READ_OPTION -> "true"
    )
    DeltaTableV2(
      spark = sqlContext.sparkSession,
      path = DeltaSharingLogFileSystem.encode(tablePathWithHashIdSuffix),
      options = localDeltaCdfOptions
    ).toBaseRelation
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private[sharing] class DeltaSharingDataSource
name = parsedPath.table
)

if (options.readChangeFeed) {
return DeltaSharingCDFUtils.prepareCDFRelation(sqlContext, options, dsTable, client)
}
// 2. getMetadata for schema to be used in the file index.
val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata(
client = client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,233 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
}


/**
* Construct local delta log based on delta log actions returned from delta sharing server.
*
* @param lines a list of delta actions, to be processed and put in the local delta log,
* each action contains a version field to indicate the version of log to
* put it in.
* @param customTablePath query customized table path, used to construct action.path field for
* DeltaSharingFileSystem
* @param startingVersionOpt If set, used to construct the delta file (.json log file) from the
* given startingVersion. This is needed by DeltaSharingSource to
* construct the delta log for the rpc no matter if there are files in
* that version or not, so DeltaSource can read delta actions from the
* starting version (instead from checkpoint).
* @param endingVersionOpt If set, used to construct the delta file (.json log file) until the
* given endingVersion. This is needed by DeltaSharingSource to construct
* the delta log for the rpc no matter if there are files in that version
* or not.
* NOTE: DeltaSource will not advance the offset if there are no files in
* a version of the delta log, but we still create the delta log file for
* that version to avoid missing delta log (json) files.
* @return ConstructedDeltaLogMetadata, which contains 3 fields:
* - idToUrl: mapping from file id to pre-signed url
* - minUrlExpirationTimestamp timestamp indicating the when to refresh pre-signed urls.
* Both are used to register to CachedTableManager.
* - maxVersion: the max version returned in the http response, used by
* DeltaSharingSource to quickly understand the progress of rpcs from the server.
*/
def constructLocalDeltaLogAcrossVersions(
lines: Seq[String],
customTablePath: String,
startingVersionOpt: Option[Long],
endingVersionOpt: Option[Long]): ConstructedDeltaLogMetadata = {
val startTime = System.currentTimeMillis()
assert(
startingVersionOpt.isDefined == endingVersionOpt.isDefined,
s"startingVersionOpt($startingVersionOpt) and endingVersionOpt($endingVersionOpt) should be" +
" both defined or not."
)
if (startingVersionOpt.isDefined) {
assert(
startingVersionOpt.get <= endingVersionOpt.get,
s"startingVersionOpt($startingVersionOpt) must be smaller than " +
s"endingVersionOpt($endingVersionOpt)."
)
}
var minVersion = Long.MaxValue
var maxVersion = 0L
var minUrlExpirationTimestamp: Option[Long] = None
val idToUrl = scala.collection.mutable.Map[String, String]()
val versionToDeltaSharingFileActions =
scala.collection.mutable.Map[Long, ArrayBuffer[model.DeltaSharingFileAction]]()
val versionToMetadata = scala.collection.mutable.Map[Long, model.DeltaSharingMetadata]()
val versionToJsonLogBuilderMap = scala.collection.mutable.Map[Long, ArrayBuffer[String]]()
val versionToJsonLogSize = scala.collection.mutable.Map[Long, Long]().withDefaultValue(0L)
var numFileActionsInMinVersion = 0
val versionToTimestampMap = scala.collection.mutable.Map[Long, Long]()
var startingMetadataLineOpt: Option[String] = None
var startingProtocolLineOpt: Option[String] = None

lines.foreach { line =>
linzhou-db marked this conversation as resolved.
Show resolved Hide resolved
val action = JsonUtils.fromJson[model.DeltaSharingSingleAction](line).unwrap
action match {
case fileAction: model.DeltaSharingFileAction =>
minVersion = minVersion.min(fileAction.version)
maxVersion = maxVersion.max(fileAction.version)
// Store file actions in an array to sort them based on id later.
versionToDeltaSharingFileActions.getOrElseUpdate(
fileAction.version,
ArrayBuffer[model.DeltaSharingFileAction]()
) += fileAction
case metadata: model.DeltaSharingMetadata =>
if (metadata.version != null) {
// This is to handle the cdf and streaming query result.
minVersion = minVersion.min(metadata.version)
maxVersion = maxVersion.max(metadata.version)
versionToMetadata(metadata.version) = metadata
if (metadata.version == minVersion) {
startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n")
}
} else {
// This is to handle the snapshot query result from DeltaSharingSource.
startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n")
}
case protocol: model.DeltaSharingProtocol =>
startingProtocolLineOpt = Some(protocol.deltaProtocol.json + "\n")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we expect to get only one protocol action from the server? what if we have multiple protocol actions (due to alter etc) in the log files between starting and end version?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same qn for metadata.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question, the answer is two fold:

  1. The server performs basic checks that the recipient/client is able to handle the readerFeatures, and throw error if not.
  2. The rest of the job is left to the client side delta library, and fallback to the same delta behavior. if delta cannot handle it, it throws the same error as reading a delta table, such as error when reading cdf on a table with column name changed.

case _ => // do nothing, ignore the line.
}
}

if (startingVersionOpt.isDefined) {
minVersion = minVersion.min(startingVersionOpt.get)
} else if (minVersion == Long.MaxValue) {
// This means there are no files returned from server for this cdf request.
// A 0.json file will be prepared with metadata and protocol only.
minVersion = 0
}
if (endingVersionOpt.isDefined) {
maxVersion = maxVersion.max(endingVersionOpt.get)
}
// Store the starting protocol and metadata in the minVersion.json.
val protocolAndMetadataStr = startingMetadataLineOpt.getOrElse("") + startingProtocolLineOpt
.getOrElse("")
versionToJsonLogBuilderMap.getOrElseUpdate(
minVersion,
ArrayBuffer[String]()
) += protocolAndMetadataStr
versionToJsonLogSize(minVersion) += protocolAndMetadataStr.length
numFileActionsInMinVersion = versionToDeltaSharingFileActions
.getOrElseUpdate(minVersion, ArrayBuffer[model.DeltaSharingFileAction]())
.size

// Write metadata to the delta log json file.
versionToMetadata.foreach {
case (version, metadata) =>
if (version != minVersion) {
val metadataStr = metadata.deltaMetadata.json + "\n"
versionToJsonLogBuilderMap.getOrElseUpdate(
version,
ArrayBuffer[String]()
) += metadataStr
versionToJsonLogSize(version) += metadataStr.length
}
}
// Write file actions to the delta log json file.
var previousIdOpt: Option[String] = None
versionToDeltaSharingFileActions.foreach {
case (version, actions) =>
previousIdOpt = None
actions.toSeq.sortWith(deltaSharingFileActionIncreaseOrderFunc).foreach { fileAction =>
assert(
// Using > instead of >= because there can be a removeFile and addFile pointing to the
// same parquet file which result in the same file id, since id is a hash of file path.
// This is ok because eventually it can read data out of the correct parquet file.
!previousIdOpt.exists(_ > fileAction.id),
s"fileActions must be in increasing order by id: ${previousIdOpt} is not smaller than" +
s" ${fileAction.id}, in version:$version."
)
previousIdOpt = Some(fileAction.id)

// 1. build it to url mapping
idToUrl(fileAction.id) = fileAction.path
if (requiresIdToUrlForDV(fileAction.getDeletionVectorOpt)) {
idToUrl(fileAction.deletionVectorFileId) =
fileAction.getDeletionVectorOpt.get.pathOrInlineDv
}

// 2. prepare json log content.
versionToTimestampMap.getOrElseUpdate(version, fileAction.timestamp)
val actionJsonStr = getActionWithDeltaSharingPath(fileAction, customTablePath) + "\n"
versionToJsonLogBuilderMap.getOrElseUpdate(
version,
ArrayBuffer[String]()
) += actionJsonStr
versionToJsonLogSize(version) += actionJsonStr.length

// 3. process expiration timestamp
if (fileAction.expirationTimestamp != null) {
minUrlExpirationTimestamp = minUrlExpirationTimestamp
.filter(_ < fileAction.expirationTimestamp)
.orElse(Some(fileAction.expirationTimestamp))
}
}
}

val encodedTablePath = DeltaSharingLogFileSystem.encode(customTablePath)
val deltaLogPath = s"${encodedTablePath.toString}/_delta_log"
val fileSizeTsSeq = Seq.newBuilder[DeltaSharingLogFileStatus]

if (minVersion > 0) {
// If the minVersion is not 0 in the response, then prepare checkpoint at minVersion - 1:
// need to prepare two files: 1) (minVersion-1).checkpoint.parquet 2) _last_checkpoint
val checkpointVersion = minVersion - 1

// 1) store the checkpoint byte array in BlockManager for future read.
val checkpointParquetFileName =
FileNames.checkpointFileSingular(new Path(deltaLogPath), checkpointVersion).toString
fileSizeTsSeq += DeltaSharingLogFileStatus(
path = checkpointParquetFileName,
size = FAKE_CHECKPOINT_BYTE_ARRAY.size,
modificationTime = 0L
)

// 2) Prepare the content for _last_checkpoint
val lastCheckpointContent =
s"""{"version":${checkpointVersion},"size":${FAKE_CHECKPOINT_BYTE_ARRAY.size}}"""
val lastCheckpointPath = new Path(deltaLogPath, "_last_checkpoint").toString
fileSizeTsSeq += DeltaSharingLogFileStatus(
path = lastCheckpointPath,
size = lastCheckpointContent.length,
modificationTime = 0L
)
DeltaSharingUtils.overrideSingleBlock[String](
blockId = getDeltaSharingLogBlockId(lastCheckpointPath),
value = lastCheckpointContent
)
}

for (version <- minVersion to maxVersion) {
val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), version).toString
DeltaSharingUtils.overrideIteratorBlock[String](
getDeltaSharingLogBlockId(jsonFilePath),
versionToJsonLogBuilderMap.getOrElse(version, Seq.empty).toIterator
)
fileSizeTsSeq += DeltaSharingLogFileStatus(
path = jsonFilePath,
size = versionToJsonLogSize.getOrElse(version, 0),
modificationTime = versionToTimestampMap.get(version).getOrElse(0L)
)
}

DeltaSharingUtils.overrideIteratorBlock[DeltaSharingLogFileStatus](
getDeltaSharingLogBlockId(deltaLogPath),
fileSizeTsSeq.result().toIterator
)
logInfo(
s"It takes ${(System.currentTimeMillis() - startTime) / 1000.0}s to construct delta log" +
s"for $customTablePath from $minVersion to $maxVersion, with ${idToUrl.toMap.size} urls."
)
ConstructedDeltaLogMetadata(
idToUrl = idToUrl.toMap,
minUrlExpirationTimestamp = minUrlExpirationTimestamp,
numFileActionsInMinVersionOpt = Some(numFileActionsInMinVersion),
minVersion = minVersion,
maxVersion = maxVersion
)
}

/** Set the modificationTime to zero, this is to align with the time returned from
* DeltaSharingFileSystem.getFileStatus
*/
Expand Down
Loading
Loading