diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala
new file mode 100644
index 00000000000..7b8a8294c67
--- /dev/null
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala
@@ -0,0 +1,112 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.sharing.spark
+
+import java.lang.ref.WeakReference
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import com.google.common.hash.Hashing
+import io.delta.sharing.client.DeltaSharingClient
+import io.delta.sharing.client.model.{Table => DeltaSharingTable}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.delta.sharing.CachedTableManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.BaseRelation
+
+object DeltaSharingCDFUtils extends Logging {
+
+  private def getDuration(start: Long): Double = {
+    (System.currentTimeMillis() - start) / 1000.0
+  }
+
+  /**
+   * Prepares the BaseRelation for cdf queries on a delta sharing table. Since there's no limit
+   * pushdown or filter pushdown involved, it will first fetch all the files from the delta
+   * sharing server, prepare the local delta log, and leverage DeltaTableV2 to produce the
+   * relation.
+   */
+  private[sharing] def prepareCDFRelation(
+      sqlContext: SQLContext,
+      options: DeltaSharingOptions,
+      table: DeltaSharingTable,
+      client: DeltaSharingClient): BaseRelation = {
+    val startTime = System.currentTimeMillis()
+    // 1. Get all files with DeltaSharingClient.
+    // includeHistoricalMetadata is always set to true, to get the metadata at the startingVersion
+    // and also any metadata changes between [startingVersion, endingVersion], to put them in the
+    // delta log. This is to allow the delta library to check the metadata change and handle it
+    // properly -- currently it throws an error for column mapping changes.
+    val deltaTableFiles =
+      client.getCDFFiles(table, options.cdfOptions, includeHistoricalMetadata = true)
+    logInfo(
+      s"Fetched ${deltaTableFiles.lines.size} lines with cdf options ${options.cdfOptions} " +
+        s"for table ${table} from delta sharing server, took ${getDuration(startTime)}s."
+    )
+
+    val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
+    // 2. Prepare the local delta log.
+    val queryCustomTablePath = client.getProfileProvider.getCustomTablePath(path)
+    val queryParamsHashId = DeltaSharingUtils.getQueryParamsHashId(options.cdfOptions)
+    val tablePathWithHashIdSuffix =
+      DeltaSharingUtils.getTablePathWithIdSuffix(queryCustomTablePath, queryParamsHashId)
+    val deltaLogMetadata = DeltaSharingLogFileSystem.constructLocalDeltaLogAcrossVersions(
+      lines = deltaTableFiles.lines,
+      customTablePath = tablePathWithHashIdSuffix,
+      startingVersionOpt = None,
+      endingVersionOpt = None
+    )
+
+    // 3. Register parquet file id to url mapping
+    CachedTableManager.INSTANCE.register(
+      // Using path instead of queryCustomTablePath because it will be customized within
+      // CachedTableManager.
+      tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(path, queryParamsHashId),
+      idToUrl = deltaLogMetadata.idToUrl,
+      // A weak reference is needed by the CachedTableManager to decide whether the query is done
+      // and it's ok to clean up the id to url mapping for this table.
+      refs = Seq(new WeakReference(this)),
+      profileProvider = client.getProfileProvider,
+      refresher = DeltaSharingUtils.getRefresherForGetCDFFiles(
+        client = client,
+        table = table,
+        cdfOptions = options.cdfOptions
+      ),
+      expirationTimestamp =
+        if (CachedTableManager.INSTANCE
+            .isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) {
+          deltaLogMetadata.minUrlExpirationTimestamp.get
+        } else {
+          System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs
+        },
+      refreshToken = None
+    )
+
+    // 4. Return the BaseRelation, backed by the local delta log, through DeltaTableV2.
+    val localDeltaCdfOptions = Map(
+      DeltaSharingOptions.CDF_START_VERSION -> deltaLogMetadata.minVersion.toString,
+      DeltaSharingOptions.CDF_END_VERSION -> deltaLogMetadata.maxVersion.toString,
+      DeltaSharingOptions.CDF_READ_OPTION -> "true"
+    )
+    DeltaTableV2(
+      spark = sqlContext.sparkSession,
+      path = DeltaSharingLogFileSystem.encode(tablePathWithHashIdSuffix),
+      options = localDeltaCdfOptions
+    ).toBaseRelation
+  }
+}
diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
index 231be28ce50..c76b9389d49 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
@@ -99,6 +99,9 @@ private[sharing] class DeltaSharingDataSource
       name = parsedPath.table
     )
 
+    if (options.readChangeFeed) {
+      return DeltaSharingCDFUtils.prepareCDFRelation(sqlContext, options, dsTable, client)
+    }
     // 2. getMetadata for schema to be used in the file index.
     val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata(
       client = client,
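For context, the new branch above is driven entirely by reader options. A minimal sketch of a query that would take this code path, assuming an active `spark` session; the profile path and share coordinates are illustrative, not taken from this PR:

```scala
// Reading the change feed of a shared table routes through
// DeltaSharingCDFUtils.prepareCDFRelation instead of the snapshot path.
val cdfDf = spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 1)
  .load("/tmp/profile.share#share1.default.cdf_table") // hypothetical path
```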
diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
index 7214f6368b5..02f9d9e0462 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
@@ -353,6 +353,233 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
   }
 
+  /**
+   * Construct a local delta log based on the delta log actions returned from the delta sharing
+   * server.
+   *
+   * @param lines a list of delta actions, to be processed and put in the local delta log.
+   *              Each action contains a version field to indicate the version of log to
+   *              put it in.
+   * @param customTablePath query customized table path, used to construct the action.path field
+   *                        for DeltaSharingFileSystem.
+   * @param startingVersionOpt If set, used to construct the delta file (.json log file) from the
+   *                           given startingVersion. This is needed by DeltaSharingSource to
+   *                           construct the delta log for the rpc no matter if there are files in
+   *                           that version or not, so DeltaSource can read delta actions from the
+   *                           starting version (instead of from a checkpoint).
+   * @param endingVersionOpt If set, used to construct the delta file (.json log file) until the
+   *                         given endingVersion. This is needed by DeltaSharingSource to
+   *                         construct the delta log for the rpc no matter if there are files in
+   *                         that version or not.
+   *                         NOTE: DeltaSource will not advance the offset if there are no files
+   *                         in a version of the delta log, but we still create the delta log file
+   *                         for that version to avoid missing delta log (json) files.
+   * @return ConstructedDeltaLogMetadata, which contains 3 fields:
+   *         - idToUrl: mapping from file id to pre-signed url.
+   *         - minUrlExpirationTimestamp: timestamp indicating when to refresh the pre-signed
+   *           urls. Both of the above are used to register with the CachedTableManager.
+   *         - maxVersion: the max version returned in the http response, used by
+   *           DeltaSharingSource to quickly understand the progress of rpcs from the server.
+   */
+  def constructLocalDeltaLogAcrossVersions(
+      lines: Seq[String],
+      customTablePath: String,
+      startingVersionOpt: Option[Long],
+      endingVersionOpt: Option[Long]): ConstructedDeltaLogMetadata = {
+    val startTime = System.currentTimeMillis()
+    assert(
+      startingVersionOpt.isDefined == endingVersionOpt.isDefined,
+      s"startingVersionOpt($startingVersionOpt) and endingVersionOpt($endingVersionOpt) should" +
+        " be both defined or both empty."
+    )
+    if (startingVersionOpt.isDefined) {
+      assert(
+        startingVersionOpt.get <= endingVersionOpt.get,
+        s"startingVersionOpt($startingVersionOpt) must not be larger than " +
+          s"endingVersionOpt($endingVersionOpt)."
+      )
+    }
+    var minVersion = Long.MaxValue
+    var maxVersion = 0L
+    var minUrlExpirationTimestamp: Option[Long] = None
+    val idToUrl = scala.collection.mutable.Map[String, String]()
+    val versionToDeltaSharingFileActions =
+      scala.collection.mutable.Map[Long, ArrayBuffer[model.DeltaSharingFileAction]]()
+    val versionToMetadata = scala.collection.mutable.Map[Long, model.DeltaSharingMetadata]()
+    val versionToJsonLogBuilderMap = scala.collection.mutable.Map[Long, ArrayBuffer[String]]()
+    val versionToJsonLogSize = scala.collection.mutable.Map[Long, Long]().withDefaultValue(0L)
+    var numFileActionsInMinVersion = 0
+    val versionToTimestampMap = scala.collection.mutable.Map[Long, Long]()
+    var startingMetadataLineOpt: Option[String] = None
+    var startingProtocolLineOpt: Option[String] = None
+
+    lines.foreach { line =>
+      val action = JsonUtils.fromJson[model.DeltaSharingSingleAction](line).unwrap
+      action match {
+        case fileAction: model.DeltaSharingFileAction =>
+          minVersion = minVersion.min(fileAction.version)
+          maxVersion = maxVersion.max(fileAction.version)
+          // Store file actions in an array to sort them based on id later.
+          versionToDeltaSharingFileActions.getOrElseUpdate(
+            fileAction.version,
+            ArrayBuffer[model.DeltaSharingFileAction]()
+          ) += fileAction
+        case metadata: model.DeltaSharingMetadata =>
+          if (metadata.version != null) {
+            // This is to handle the cdf and streaming query results.
+            minVersion = minVersion.min(metadata.version)
+            maxVersion = maxVersion.max(metadata.version)
+            versionToMetadata(metadata.version) = metadata
+            if (metadata.version == minVersion) {
+              startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n")
+            }
+          } else {
+            // This is to handle the snapshot query result from DeltaSharingSource.
+            startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n")
+          }
+        case protocol: model.DeltaSharingProtocol =>
+          startingProtocolLineOpt = Some(protocol.deltaProtocol.json + "\n")
+        case _ => // do nothing, ignore the line.
+      }
+    }
+
+    if (startingVersionOpt.isDefined) {
+      minVersion = minVersion.min(startingVersionOpt.get)
+    } else if (minVersion == Long.MaxValue) {
+      // This means there are no files returned from the server for this cdf request.
+      // A 0.json file will be prepared with metadata and protocol only.
+      minVersion = 0
+    }
+    if (endingVersionOpt.isDefined) {
+      maxVersion = maxVersion.max(endingVersionOpt.get)
+    }
+    // Store the starting protocol and metadata in the minVersion.json.
+    val protocolAndMetadataStr = startingMetadataLineOpt.getOrElse("") + startingProtocolLineOpt
+      .getOrElse("")
+    versionToJsonLogBuilderMap.getOrElseUpdate(
+      minVersion,
+      ArrayBuffer[String]()
+    ) += protocolAndMetadataStr
+    versionToJsonLogSize(minVersion) += protocolAndMetadataStr.length
+    numFileActionsInMinVersion = versionToDeltaSharingFileActions
+      .getOrElseUpdate(minVersion, ArrayBuffer[model.DeltaSharingFileAction]())
+      .size
+
+    // Write metadata to the delta log json file.
+    versionToMetadata.foreach {
+      case (version, metadata) =>
+        if (version != minVersion) {
+          val metadataStr = metadata.deltaMetadata.json + "\n"
+          versionToJsonLogBuilderMap.getOrElseUpdate(
+            version,
+            ArrayBuffer[String]()
+          ) += metadataStr
+          versionToJsonLogSize(version) += metadataStr.length
+        }
+    }
+    // Write file actions to the delta log json file.
+    var previousIdOpt: Option[String] = None
+    versionToDeltaSharingFileActions.foreach {
+      case (version, actions) =>
+        previousIdOpt = None
+        actions.toSeq.sortWith(deltaSharingFileActionIncreaseOrderFunc).foreach { fileAction =>
+          assert(
+            // Using > instead of >= because there can be a removeFile and addFile pointing to
+            // the same parquet file, which results in the same file id, since the id is a hash
+            // of the file path. This is ok because eventually it can read data out of the
+            // correct parquet file.
+            !previousIdOpt.exists(_ > fileAction.id),
+            s"fileActions must be in increasing order by id: ${previousIdOpt} is not smaller" +
+              s" than ${fileAction.id}, in version:$version."
+          )
+          previousIdOpt = Some(fileAction.id)
+
+          // 1. Build the id to url mapping.
+          idToUrl(fileAction.id) = fileAction.path
+          if (requiresIdToUrlForDV(fileAction.getDeletionVectorOpt)) {
+            idToUrl(fileAction.deletionVectorFileId) =
+              fileAction.getDeletionVectorOpt.get.pathOrInlineDv
+          }
+
+          // 2. Prepare the json log content.
+          versionToTimestampMap.getOrElseUpdate(version, fileAction.timestamp)
+          val actionJsonStr = getActionWithDeltaSharingPath(fileAction, customTablePath) + "\n"
+          versionToJsonLogBuilderMap.getOrElseUpdate(
+            version,
+            ArrayBuffer[String]()
+          ) += actionJsonStr
+          versionToJsonLogSize(version) += actionJsonStr.length
+
+          // 3. Process the expiration timestamp.
+          if (fileAction.expirationTimestamp != null) {
+            minUrlExpirationTimestamp = minUrlExpirationTimestamp
+              .filter(_ < fileAction.expirationTimestamp)
+              .orElse(Some(fileAction.expirationTimestamp))
+          }
+        }
+    }
+
+    val encodedTablePath = DeltaSharingLogFileSystem.encode(customTablePath)
+    val deltaLogPath = s"${encodedTablePath.toString}/_delta_log"
+    val fileSizeTsSeq = Seq.newBuilder[DeltaSharingLogFileStatus]
+
+    if (minVersion > 0) {
+      // If the minVersion is not 0 in the response, then prepare a checkpoint at minVersion - 1.
+      // Two files are needed: 1) (minVersion - 1).checkpoint.parquet, 2) _last_checkpoint.
+      val checkpointVersion = minVersion - 1
+
+      // 1) Store the checkpoint byte array in BlockManager for future reads.
+      val checkpointParquetFileName =
+        FileNames.checkpointFileSingular(new Path(deltaLogPath), checkpointVersion).toString
+      fileSizeTsSeq += DeltaSharingLogFileStatus(
+        path = checkpointParquetFileName,
+        size = FAKE_CHECKPOINT_BYTE_ARRAY.size,
+        modificationTime = 0L
+      )
+
+      // 2) Prepare the content for _last_checkpoint.
+      val lastCheckpointContent =
+        s"""{"version":${checkpointVersion},"size":${FAKE_CHECKPOINT_BYTE_ARRAY.size}}"""
+      val lastCheckpointPath = new Path(deltaLogPath, "_last_checkpoint").toString
+      fileSizeTsSeq += DeltaSharingLogFileStatus(
+        path = lastCheckpointPath,
+        size = lastCheckpointContent.length,
+        modificationTime = 0L
+      )
+      DeltaSharingUtils.overrideSingleBlock[String](
+        blockId = getDeltaSharingLogBlockId(lastCheckpointPath),
+        value = lastCheckpointContent
+      )
+    }
+
+    for (version <- minVersion to maxVersion) {
+      val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), version).toString
+      DeltaSharingUtils.overrideIteratorBlock[String](
+        getDeltaSharingLogBlockId(jsonFilePath),
+        versionToJsonLogBuilderMap.getOrElse(version, Seq.empty).toIterator
+      )
+      fileSizeTsSeq += DeltaSharingLogFileStatus(
+        path = jsonFilePath,
+        size = versionToJsonLogSize.getOrElse(version, 0),
+        modificationTime = versionToTimestampMap.get(version).getOrElse(0L)
+      )
+    }
+
+    DeltaSharingUtils.overrideIteratorBlock[DeltaSharingLogFileStatus](
+      getDeltaSharingLogBlockId(deltaLogPath),
+      fileSizeTsSeq.result().toIterator
+    )
+    logInfo(
+      s"It takes ${(System.currentTimeMillis() - startTime) / 1000.0}s to construct delta log" +
+        s" for $customTablePath from $minVersion to $maxVersion, with ${idToUrl.toMap.size} urls."
+    )
+    ConstructedDeltaLogMetadata(
+      idToUrl = idToUrl.toMap,
+      minUrlExpirationTimestamp = minUrlExpirationTimestamp,
+      numFileActionsInMinVersionOpt = Some(numFileActionsInMinVersion),
+      minVersion = minVersion,
+      maxVersion = maxVersion
+    )
+  }
+
   /** Set the modificationTime to zero, this is to align with the time returned from
    * DeltaSharingFileSystem.getFileStatus
    */
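The `minUrlExpirationTimestamp` bookkeeping above is easy to misread: the `filter(...).orElse(...)` pair keeps the smallest non-null expiration across all file actions. A standalone sketch of the same pattern; `FileAction` here is a simplified stand-in (using `Option` instead of a nullable field), not the connector's model class:

```scala
final case class FileAction(expirationTimestamp: Option[Long]) // stand-in type

// Keeps the accumulator when it is already smaller, otherwise adopts the
// new timestamp -- i.e. a running minimum over the defined values.
def minExpiration(actions: Seq[FileAction]): Option[Long] =
  actions.foldLeft(Option.empty[Long]) { (acc, a) =>
    a.expirationTimestamp match {
      case None => acc
      case Some(ts) => acc.filter(_ < ts).orElse(Some(ts))
    }
  }
```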
diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
index bfd590ca00b..0d23577a3a7 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
@@ -162,6 +162,44 @@ object DeltaSharingUtils extends Logging {
     }
   }
 
+  /**
+   * Get the refresher function for a delta sharing table that calls client.getCDFFiles with the
+   * provided parameters.
+   *
+   * @return A refresher function used by the CachedTableManager to refresh urls.
+   */
+  def getRefresherForGetCDFFiles(
+      client: DeltaSharingClient,
+      table: Table,
+      cdfOptions: Map[String, String]): RefresherFunction = { (_: Option[String]) =>
+    {
+      val tableFiles = client.getCDFFiles(
+        table = table,
+        cdfOptions = cdfOptions,
+        includeHistoricalMetadata = true
+      )
+      getTableRefreshResult(tableFiles)
+    }
+  }
+
+  /**
+   * Get the refresher function for a delta sharing table that calls client.getFiles with the
+   * provided parameters.
+   *
+   * @return A refresher function used by the CachedTableManager to refresh urls.
+   */
+  def getRefresherForGetFilesWithStartingVersion(
+      client: DeltaSharingClient,
+      table: Table,
+      startingVersion: Long,
+      endingVersion: Option[Long]): RefresherFunction = { (_: Option[String]) =>
+    {
+      val tableFiles = client
+        .getFiles(table = table, startingVersion = startingVersion, endingVersion = endingVersion)
+      getTableRefreshResult(tableFiles)
+    }
+  }
+
   def overrideSingleBlock[T: ClassTag](blockId: BlockId, value: T): Unit = {
     assert(
       blockId.name.startsWith(DELTA_SHARING_BLOCK_ID_PREFIX),
@@ -205,6 +243,15 @@ object DeltaSharingUtils extends Logging {
     Hashing.sha256().hashString(fullQueryString, UTF_8).toString
   }
 
+  // Get a query hash id based on the query parameters: cdfOptions.
+  // The id is concatenated with the table name and used in the local delta log and in the
+  // CachedTableManager. This is to uniquely identify a delta sharing table used twice in the
+  // same query but with different query parameters, so we can differentiate their delta logs
+  // and their entries in the CachedTableManager.
+  private[sharing] def getQueryParamsHashId(cdfOptions: Map[String, String]): String = {
+    Hashing.sha256().hashString(cdfOptions.toString, UTF_8).toString
+  }
+
   // Concatenate table path with an id as a suffix, to uniquely identify a delta sharing table and
   // its corresponding delta log in a query.
   private[sharing] def getTablePathWithIdSuffix(customTablePath: String, id: String): String = {
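Why the hash suffix matters: two reads of the same shared table with different cdfOptions must not share a local delta log or a cache entry. A quick sketch of the effect, mirroring the sha256 call above; the underscore join in `tablePathWithIdSuffix` is an assumption for illustration, since the helper's body is outside this hunk:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import com.google.common.hash.Hashing

def queryParamsHashId(cdfOptions: Map[String, String]): String =
  Hashing.sha256().hashString(cdfOptions.toString, UTF_8).toString

// Assumed "<path>_<id>" shape, for illustration only.
def tablePathWithIdSuffix(path: String, id: String): String = s"${path}_$id"

val pathA = tablePathWithIdSuffix("customTablePath",
  queryParamsHashId(Map("startingVersion" -> "0")))
val pathB = tablePathWithIdSuffix("customTablePath",
  queryParamsHashId(Map("startingVersion" -> "3")))
assert(pathA != pathB) // distinct delta log and cache entry per query variant
```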
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala
new file mode 100644
index 00000000000..61d5fc1f40b
--- /dev/null
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala
@@ -0,0 +1,243 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.sharing.spark
+
+import java.io.File
+
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import io.delta.sharing.client.{
+  DeltaSharingClient,
+  DeltaSharingProfileProvider,
+  DeltaSharingRestClient
+}
+import io.delta.sharing.client.model.{DeltaTableFiles, DeltaTableMetadata, Table}
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher}
+import org.apache.spark.sql.{QueryTest, SparkSession}
+import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils
+import org.apache.spark.sql.test.SharedSparkSession
+
+private object CDFTesTUtils {
+  val paths = Seq("http://path1", "http://path2")
+
+  val SparkConfForReturnExpTime = "spark.delta.sharing.fileindexsuite.returnexptime"
+
+  // 10 seconds
+  val expirationTimeMs = 10000
+
+  def getExpirationTimestampStr(returnExpTime: Boolean): String = {
+    if (returnExpTime) {
+      s""""expirationTimestamp":${System.currentTimeMillis() + expirationTimeMs},"""
+    } else {
+      ""
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  val fileStr1Id = "11d9b72771a72f178a6f2839f7f08528"
+  val metaDataStr =
+    """{"metaData":{"size":809,"deltaMetadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c2"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1691734718560}}}"""
+  def getAddFileStr1(path: String, returnExpTime: Boolean = false): String = {
+    s"""{"file":{"id":"11d9b72771a72f178a6f2839f7f08528",${getExpirationTimestampStr(
+      returnExpTime
+    )}"deltaSingleAction":{"add":{"path":"${path}",""" +
+      """"partitionValues":{"c2":"one"},"size":809,"modificationTime":1691734726073,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"c1\":1,\"c2\":\"one\"},\"maxValues\":{\"c1\":2,\"c2\":\"one\"},\"nullCount\":{\"c1\":0,\"c2\":0}}","tags":{"INSERTION_TIME":"1691734726073000","MIN_INSERTION_TIME":"1691734726073000","MAX_INSERTION_TIME":"1691734726073000","OPTIMIZE_TARGET_SIZE":"268435456"}}}}}"""
+  }
+  def getAddFileStr2(returnExpTime: Boolean = false): String = {
+    s"""{"file":{"id":"22d9b72771a72f178a6f2839f7f08529",${getExpirationTimestampStr(
+      returnExpTime
+    )}""" +
+      """"deltaSingleAction":{"add":{"path":"http://path2","partitionValues":{"c2":"two"},"size":809,"modificationTime":1691734726073,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"c1\":1,\"c2\":\"two\"},\"maxValues\":{\"c1\":2,\"c2\":\"two\"},\"nullCount\":{\"c1\":0,\"c2\":0}}","tags":{"INSERTION_TIME":"1691734726073000","MIN_INSERTION_TIME":"1691734726073000","MAX_INSERTION_TIME":"1691734726073000","OPTIMIZE_TARGET_SIZE":"268435456"}}}}}"""
+  }
+  // scalastyle:on line.size.limit
+}
+
+/**
+ * A mocked delta sharing client for unit tests.
+ */ +class TestDeltaSharingClientForCDFUtils( + profileProvider: DeltaSharingProfileProvider, + timeoutInSeconds: Int = 120, + numRetries: Int = 10, + maxRetryDuration: Long = Long.MaxValue, + sslTrustAll: Boolean = false, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, + readerFeatures: String = "", + queryTablePaginationEnabled: Boolean = false, + maxFilesPerReq: Int = 100000) + extends DeltaSharingClient { + + import CDFTesTUtils._ + + private lazy val returnExpirationTimestamp = SparkSession.active.sessionState.conf + .getConfString( + SparkConfForReturnExpTime + ) + .toBoolean + + var numGetFileCalls: Int = -1 + + override def listAllTables(): Seq[Table] = throw new UnsupportedOperationException("not needed") + + override def getMetadata( + table: Table, + versionAsOf: Option[Long], + timestampAsOf: Option[String]): DeltaTableMetadata = { + throw new UnsupportedOperationException("getMetadata is not supported now.") + } + + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = { + throw new UnsupportedOperationException("getTableVersion is not supported now.") + } + + override def getFiles( + table: Table, + predicates: Seq[String], + limit: Option[Long], + versionAsOf: Option[Long], + timestampAsOf: Option[String], + jsonPredicateHints: Option[String], + refreshToken: Option[String] + ): DeltaTableFiles = { + throw new UnsupportedOperationException("getFiles is not supported now.") + } + + override def getFiles( + table: Table, + startingVersion: Long, + endingVersion: Option[Long] + ): DeltaTableFiles = { + throw new UnsupportedOperationException(s"getFiles with startingVersion($startingVersion)") + } + + override def getCDFFiles( + table: Table, + cdfOptions: Map[String, String], + includeHistoricalMetadata: Boolean + ): DeltaTableFiles = { + numGetFileCalls += 1 + DeltaTableFiles( + version = 0, + lines = Seq[String]( + """{"protocol":{"deltaProtocol":{"minReaderVersion": 1, "minWriterVersion": 1}}}""", + metaDataStr, + getAddFileStr1(paths(numGetFileCalls.min(1)), returnExpirationTimestamp), + getAddFileStr2(returnExpirationTimestamp) + ), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getForStreaming(): Boolean = forStreaming + + override def getProfileProvider: DeltaSharingProfileProvider = profileProvider +} + +class DeltaSharingCDFUtilsSuite + extends QueryTest + with DeltaSQLCommandTest + with SharedSparkSession + with DeltaSharingTestSparkUtils { + + import CDFTesTUtils._ + + private val shareName = "share" + private val schemaName = "default" + private val sharedTableName = "table" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.delta.sharing.preSignedUrl.expirationMs", expirationTimeMs.toString) + .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "1000") + .set("spark.delta.sharing.driver.refreshThresholdMs", "2000") + .set("spark.delta.sharing.driver.accessThresholdToExpireMs", "60000") + } + + test("refresh works") { + PreSignedUrlCache.registerIfNeeded(SparkEnv.get) + + withTempDir { tempDir => + val profileFile = new File(tempDir, "foo.share") + FileUtils.writeStringToFile( + profileFile, + s"""{ + | "shareCredentialsVersion": 1, + | "endpoint": "https://localhost:12345/not-used-endpoint", + | "bearerToken": "mock" + |}""".stripMargin, + "utf-8" + ) + + def test(): Unit = { + val profilePath = profileFile.getCanonicalPath + val tablePath = new 
+          Path(s"$profilePath#$shareName.$schemaName.$sharedTableName")
+        val client = DeltaSharingRestClient(profilePath, false, "delta")
+        val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName)
+
+        val options = new DeltaSharingOptions(Map("path" -> tablePath.toString))
+        DeltaSharingCDFUtils.prepareCDFRelation(
+          SparkSession.active.sqlContext,
+          options,
+          dsTable,
+          client
+        )
+
+        val preSignedUrlCacheRef = PreSignedUrlCache.getEndpointRefInExecutor(SparkEnv.get)
+        val path = options.options.getOrElse(
+          "path",
+          throw DeltaSharingErrors.pathNotSpecifiedException
+        )
+        val fetcher = new PreSignedUrlFetcher(
+          preSignedUrlCacheRef,
+          DeltaSharingUtils.getTablePathWithIdSuffix(
+            path,
+            DeltaSharingUtils.getQueryParamsHashId(options.cdfOptions)
+          ),
+          fileStr1Id,
+          1000
+        )
+        // Sleep for expirationTimeMs to ensure that the urls are refreshed.
+        Thread.sleep(expirationTimeMs)
+
+        // Verify that the url is refreshed as paths(1), not paths(0) anymore.
+        assert(fetcher.getUrl == paths(1))
+      }
+
+      withSQLConf(
+        "spark.delta.sharing.client.class" -> classOf[TestDeltaSharingClientForCDFUtils].getName,
+        "fs.delta-sharing-log.impl" -> classOf[DeltaSharingLogFileSystem].getName,
+        "spark.delta.sharing.profile.provider.class" ->
+          "io.delta.sharing.client.DeltaSharingFileProfileProvider",
+        SparkConfForReturnExpTime -> "true"
+      ) {
+        test()
+      }
+
+      withSQLConf(
+        "spark.delta.sharing.client.class" -> classOf[TestDeltaSharingClientForCDFUtils].getName,
+        "fs.delta-sharing-log.impl" -> classOf[DeltaSharingLogFileSystem].getName,
+        "spark.delta.sharing.profile.provider.class" ->
+          "io.delta.sharing.client.DeltaSharingFileProfileProvider",
+        SparkConfForReturnExpTime -> "false"
+      ) {
+        test()
+      }
+    }
+  }
+}
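The refresh test above hinges on the mocked client returning `paths(0)` from the first `getCDFFiles` call and `paths(1)` after the cache-driven refresh. A toy model of the cache/refresh interaction it exercises; the types and method names here are stand-ins, not the connector's `CachedTableManager`/`PreSignedUrlFetcher` API:

```scala
// Re-invokes the refresher once the cached urls pass their expiration
// window, then serves the newly fetched mapping.
final class UrlCacheModel(refresher: () => Map[String, String], expirationMs: Long) {
  private var urls = refresher()
  private var refreshedAt = System.currentTimeMillis()

  def getUrl(id: String): String = {
    if (System.currentTimeMillis() - refreshedAt >= expirationMs) {
      urls = refresher() // second call hits the mock again -> paths(1)
      refreshedAt = System.currentTimeMillis()
    }
    urls(id)
  }
}
```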
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala
index 3334c76ade5..13d0fca3fcf 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala
@@ -559,6 +559,293 @@ trait DeltaSharingDataSourceDeltaSuiteBase
       }
     }
   }
+
+  test("DeltaSharingDataSource able to read empty data") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_empty"
+      withTable(deltaTableName) {
+        createSimpleTable(deltaTableName, enableCdf = true)
+        sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""")
+        sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""")
+        sql(s"DELETE FROM $deltaTableName WHERE c1 <= 2")
+        // This command is just to create an empty table version at version 4.
+        spark.sql(s"ALTER TABLE $deltaTableName SET TBLPROPERTIES('delta.minReaderVersion' = 1)")
+
+        val sharedTableName = "shared_table_empty"
+        prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+
+        def testEmpty(tablePath: String): Unit = {
+          val deltaDf = spark.read.format("delta").table(deltaTableName)
+          val sharingDf =
+            spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
+          checkAnswer(deltaDf, sharingDf)
+          assert(sharingDf.count() == 0)
+
+          val deltaCdfDf = spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 4)
+            .table(deltaTableName)
+          val sharingCdfDf = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 4)
+            .load(tablePath)
+          checkAnswer(deltaCdfDf, sharingCdfDf)
+          assert(sharingCdfDf.count() == 0)
+        }
+
+        // There's only a metadata change but no actual files in version 4.
+        prepareMockedClientAndFileSystemResultForCdf(
+          deltaTableName,
+          sharedTableName,
+          startingVersion = 4
+        )
+
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          testEmpty(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
+        }
+      }
+    }
+  }
+
+  /**
+   * cdf queries
+   */
+  test("DeltaSharingDataSource able to read data for simple cdf query") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_cdf"
+      withTable(deltaTableName) {
+        sql(s"""
+               |CREATE TABLE $deltaTableName (c1 INT, c2 STRING) USING DELTA PARTITIONED BY (c2)
+               |TBLPROPERTIES (delta.enableChangeDataFeed = true)
+               |""".stripMargin)
+        // 2 insert rows in version 1, 1 of them with c1=2.
+        sql(s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "two")""")
+        // 1 insert row in version 2, 0 of them with c1=2.
+        sql(s"""INSERT INTO $deltaTableName VALUES (3, "two")""")
+        // 0 operations in version 3.
+        sql(s"""OPTIMIZE $deltaTableName""")
+        // 2 update rows (pre- and post-image) in version 4, 2 of them with c1=2.
+        sql(s"""UPDATE $deltaTableName SET c2="new two" where c1=2""")
+        // 1 delete row in version 5, 1 of them with c1=2.
+        sql(s"""DELETE FROM $deltaTableName WHERE c1 = 2""")
+
+        val sharedTableName = "shard_table_cdf"
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+
+        Seq(0, 1, 2, 3, 4, 5).foreach { startingVersion =>
+          val ts = getTimeStampForVersion(deltaTableName, startingVersion)
+          val startingTimestamp = DateTimeUtils.toJavaTimestamp(ts * 1000).toInstant.toString
+          prepareMockedClientAndFileSystemResultForCdf(
+            deltaTableName,
+            sharedTableName,
+            startingVersion,
+            Some(startingTimestamp)
+          )
+
+          def test(tablePath: String): Unit = {
+            val expectedSchema: StructType = new StructType()
+              .add("c1", IntegerType)
+              .add("c2", StringType)
+              .add("_change_type", StringType)
+              .add("_commit_version", LongType)
+              .add("_commit_timestamp", TimestampType)
+            val schema = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+              .schema
+            assert(expectedSchema == schema)
+
+            val expected = spark.read
+              .format("delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .table(deltaTableName)
+            val df = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+            checkAnswer(df, expected)
+            assert(df.count() > 0)
+          }
+
+          def testFiltersAndSelect(tablePath: String): Unit = {
+            val expectedSchema: StructType = new StructType()
+              .add("c2", StringType)
+              .add("_change_type", StringType)
+              .add("_commit_version", LongType)
+            val schema = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+              .select("c2", "_change_type", "_commit_version")
+              .schema
+            assert(expectedSchema == schema)
+
+            val expected = spark.read
+              .format("delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .table(deltaTableName)
+              .select("c2", "_change_type", "_commit_version")
+            val dfVersion = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+              .select("c2", "_change_type", "_commit_version")
+            checkAnswer(dfVersion, expected)
+            val dfTime = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingTimestamp", startingTimestamp)
+              .load(tablePath)
+              .select("c2", "_change_type", "_commit_version")
+            checkAnswer(dfTime, expected)
+            assert(dfTime.count() > 0)
+
+            val expectedFiltered = spark.read
+              .format("delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .table(deltaTableName)
+              .select("c2", "_change_type", "_commit_version")
+              .filter(col("c1") === 2)
+            val dfFiltered = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+              .select("c2", "_change_type", "_commit_version")
+              .filter(col("c1") === 2)
+            checkAnswer(dfFiltered, expectedFiltered)
+            assert(dfFiltered.count() > 0)
+          }
+
+          withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+            val profileFile = prepareProfileFile(tempDir)
+            test(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName")
+            testFiltersAndSelect(
+              profileFile.getCanonicalPath + s"#share1.default.$sharedTableName"
+            )
+          }
+        }
+
+        // Test join on the same table in cdf query.
+        def testJoin(tablePath: String): Unit = {
+          val deltaV0 = spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 0)
+            .table(deltaTableName)
+          val deltaV3 = spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 3)
+            .table(deltaTableName)
+          val sharingV0 = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 0)
+            .load(tablePath)
+          val sharingV3 = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 3)
+            .load(tablePath)
+
+          def testJoinedDf(
+              deltaLeft: DataFrame,
+              deltaRight: DataFrame,
+              sharingLeft: DataFrame,
+              sharingRight: DataFrame,
+              expectedSize: Int): Unit = {
+            val deltaJoined = deltaLeft.join(deltaRight, usingColumns = Seq("c1", "c2"))
+            val sharingJoined = sharingLeft.join(sharingRight, usingColumns = Seq("c1", "c2"))
+            checkAnswer(deltaJoined, sharingJoined)
+            // Verify the actual row count so the expectedSize parameter is exercised.
+            assert(sharingJoined.count() == expectedSize)
+          }
+          testJoinedDf(deltaV0, deltaV0, sharingV0, sharingV0, 10)
+          testJoinedDf(deltaV3, deltaV3, sharingV3, sharingV3, 5)
+          testJoinedDf(deltaV0, deltaV3, sharingV0, sharingV3, 6)
+        }
+
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          testJoin(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName")
+        }
+      }
+    }
+  }
+
+  test("DeltaSharingDataSource able to read data for cdf query with more entries") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_cdf_more"
+      withTable(deltaTableName) {
+        createSimpleTable(deltaTableName, enableCdf = true)
+        // The table operations take about 20~30 seconds.
+        for (i <- 0 to 9) {
+          val iteration = s"iteration $i"
+          val valuesBuilder = Seq.newBuilder[String]
+          for (j <- 0 to 49) {
+            valuesBuilder += s"""(${i * 10 + j}, "$iteration")"""
+          }
+          sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}")
+          sql(s"""UPDATE $deltaTableName SET c1 = c1 + 100 where c2 = "${iteration}"""")
+          sql(s"""DELETE FROM $deltaTableName where c2 = "${iteration}"""")
+        }
+
+        val sharedTableName = "shard_table_cdf_more"
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+        Seq(0, 10, 20, 30).foreach { startingVersion =>
+          prepareMockedClientAndFileSystemResultForCdf(
+            deltaTableName,
+            sharedTableName,
+            startingVersion
+          )
+
+          val expected = spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", startingVersion)
+            .table(deltaTableName)
+
+          def test(tablePath: String): Unit = {
+            val df = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(tablePath)
+            checkAnswer(df, expected)
+            assert(df.count() > 0)
+          }
+
+          withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+            val profileFile = prepareProfileFile(tempDir)
+            test(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName")
+          }
+        }
+      }
+    }
+  }
 }
 
 class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {}
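The expected sizes 10, 5 and 6 passed to testJoinedDf follow from the multiplicity of each (c1, c2) key in the CDF output, assuming the standard pre-/post-image rows for updates. For startingVersion 0 the six CDF rows carry key counts {(1,"one") -> 1, (2,"two") -> 2, (3,"two") -> 1, (2,"new two") -> 2}, so a self-join produces 1 + 4 + 1 + 4 = 10 rows. A quick arithmetic check of all three cases:

```scala
// Join size over key multiplicities: sum of per-key count products.
def joinSize(left: Map[(Int, String), Int], right: Map[(Int, String), Int]): Int =
  left.map { case (k, n) => n * right.getOrElse(k, 0) }.sum

val v0 = Map((1, "one") -> 1, (2, "two") -> 2, (3, "two") -> 1, (2, "new two") -> 2)
val v3 = Map((2, "two") -> 1, (2, "new two") -> 2) // CDF rows from version 3 on
assert(joinSize(v0, v0) == 10 && joinSize(v3, v3) == 5 && joinSize(v0, v3) == 6)
```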
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala
index 2aaab54e91b..601a6297ac6 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala
@@ -525,6 +525,123 @@ trait DeltaSharingDataSourceDeltaTestUtils extends SharedSparkSession {
     )
   }
 
+  private[spark] def prepareMockedClientAndFileSystemResultForCdf(
+      deltaTable: String,
+      sharedTable: String,
+      startingVersion: Long,
+      startingTimestamp: Option[String] = None,
+      inlineDvFormat: Option[RoaringBitmapArrayFormat.Value] = None,
+      assertMultipleDvsInOneFile: Boolean = false): Unit = {
+    val actionLines = Seq.newBuilder[String]
+
+    var maxVersion = -1L
+    var totalSize = 0L
+
+    val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable))
+    val startingSnapshot = deltaLog.getSnapshotAt(startingVersion)
+    actionLines += DeltaSharingProtocol(deltaProtocol = startingSnapshot.protocol).json
+    actionLines += DeltaSharingMetadata(
+      deltaMetadata = startingSnapshot.metadata,
+      version = startingVersion
+    ).json
+
+    val dvPathToCount = scala.collection.mutable.Map[String, Int]()
+    val files =
+      FileUtils.listFiles(new File(deltaLog.logPath.toUri()), null, true).asScala
+    files.foreach { f =>
+      if (FileNames.isDeltaFile(new Path(f.getName))) {
+        val version = FileNames.getFileVersion(new Path(f.getName))
+        if (version >= startingVersion) {
+          // The protocol and metadata are processed from startingSnapshot; only process versions
+          // at or above startingVersion for real actions and possible metadata changes.
+          maxVersion = maxVersion.max(version)
+          val timestamp = f.lastModified
+          FileUtils.readLines(f).asScala.foreach { l =>
+            val action = Action.fromJson(l)
+            action match {
+              case m: Metadata =>
+                actionLines += DeltaSharingMetadata(
+                  deltaMetadata = m,
+                  version = version
+                ).json
+              case addFile: AddFile if addFile.dataChange =>
+                if (assertMultipleDvsInOneFile) {
+                  updateDvPathToCount(addFile, dvPathToCount)
+                }
+                val updatedAdd = if (inlineDvFormat.isDefined) {
+                  // Remove rows 0 and 1 in the AddFile.
+                  updateAddFileWithInlineDV(addFile, inlineDvFormat.get, RoaringBitmapArray(0L, 1L))
+                } else {
+                  addFile
+                }
+                val dsAddFile =
+                  getDeltaSharingFileActionForAddFile(updatedAdd, sharedTable, version, timestamp)
+                totalSize = totalSize + updatedAdd.size
+                actionLines += dsAddFile.json
+              case removeFile: RemoveFile if removeFile.dataChange =>
+                // scalastyle:off removeFile
+                val dsRemoveFile = getDeltaSharingFileActionForRemoveFile(
+                  removeFile,
+                  sharedTable,
+                  version,
+                  timestamp
+                )
+                // scalastyle:on removeFile
+                totalSize = totalSize + removeFile.size.getOrElse(0L)
+                actionLines += dsRemoveFile.json
+              case cdcFile: AddCDCFile =>
+                val parquetFile = removePartitionPrefix(cdcFile.path)
+
+                // Convert from delta AddCDCFile to DeltaSharingFileAction to serialize to json.
+                val dsCDCFile = DeltaSharingFileAction(
+                  id = Hashing.sha256().hashString(parquetFile, UTF_8).toString,
+                  version = version,
+                  timestamp = timestamp,
+                  deltaSingleAction = cdcFile
+                    .copy(
+                      path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile)
+                    )
+                    .wrap
+                )
+                totalSize = totalSize + cdcFile.size
+                actionLines += dsCDCFile.json
+              case _ => // ignore other lines
+            }
+          }
+        }
+      }
+    }
+    val dataFiles =
+      FileUtils.listFiles(new File(deltaLog.dataPath.toUri()), null, true).asScala
+    dataFiles.foreach { f =>
+      val filePath = f.getCanonicalPath
+      if (isDataFile(filePath)) {
+        DeltaSharingUtils.overrideIteratorBlock[Byte](
+          blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName),
+          values = FileUtils.readFileToByteArray(f).toIterator
+        )
+      }
+    }
+
+    if (assertMultipleDvsInOneFile) {
+      assert(dvPathToCount.max._2 > 1)
+    }
+
+    DeltaSharingUtils.overrideIteratorBlock[String](
+      blockId =
+        TestClientForDeltaFormatSharing.getBlockId(sharedTable, s"getCDFFiles_$startingVersion"),
+      values = actionLines.result().toIterator
+    )
+    if (startingTimestamp.isDefined) {
+      DeltaSharingUtils.overrideIteratorBlock[String](
+        blockId = TestClientForDeltaFormatSharing.getBlockId(
+          sharedTable,
+          s"getCDFFiles_${startingTimestamp.get}"
+        ),
+        values = actionLines.result().toIterator
+      )
+    }
+  }
 
   protected def getDeltaSharingClassesSQLConf: Map[String, String] = {
     Map(
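The mocked CDF responses above key each action by a sha256 of the parquet file name, which is what keeps the file ids stable when the refresher re-fetches the same table. A sketch mirroring the Hashing call used in the test utils; the file name is made up for illustration:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import com.google.common.hash.Hashing

val parquetFile = "cdc-00000-hypothetical.c000.snappy.parquet" // illustrative name
// Same path -> same id across refreshes, so idToUrl entries can be swapped
// in place without invalidating the constructed delta log.
val fileId = Hashing.sha256().hashString(parquetFile, UTF_8).toString
```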