From f2150c51549b48bd3d07c6f1b73d827bb3db16f2 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Sun, 19 May 2024 23:53:48 -0700 Subject: [PATCH] add dynamodb commit owner --- build.sbt | 2 + .../DynamoDBCommitOwnerClient.java | 681 ++++++++++++++++++ .../DynamoDBCommitOwnerClientBuilder.java | 90 +++ .../DynamoDBTableEntryConstants.java | 49 ++ .../ManagedCommitUtils.java | 77 ++ .../managedcommit/CommitOwnerClient.scala | 5 +- .../CommitOwnerClientImplSuiteBase.scala | 64 +- .../DynamoDBCommitOwnerClientSuite.scala | 241 +++++++ .../InMemoryCommitOwnerSuite.scala | 12 +- 9 files changed, 1177 insertions(+), 44 deletions(-) create mode 100644 spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClient.java create mode 100644 spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClientBuilder.java create mode 100644 spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBTableEntryConstants.java create mode 100644 spark/src/main/java/io/delta/dynamodbcommitstore/ManagedCommitUtils.java create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/DynamoDBCommitOwnerClientSuite.scala diff --git a/build.sbt b/build.sbt index 98c85d1bcf4..7f906b2fc0b 100644 --- a/build.sbt +++ b/build.sbt @@ -202,6 +202,8 @@ lazy val spark = (project in file("spark")) "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", + // For DynamoDBCommitStore + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", diff --git a/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClient.java b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClient.java new file mode 100644 index 00000000000..4f72be9afcb --- /dev/null +++ b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClient.java @@ -0,0 +1,681 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.dynamodbcommitstore; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.*; +import org.apache.spark.sql.delta.managedcommit.*; +import org.apache.spark.sql.delta.storage.LogStore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import scala.Option; +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.immutable.Map; +import scala.collection.JavaConverters; +import scala.collection.immutable.Map$; + +import java.io.*; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.HashMap; + +// TODO(dynamodb): +// 1. figure out whether we need retry logic like DeltaCommitClient +// ~~2. data schema versioning~~ +// 3. 
logging +// 4. error handling +// 5. local tests +// 6. passing hadoop conf to the credential provider + +/** + * A commit owner client that uses DynamoDB as the commit owner. The table schema is as follows: + * tableId: String --- The unique identifier for the table. This is a UUID. + * path: String --- The fully qualified path of the table in the file system. e.g. s3://bucket/path. + * acceptingCommits: Boolean --- Whether the commit owner is accepting new commits. This will only + * be set to false when the table is converted from managed commits to file system commits. + * tableVersion: Number --- The version of the latest commit. + * tableTimestamp: Number --- The inCommitTimestamp of the latest commit. + * schemaVersion: Number --- The version of the schema used to store the data. + * commits: --- The list of unbackfilled commits. + * version: Number --- The version of the commit. + * inCommitTimestamp: Number --- The inCommitTimestamp of the commit. + * fsName: String --- The name of the unbackfilled file. + * fsLength: Number --- The length of the unbackfilled file. + * fsTimestamp: Number --- The modification time of the unbackfilled file. + */ +public class DynamoDBCommitOwnerClient implements CommitOwnerClient { + + /** + * The name of the DynamoDB table used to store unbackfilled commits. + */ + final String managedCommitsTableName; + + /** + * The DynamoDB client used to interact with the DynamoDB table. + */ + final AmazonDynamoDB client; + + /** + * The endpoint of the DynamoDB table. + */ + final String endpoint; + + /** + * The number of write capacity units to provision for the DynamoDB table if the + * client ends up creating a new one. + */ + final long writeCapacityUnits; + + /** + * The number of read capacity units to provision for the DynamoDB table if the + * client ends up creating a new one. + */ + final long readCapacityUnits; + + /** + * The number of commits to batch backfill at once. A backfill is performed + * whenever commitVersion % batchSize == 0. + */ + public final long backfillBatchSize; + + /** + * Whether we should skip matching the current table path against the one stored in DynamoDB + * when interacting with it. + */ + final boolean skipPathCheck; + + /** + * The key used to store the tableId in the managed commit table configuration. + */ + final static String TABLE_CONF_TABLE_ID_KEY = "tableId"; + + /** + * The version of the client. This is used to ensure that the client is compatible with the + * schema of the data stored in the DynamoDB table. A client should only be able to + * access a table if the schema version of the table matches the client version. 
+ */ + final int CLIENT_VERSION = 1; + + + public DynamoDBCommitOwnerClient( + String managedCommitsTableName, + String endpoint, + AmazonDynamoDB client, + long backfillBatchSize) throws IOException { + this( + managedCommitsTableName, + endpoint, + client, + backfillBatchSize, + 5 /* readCapacityUnits */, + 5 /* writeCapacityUnits */, + false /* skipPathCheck */); + } + + public DynamoDBCommitOwnerClient( + String managedCommitsTableName, + String endpoint, + AmazonDynamoDB client, + long backfillBatchSize, + long readCapacityUnits, + long writeCapacityUnits, + boolean skipPathCheck) throws IOException { + this.managedCommitsTableName = managedCommitsTableName; + this.endpoint = endpoint; + this.client = client; + this.backfillBatchSize = backfillBatchSize; + this.readCapacityUnits = readCapacityUnits; + this.writeCapacityUnits = writeCapacityUnits; + this.skipPathCheck = skipPathCheck; + tryEnsureTableExists(); + } + + private String getTableId(Map managedCommitTableConf) { + return managedCommitTableConf.get(TABLE_CONF_TABLE_ID_KEY).getOrElse(() -> { + throw new RuntimeException("tableId not found"); + }); + } + + /** + * Fetches the entry from the commit owner for the given table. Only the attributes defined + * in attributesToGet will be fetched. + */ + private GetItemResult getEntryFromCommitOwner( + Map managedCommitTableConf, String... attributesToGet) { + GetItemRequest request = new GetItemRequest() + .withTableName(managedCommitsTableName) + .addKeyEntry( + DynamoDBTableEntryConstants.TABLE_ID, + new AttributeValue().withS(getTableId(managedCommitTableConf))) + .withAttributesToGet(attributesToGet); + return client.getItem(request); + } + + /** + * Commits the given file to the commit owner. + * A conditional write is performed to the DynamoDB table entry associated with this Delta + * table. + * If the conditional write goes through, the filestatus of the UUID delta file will be + * appended to the list of unbackfilled commits, and other updates like setting the latest + * table version to `attemptVersion` will be performed. + * + * For the conditional write to go through, the following conditions must be met right before + * the write is performed: + * 1. The latest table version in DynamoDB is equal to attemptVersion - 1. + * 2. The commit owner is accepting new commits. + * 3. The schema version of the commit owner matches the schema version of the client. + * 4. The table path stored in DynamoDB matches the path of the table. This check is skipped + * if `skipPathCheck` is set to true. + * If the conditional write fails, we retrieve the current entry in DynamoDB to figure out + * which condition failed. (DynamoDB does not tell us which condition failed in the rejection.) + * If any of (2), (3), or (4) fail, an unretryable `CommitFailedException` will be thrown. + * For (1): + * If the retrieved latest table version is greater than or equal to attemptVersion, a retryable + * `CommitFailedException` will be thrown. + * If the retrieved latest table version is less than attemptVersion - 1, an unretryable + * `CommitFailedException` will be thrown. + */ + protected CommitResponse commitToOwner( + Path logPath, + Map managedCommitTableConf, + long attemptVersion, + FileStatus commitFile, + long inCommitTimestamp, + boolean isMCtoFSConversion) throws CommitFailedException { + // Add conditions for the conditional update. 
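The javadoc above classifies `CommitFailedException` as retryable or non-retryable. As a hedged sketch only (the TODO list at the top of this file still leaves retry logic open, so this is not part of the patch), a caller might consume the `getRetryable` flag roughly like this; `attemptCommit` is a hypothetical wrapper around `CommitOwnerClient.commit` for a single version:

```scala
import org.apache.spark.sql.delta.managedcommit.{CommitFailedException, CommitResponse}

// Hypothetical caller-side sketch: retry only when the commit owner reports the failure
// as retryable (someone else won the race for this version), and give up after a bounded
// number of attempts.
def commitWithRetries(
    attemptCommit: Long => CommitResponse, // assumed wrapper around commit() for one version
    version: Long,
    retriesLeft: Int = 3): CommitResponse = {
  try {
    attemptCommit(version)
  } catch {
    case e: CommitFailedException if e.getRetryable && retriesLeft > 0 =>
      // A real caller would re-read the latest table version and rebase its actions
      // before retrying against the next version.
      commitWithRetries(attemptCommit, version + 1, retriesLeft - 1)
  }
}
```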
+ java.util.Map expectedValuesBeforeUpdate = new HashMap<>(); + expectedValuesBeforeUpdate.put( + DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, + new ExpectedAttributeValue() + .withValue(new AttributeValue().withN(Long.toString(attemptVersion - 1))) + ); + expectedValuesBeforeUpdate.put( + DynamoDBTableEntryConstants.ACCEPTING_COMMITS, + new ExpectedAttributeValue() + .withValue(new AttributeValue().withBOOL(true))); + if (!skipPathCheck) { + expectedValuesBeforeUpdate.put( + DynamoDBTableEntryConstants.TABLE_PATH, + new ExpectedAttributeValue() + .withValue(new AttributeValue().withS(logPath.getParent().toString()))); + } + expectedValuesBeforeUpdate.put( + DynamoDBTableEntryConstants.SCHEMA_VERSION, + new ExpectedAttributeValue() + .withValue(new AttributeValue().withN(Integer.toString(CLIENT_VERSION)))); + + java.util.Map newCommit = new HashMap<>(); + newCommit.put( + DynamoDBTableEntryConstants.COMMIT_VERSION, + new AttributeValue().withN(Long.toString(attemptVersion))); + newCommit.put( + DynamoDBTableEntryConstants.COMMIT_TIMESTAMP, + new AttributeValue().withN(Long.toString(inCommitTimestamp))); + newCommit.put( + DynamoDBTableEntryConstants.COMMIT_FILE_NAME, + new AttributeValue().withS(commitFile.getPath().getName())); + newCommit.put( + DynamoDBTableEntryConstants.COMMIT_FILE_LENGTH, + new AttributeValue().withN(Long.toString(commitFile.getLen()))); + newCommit.put( + DynamoDBTableEntryConstants.COMMIT_FILE_MODIFICATION_TIMESTAMP, + new AttributeValue().withN(Long.toString(commitFile.getModificationTime()))); + + UpdateItemRequest request = new UpdateItemRequest() + .withTableName(managedCommitsTableName) + .addKeyEntry( + DynamoDBTableEntryConstants.TABLE_ID, + new AttributeValue().withS(getTableId(managedCommitTableConf))) + .addAttributeUpdatesEntry( + DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new AttributeValueUpdate() + .withValue(new AttributeValue().withN(Long.toString(attemptVersion))) + .withAction(AttributeAction.PUT)) + .addAttributeUpdatesEntry( + DynamoDBTableEntryConstants.TABLE_LATEST_TIMESTAMP, new AttributeValueUpdate() + .withValue(new AttributeValue().withN(Long.toString(inCommitTimestamp))) + .withAction(AttributeAction.PUT)) + .addAttributeUpdatesEntry( + DynamoDBTableEntryConstants.COMMITS, + new AttributeValueUpdate() + .withAction(AttributeAction.ADD) + .withValue(new AttributeValue().withL( + new AttributeValue().withM(newCommit) + ) + ) + ) + .withExpected(expectedValuesBeforeUpdate); + + if (isMCtoFSConversion) { + // If this table is being converted from managed commits to file system commits, we need + // to set acceptingCommits to false. + request = request + .addAttributeUpdatesEntry( + DynamoDBTableEntryConstants.ACCEPTING_COMMITS, + new AttributeValueUpdate() + .withValue(new AttributeValue().withBOOL(false)) + .withAction(AttributeAction.PUT) + ); + } + + try { + client.updateItem(request); + } catch (ConditionalCheckFailedException e) { + // Conditional check failed. The exception will not indicate which condition failed. + // We need to check the conditions ourselves by fetching the item and checking the + // values. 
+ GetItemResult latestEntry = getEntryFromCommitOwner( + managedCommitTableConf, + DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, + DynamoDBTableEntryConstants.ACCEPTING_COMMITS, + DynamoDBTableEntryConstants.TABLE_PATH, + DynamoDBTableEntryConstants.SCHEMA_VERSION); + + int schemaVersion = Integer.parseInt( + latestEntry.getItem().get(DynamoDBTableEntryConstants.SCHEMA_VERSION).getN()); + if (schemaVersion != CLIENT_VERSION) { + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + "The schema version of the commit owner does not match the current" + + "DynamoDBCommitOwnerClient version. The data schema version is " + + " " + schemaVersion + " while the client version is " + + CLIENT_VERSION + ". Make sure that the correct client is being " + + "used to access this table." ); + } + long latestTableVersion = Long.parseLong( + latestEntry.getItem().get(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION).getN()); + if (!skipPathCheck && + !latestEntry.getItem().get("path").getS().equals(logPath.getParent().toString())) { + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + "This commit was attempted from path " + logPath.getParent() + + " while the table is registered at " + + latestEntry.getItem().get("path").getS() + "."); + } + if (!latestEntry.getItem().get(DynamoDBTableEntryConstants.ACCEPTING_COMMITS).getBOOL()) { + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + "The commit owner is not accepting any new commits for this table."); + } + if (latestTableVersion != attemptVersion - 1) { + // The commit is only retryable if the conflict is due to someone else committing + // a version greater than the expected version. + boolean retryable = latestTableVersion > attemptVersion - 1; + throw new CommitFailedException( + retryable /* retryable */, + retryable /* conflict */, + "Commit version " + attemptVersion + " is not valid. Expected version: " + + (latestTableVersion + 1) + "."); + } + } + Commit resultantCommit = new Commit(attemptVersion, commitFile, inCommitTimestamp); + return new CommitResponse(resultantCommit); + } + + @Override + public CommitResponse commit( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map managedCommitTableConf, + long commitVersion, + Iterator actions, + UpdatedActions updatedActions) { + if (commitVersion == 0) { + throw new CommitFailedException( + false /* retryable */, + false /* conflict */, + "Commit version 0 must go via filesystem."); + } + try { + FileSystem fs = logPath.getFileSystem(hadoopConf); + Path commitPath = + ManagedCommitUtils.generateUnbackfilledDeltaFilePath(logPath, commitVersion); + logStore.write(commitPath, actions, true /* overwrite */, hadoopConf); + FileStatus commitFileStatus = fs.getFileStatus(commitPath); + long inCommitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp(); + boolean isMCtoFSConversion = + ManagedCommitUtils.isManagedCommitToFSConversion(commitVersion, updatedActions); + CommitResponse res = commitToOwner( + logPath, + managedCommitTableConf, + commitVersion, + commitFileStatus, + inCommitTimestamp, + isMCtoFSConversion); + + boolean shouldBackfillOnEveryCommit = backfillBatchSize <= 1; + boolean isBatchBackfillDue = commitVersion % backfillBatchSize == 0; + boolean shouldBackfill = shouldBackfillOnEveryCommit || isBatchBackfillDue || + // Always attempt a backfill for managed commit to filesystem conversion. + // Even if this fails, the next reader will attempt to backfill. 
+ isMCtoFSConversion; + if (shouldBackfill) { + backfillToVersion( + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + commitVersion, + Option.empty()); + } + return res; + } catch (IOException e) { + throw new CommitFailedException(false /* retryable */, false /* conflict */, e.getMessage()); + } + } + + private GetCommitsResponse getCommitsImpl( + Path logPath, + Map tableConf, + Option startVersion, + Option endVersion) throws IOException { + GetItemResult latestEntry = getEntryFromCommitOwner( + tableConf, + DynamoDBTableEntryConstants.COMMITS, + DynamoDBTableEntryConstants.TABLE_LATEST_VERSION); + + java.util.Map item = latestEntry.getItem(); + long currentVersion = + Long.parseLong(item.get(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION).getN()); + AttributeValue allStoredCommits = item.get(DynamoDBTableEntryConstants.COMMITS); + ArrayList commits = new ArrayList<>(); + Path unbackfilledCommitsPath = new Path(logPath, ManagedCommitUtils.COMMIT_SUBDIR); + for(AttributeValue attr: allStoredCommits.getL()) { + java.util.Map commitMap = attr.getM(); + long commitVersion = + Long.parseLong(commitMap.get(DynamoDBTableEntryConstants.COMMIT_VERSION).getN()); + boolean commitInRange = startVersion.forall((start) -> commitVersion >= (long) start) && + endVersion.forall((end) -> (long) end >= commitVersion); + if (commitInRange) { + Path filePath = new Path( + unbackfilledCommitsPath, + commitMap.get(DynamoDBTableEntryConstants.COMMIT_FILE_NAME).getS()); + long length = + Long.parseLong(commitMap.get(DynamoDBTableEntryConstants.COMMIT_FILE_LENGTH).getN()); + long modificationTime = Long.parseLong( + commitMap.get(DynamoDBTableEntryConstants.COMMIT_FILE_MODIFICATION_TIMESTAMP).getN()); + FileStatus fileStatus = new FileStatus( + length, + false /* isDir */, + 0 /* blockReplication */, + 0 /* blockSize */, + modificationTime, + filePath); + long inCommitTimestamp = + Long.parseLong(commitMap.get(DynamoDBTableEntryConstants.COMMIT_TIMESTAMP).getN()); + commits.add(new Commit(commitVersion, fileStatus, inCommitTimestamp)); + } + } + return new GetCommitsResponse( + JavaConverters.asScalaIterator(commits.iterator()).toSeq(), currentVersion); + } + + @Override + public GetCommitsResponse getCommits( + Path logPath, + Map managedCommitTableConf, + Option startVersion, + Option endVersion) { + try { + return getCommitsImpl(logPath, managedCommitTableConf, startVersion, endVersion); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Writes the given actions to a file. + * logStore.write(overwrite=false) will throw a FileAlreadyExistsException if the file already + * exists. However, the scala LogStore interface does not declare this as part of the function + * signature. This method wraps the write method and declares the exception to ensure that the + * caller is aware of the exception. 
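Circling back to the backfill trigger in `commit()` above, a small self-contained sketch (with an illustrative batch size) of which commit versions end up triggering a backfill:

```scala
// Mirrors the shouldBackfill decision in commit() above, with an illustrative batch size:
// backfill on every commit when the batch size is <= 1, otherwise on every batchSize-th
// commit, and always when converting from managed commits back to filesystem commits.
val backfillBatchSize = 5L
def shouldBackfill(commitVersion: Long, isMCtoFSConversion: Boolean): Boolean =
  backfillBatchSize <= 1 || commitVersion % backfillBatchSize == 0 || isMCtoFSConversion

(1L to 12L).filter(v => shouldBackfill(v, isMCtoFSConversion = false)) // => versions 5 and 10
```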
+ */ + private void writeActionsToFile( + LogStore logStore, + Path logPath, + long version, + scala.collection.Iterator actions, + Configuration hadoopConf, + boolean shouldOverwrite) throws FileAlreadyExistsException { + Path targetPath = ManagedCommitUtils.getBackfilledDeltaFilePath(logPath, version); + logStore.write(targetPath, actions, shouldOverwrite, hadoopConf); + } + + private void validateBackfilledFileExists( + Path logPath, Configuration hadoopConf, Option lastKnownBackfilledVersion) { + try { + if (lastKnownBackfilledVersion.isEmpty()) { + return; + } + long version = (long) lastKnownBackfilledVersion.get(); + Path lastKnownBackfilledFile = + ManagedCommitUtils.getBackfilledDeltaFilePath(logPath, version); + FileSystem fs = logPath.getFileSystem(hadoopConf); + if (!fs.exists(lastKnownBackfilledFile)) { + throw new IllegalArgumentException( + "Expected backfilled file at " + lastKnownBackfilledFile + " does not exist."); + } + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Backfills all the unbackfilled commits returned by the commit owner and notifies the commit + * owner of the backfills. + * The version parameter is ignored in this implementation and all the unbackfilled commits + * are backfilled. This method will not throw any exception if the physical backfill + * succeeds but the update to the commit owner fails. + * @throws IllegalArgumentException if the requested backfill version is greater than the latest + * version for the table. + */ + @Override + public void backfillToVersion( + LogStore logStore, + Configuration hadoopConf, + Path logPath, + Map managedCommitTableConf, + long version, + Option lastKnownBackfilledVersion) { + GetCommitsResponse resp = + getCommits(logPath, managedCommitTableConf, lastKnownBackfilledVersion, Option.empty()); + validateBackfilledFileExists(logPath, hadoopConf, lastKnownBackfilledVersion); + if (version > resp.getLatestTableVersion()) { + throw new IllegalArgumentException( + "The requested backfill version " + version + " is greater than the latest " + + "version " + resp.getLatestTableVersion() + " for the table."); + } + // If partial writes are visible in this filesystem, we should not try to overwrite existing + // files. A failed overwrite can truncate the existing file. + boolean shouldOverwrite = !logStore.isPartialWriteVisible( + logPath, + hadoopConf); + for (Commit commit: JavaConverters.asJavaIterable(resp.getCommits())) { + scala.collection.Iterator actions = + logStore.read(commit.getFileStatus().getPath(), hadoopConf).toIterator(); + try { + writeActionsToFile( + logStore, + logPath, + commit.getVersion(), + actions, + hadoopConf, + shouldOverwrite); + } catch (java.nio.file.FileAlreadyExistsException e) { + // Ignore the exception. This indicates that the file has already been backfilled. 
+ } + } + UpdateItemRequest request = new UpdateItemRequest() + .withTableName(managedCommitsTableName) + .addKeyEntry( + DynamoDBTableEntryConstants.TABLE_ID, + new AttributeValue().withS(getTableId(managedCommitTableConf))) + .addAttributeUpdatesEntry( + DynamoDBTableEntryConstants.COMMITS, + new AttributeValueUpdate() + .withAction(AttributeAction.PUT) + .withValue(new AttributeValue().withL()) + ) + .withExpected(new HashMap(){ + { + put(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new ExpectedAttributeValue() + .withValue( + new AttributeValue() + .withN(Long.toString(resp.getLatestTableVersion()))) + ); + put(DynamoDBTableEntryConstants.TABLE_PATH, new ExpectedAttributeValue() + .withValue( + new AttributeValue() + .withS(logPath.getParent().toString())) + ); + put(DynamoDBTableEntryConstants.SCHEMA_VERSION, new ExpectedAttributeValue() + .withValue( + new AttributeValue() + .withN(Integer.toString(CLIENT_VERSION))) + ); + } + }); + try { + client.updateItem(request); + } catch (ConditionalCheckFailedException e) { + // Ignore the exception. The backfill succeeded but the update to + // the commit owner failed. The main purpose of a backfill operation is to ensure that + // UUID commit is physically copied to a standard commit file path. A failed update to + // the commit owner is not critical. + } + } + + @Override + public Map registerTable( + Path logPath, + long currentVersion, + AbstractMetadata currentMetadata, + AbstractProtocol currentProtocol) { + java.util.Map item = new HashMap<>(); + + String tableId = java.util.UUID.randomUUID().toString(); + item.put(DynamoDBTableEntryConstants.TABLE_ID, new AttributeValue().withS(tableId)); + + // We maintain the invariant that a commit will only succeed if the latestVersion stored + // in the table is equal to attemptVersion - 1. To maintain this, even though the + // filesystem-based commit after register table can fail, we still treat the attemptVersion + // at registration as a valid version. It is expected that the Delta client will perform + // another registration if the filesystem-based commit fails. When the filesystem-based + // commit does fail, this entry will not be associated with any Delta table because the + // `tableId` generated above must be committed as part of the filesystem-based commit for + // this entry to be associated with a Delta table. + long attemptVersion = currentVersion + 1; + item.put( + DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, + new AttributeValue().withN(Long.toString(attemptVersion))); + + item.put( + DynamoDBTableEntryConstants.TABLE_PATH, + new AttributeValue().withS(logPath.getParent().toString())); + item.put(DynamoDBTableEntryConstants.COMMITS, new AttributeValue().withL()); + item.put( + DynamoDBTableEntryConstants.ACCEPTING_COMMITS, new AttributeValue().withBOOL(true)); + item.put( + DynamoDBTableEntryConstants.SCHEMA_VERSION, + new AttributeValue().withN(Integer.toString(CLIENT_VERSION))); + + PutItemRequest request = new PutItemRequest() + .withTableName(managedCommitsTableName) + .withItem(item) + .withConditionExpression( + String.format( + "attribute_not_exists(%s)", DynamoDBTableEntryConstants.TABLE_ID)); + client.putItem(request); + + Tuple2 tableIdEntry = new Tuple2<>(DynamoDBTableEntryConstants.TABLE_ID, tableId); + Map tableConf = Map$.MODULE$.empty().$plus(tableIdEntry); + + return tableConf; + } + + // Copied from DynamoDbLogStore. TODO: add the logging back. + + /** + * Ensures that the table used to store commits from all Delta tables exists. 
If the table + * does not exist, it will be created. + * @throws IOException + */ + private void tryEnsureTableExists() throws IOException { + int retries = 0; + boolean created = false; + while(retries < 20) { + String status = "CREATING"; + try { + DescribeTableResult result = client.describeTable(managedCommitsTableName); + TableDescription descr = result.getTable(); + status = descr.getTableStatus(); + } catch (ResourceNotFoundException e) { + try { + client.createTable( + // attributeDefinitions + java.util.Collections.singletonList( + new AttributeDefinition( + DynamoDBTableEntryConstants.TABLE_ID, + ScalarAttributeType.S) + ), + managedCommitsTableName, + // keySchema + java.util.Collections.singletonList( + new KeySchemaElement( + DynamoDBTableEntryConstants.TABLE_ID, + KeyType.HASH) + ), + new ProvisionedThroughput(this.readCapacityUnits, this.writeCapacityUnits) + ); + created = true; + } catch (ResourceInUseException e3) { + // race condition - table just created by concurrent process + } + } + if (status.equals("ACTIVE")) { + break; + } else if (status.equals("CREATING")) { + retries += 1; + try { + Thread.sleep(1000); + } catch(InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } else { + break; // TODO - raise exception? + } + }; + } + + @Override + public boolean semanticEquals(CommitOwnerClient other) { + if (!(other instanceof DynamoDBCommitOwnerClient)) { + return false; + } + DynamoDBCommitOwnerClient otherStore = (DynamoDBCommitOwnerClient) other; + return this.managedCommitsTableName.equals(otherStore.managedCommitsTableName) + && this.endpoint.equals(otherStore.endpoint); + } +} diff --git a/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClientBuilder.java b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClientBuilder.java new file mode 100644 index 00000000000..d1522b67092 --- /dev/null +++ b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClientBuilder.java @@ -0,0 +1,90 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.dynamodbcommitstore; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import org.apache.spark.sql.delta.managedcommit.CommitOwnerBuilder; +import org.apache.spark.sql.delta.managedcommit.CommitOwnerClient; +import org.apache.spark.sql.SparkSession; +import scala.collection.immutable.Map; + +import java.lang.reflect.InvocationTargetException; + +public class DynamoDBCommitOwnerClientBuilder implements CommitOwnerBuilder { + + private final long BACKFILL_BATCH_SIZE = 1L; + + @Override + public String getName() { + return "dynamodb"; + } + + /** + * Key for the name of the DynamoDB table which stores all the unbackfilled + * commits for this owner. The value of this key is stored in the `conf` + * which is passed to the `build` method. 
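As a usage sketch only (the table name and endpoint values below are made up, and `spark` is assumed to be an existing `SparkSession`), the two configuration keys documented here and just below would be passed to `build` like this:

```scala
import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClientBuilder

// Hedged usage sketch; "delta_managed_commits" and the endpoint are illustrative values.
val builder = new DynamoDBCommitOwnerClientBuilder()
val commitOwnerClient = builder.build(
  spark,
  Map(
    "managedCommitsTableName" -> "delta_managed_commits",
    "dynamoDBEndpoint" -> "https://dynamodb.us-west-2.amazonaws.com"))
```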
+ */ + private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName"; + /** + * Key for the endpoint of the DynamoDB service. The value of this key is stored in the + * `conf` which is passed to the `build` method. + */ + private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint"; + + /** + * The AWS credentials provider chain to use when creating the DynamoDB client. + * This has temporarily been hardcoded until we have a way to read from sparkSession. + */ + private static final String AWS_CREDENTIALS_PROVIDER = + "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"; + + // TODO: update this interface so that it can take a sparkSession. + @Override + public CommitOwnerClient build(SparkSession spark, Map conf) { + String managedCommitsTableName = conf.get(MANAGED_COMMITS_TABLE_NAME_KEY).getOrElse(() -> { + throw new RuntimeException(MANAGED_COMMITS_TABLE_NAME_KEY + " not found"); + }); + String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> { + throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found"); + }); + try { + AmazonDynamoDBClient client = + createAmazonDDBClient(dynamoDBEndpoint, AWS_CREDENTIALS_PROVIDER); + return new DynamoDBCommitOwnerClient( + managedCommitsTableName, dynamoDBEndpoint, client, BACKFILL_BATCH_SIZE); + } catch (Exception e) { + throw new RuntimeException("Failed to create DynamoDB client", e); + } + } + + private AmazonDynamoDBClient createAmazonDDBClient( + String endpoint, + String credentialProviderName + ) throws NoSuchMethodException, + ClassNotFoundException, + InvocationTargetException, + InstantiationException, + IllegalAccessException { + Class awsCredentialsProviderClass = Class.forName(credentialProviderName); + AWSCredentialsProvider awsCredentialsProvider = + (AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance(); + AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider); + client.setEndpoint(endpoint); + return client; + } +} diff --git a/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBTableEntryConstants.java b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBTableEntryConstants.java new file mode 100644 index 00000000000..b1d53d8f99c --- /dev/null +++ b/spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBTableEntryConstants.java @@ -0,0 +1,49 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.dynamodbcommitstore; + +/** + * Defines the field names used in the DynamoDB table entry. + */ +final class DynamoDBTableEntryConstants { + private DynamoDBTableEntryConstants() {} + + /** The primary key of the DynamoDB table. */ + public static final String TABLE_ID = "tableId"; + /** The version of the latest commit in the corresponding Delta table. */ + public static final String TABLE_LATEST_VERSION = "tableVersion"; + /** The inCommitTimestamp of the latest commit in the corresponding Delta table. 
*/ + public static final String TABLE_LATEST_TIMESTAMP = "tableTimestamp"; + /** Whether this commit owner is accepting more commits for the corresponding Delta table. */ + public static final String ACCEPTING_COMMITS = "acceptingCommits"; + /** The path of the corresponding Delta table. */ + public static final String TABLE_PATH = "path"; + /** The schema version of this DynamoDB table entry. */ + public static final String SCHEMA_VERSION = "schemaVersion"; + /** The name of the field used to store unbackfilled commits. */ + public static final String COMMITS = "commits"; + /** The unbackfilled commit version. */ + public static final String COMMIT_VERSION = "version"; + /** The inCommitTimestamp of the unbackfilled commit. */ + public static final String COMMIT_TIMESTAMP = "timestamp"; + /** The name of the unbackfilled file. e.g. 00001.uuid.json */ + public static final String COMMIT_FILE_NAME = "fsName"; + /** The length of the unbackfilled file as per the file status. */ + public static final String COMMIT_FILE_LENGTH = "fsLength"; + /** The modification timestamp of the unbackfilled file as per the file status. */ + public static final String COMMIT_FILE_MODIFICATION_TIMESTAMP = "fsTimestamp"; +} diff --git a/spark/src/main/java/io/delta/dynamodbcommitstore/ManagedCommitUtils.java b/spark/src/main/java/io/delta/dynamodbcommitstore/ManagedCommitUtils.java new file mode 100644 index 00000000000..97caa36c896 --- /dev/null +++ b/spark/src/main/java/io/delta/dynamodbcommitstore/ManagedCommitUtils.java @@ -0,0 +1,77 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.dynamodbcommitstore; + +import org.apache.spark.sql.delta.managedcommit.AbstractMetadata; +import org.apache.spark.sql.delta.managedcommit.UpdatedActions; +import org.apache.hadoop.fs.Path; + +import java.util.UUID; + +public class ManagedCommitUtils { + + private ManagedCommitUtils() {} + + /** The subdirectory in which to store the unbackfilled commit files. */ + final static String COMMIT_SUBDIR = "_commits"; + + /** The configuration key for the managed commit owner. */ + private static final String MANAGED_COMMIT_OWNER_CONF_KEY = + "delta.managedCommits.commitOwner-dev"; + + /** + * Creates a new unbackfilled delta file path for the given commit version. + * The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`. + */ + public static Path generateUnbackfilledDeltaFilePath( + Path logPath, + long version) { + String uuid = UUID.randomUUID().toString(); + Path basePath = new Path(logPath, COMMIT_SUBDIR); + return new Path(basePath, String.format("%020d.%s.json", version, uuid)); + } + + /** + * Returns the path to the backfilled delta file for the given commit version. + * The path is of the form `tablePath/_delta_log/00000000000000000001.json`. 
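A quick sketch of the two path helpers side by side (the table path is made up, and a fresh random UUID is embedded on every call to the unbackfilled variant):

```scala
import io.delta.dynamodbcommitstore.ManagedCommitUtils
import org.apache.hadoop.fs.Path

val logPath = new Path("s3://bucket/table/_delta_log")
ManagedCommitUtils.generateUnbackfilledDeltaFilePath(logPath, 12)
// => s3://bucket/table/_delta_log/_commits/00000000000000000012.<random-uuid>.json
ManagedCommitUtils.getBackfilledDeltaFilePath(logPath, 12L)
// => s3://bucket/table/_delta_log/00000000000000000012.json
```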
+ */ + public static Path getBackfilledDeltaFilePath( + Path logPath, + Long version) { + return new Path(logPath, String.format("%020d.json", version)); + } + + private static String getManagedCommitOwner(AbstractMetadata metadata) { + return metadata + .getConfiguration() + .get(MANAGED_COMMIT_OWNER_CONF_KEY) + .getOrElse(() -> ""); + } + + /** + * Returns true if the commit is a managed commit to filesystem conversion. + */ + public static boolean isManagedCommitToFSConversion( + Long commitVersion, + UpdatedActions updatedActions) { + boolean oldMetadataHasManagedCommits = + !getManagedCommitOwner(updatedActions.getOldMetadata()).isEmpty(); + boolean newMetadataHasManagedCommits = + !getManagedCommitOwner(updatedActions.getNewMetadata()).isEmpty(); + return oldMetadataHasManagedCommits && !newMetadataHasManagedCommits && commitVersion > 0; + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala index bb669a007d9..c3d731da157 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.managedcommit import scala.collection.mutable import org.apache.spark.sql.delta.storage.LogStore +import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClientBuilder import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -45,7 +46,7 @@ case class Commit( case class CommitFailedException( private val retryable: Boolean, private val conflict: Boolean, - private val message: String) extends Exception(message) { + private val message: String) extends RuntimeException(message) { def getRetryable: Boolean = retryable def getConflict: Boolean = conflict } @@ -237,7 +238,7 @@ object CommitOwnerProvider { } private val initialCommitOwnerBuilders = Seq[CommitOwnerBuilder]( - // Any new commit-owner builder will be registered here. 
+ new DynamoDBCommitOwnerClientBuilder() ) initialCommitOwnerBuilders.foreach(registerBuilder) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientImplSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientImplSuiteBase.scala index 350e99bd0a6..f8557e32bf9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientImplSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientImplSuiteBase.scala @@ -20,17 +20,21 @@ import java.io.File import java.util.concurrent.{Executors, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.duration._ + import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol} import org.apache.spark.sql.delta.storage.{LogStore, LogStoreProvider} import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.delta.util.threads.DeltaThreadPool +import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClient import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} trait CommitOwnerClientImplSuiteBase extends QueryTest with SharedSparkSession @@ -50,7 +54,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest * of backfill registration. */ protected def registerBackfillOp( - commitOwnerClient: CommitOwnerClient, + tableCommitOwnerClient: TableCommitOwnerClient, deltaLog: DeltaLog, version: Long): Unit @@ -90,11 +94,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest tableCommitOwnerClient: TableCommitOwnerClient, commitTimestampsOpt: Option[Array[Long]] = None): Unit = { val maxUntrackedVersion: Int = { - val commitResponse = - tableCommitOwnerClient.commitOwnerClient.getCommits( - logPath, - tableCommitOwnerClient.tableConf - ) + val commitResponse = tableCommitOwnerClient.getCommits() if (commitResponse.getCommits.isEmpty) { commitResponse.getLatestTableVersion.toInt } else { @@ -198,15 +198,19 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest val log = DeltaLog.forTable(spark, tempDir.toString) val logPath = log.logPath val tableCommitOwnerClient = createTableCommitOwnerClient(log) - tableCommitOwnerClient.commitOwnerClient.registerTable( - logPath, currentVersion = -1L, initMetadata, Protocol(1, 1)) val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tableCommitOwnerClient) } assert(e.getMessage === "Commit version 0 must go via filesystem.") writeCommitZero(logPath) - assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, -1)) + var expectedVersion = -1 + if (tableCommitOwnerClient.commitOwnerClient.isInstanceOf[DynamoDBCommitOwnerClient]) { + // DynamoDBCommitOwnerClient stores attemptVersion as the current table version when + // registerTable is called. 
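+ // (registerTable records attemptVersion = currentVersion + 1, i.e. 0 here, as the
+ // latest table version even before the filesystem-based commit for version 0 lands.)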
+ expectedVersion = 0 + } + assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, expectedVersion)) assertBackfilled(version = 0, logPath, Some(0L)) // Test backfilling functionality for commits 1 - 8 @@ -218,7 +222,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest // Test that out-of-order backfill is rejected intercept[IllegalArgumentException] { - registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 10) + registerBackfillOp(tableCommitOwnerClient, log, 10) } assertInvariants(logPath, tableCommitOwnerClient) } @@ -239,9 +243,6 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest val log = DeltaLog.forTable(spark, tempDir.toString) val logPath = log.logPath val tableCommitOwnerClient = createTableCommitOwnerClient(log) - tableCommitOwnerClient.commitOwnerClient.registerTable( - logPath, currentVersion = -1L, initMetadata, Protocol(1, 1) - ) writeCommitZero(logPath) val maxVersion = 15 (1 to maxVersion).foreach { version => @@ -261,22 +262,17 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest val log = DeltaLog.forTable(spark, tempDir.getPath) val logPath = log.logPath val tableCommitOwnerClient = createTableCommitOwnerClient(log) - intercept[IllegalArgumentException] { - registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 0) - } - tableCommitOwnerClient.commitOwnerClient.registerTable( - logPath, currentVersion = -1L, initMetadata, Protocol(1, 1)) // commit-0 must be file system based writeCommitZero(logPath) (1 to 3).foreach(i => commit(i, i, tableCommitOwnerClient)) // Test that backfilling is idempotent for already-backfilled commits. - registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 2) - registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 2) + registerBackfillOp(tableCommitOwnerClient, log, 2) + registerBackfillOp(tableCommitOwnerClient, log, 2) // Test that backfilling uncommited commits fail. 
intercept[IllegalArgumentException] { - registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 4) + registerBackfillOp(tableCommitOwnerClient, log, 4) } } } @@ -286,8 +282,6 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest val log = DeltaLog.forTable(spark, tempDir.toString) val logPath = log.logPath val tableCommitOwnerClient = createTableCommitOwnerClient(log) - tableCommitOwnerClient.commitOwnerClient.registerTable( - logPath, currentVersion = -1L, initMetadata, Protocol(1, 1)) // commit-0 must be file system based writeCommitZero(logPath) @@ -316,43 +310,37 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest val numberOfWriters = 11 val numberOfCommitsPerWriter = 11 // scalastyle:off sparkThreadPools - val executor = Executors.newFixedThreadPool(numberOfWriters) + val executor = DeltaThreadPool("commitOwnerSuite", numberOfWriters) // scalastyle:on sparkThreadPools val runningTimestamp = new AtomicInteger(0) val commitFailedExceptions = new AtomicInteger(0) - val totalCommits = numberOfWriters * numberOfCommitsPerWriter - val commitTimestamp: Array[Long] = new Array[Long](totalCommits) + // commit-0 must be file system based + writeCommitZero(logPath) try { - (0 until numberOfWriters).foreach { i => - executor.submit(new Runnable { - override def run(): Unit = { + val tasks = (0 until numberOfWriters).map { i => + executor.submit(spark) { var currentWriterCommits = 0 while (currentWriterCommits < numberOfCommitsPerWriter) { - val nextVersion = tcs.getCommits().getLatestTableVersion + 1 + val nextVersion = math.max(tcs.getCommits().getLatestTableVersion + 1, 1) try { val currentTimestamp = runningTimestamp.getAndIncrement() val commitResponse = commit(nextVersion, currentTimestamp, tcs) currentWriterCommits += 1 assert(commitResponse.getCommitTimestamp == currentTimestamp) assert(commitResponse.getVersion == nextVersion) - commitTimestamp(commitResponse.getVersion.toInt) = - commitResponse.getCommitTimestamp } catch { case e: CommitFailedException => assert(e.getConflict) assert(e.getRetryable) commitFailedExceptions.getAndIncrement() } finally { - assertInvariants(logPath, tcs, Some(commitTimestamp)) + assertInvariants(logPath, tcs) } } } - }) } - - executor.shutdown() - executor.awaitTermination(15, TimeUnit.SECONDS) + tasks.foreach(ThreadUtils.awaitResult(_, 150.seconds)) } catch { case e: InterruptedException => fail("Test interrupted: " + e.getMessage) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/DynamoDBCommitOwnerClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/DynamoDBCommitOwnerClientSuite.scala new file mode 100644 index 00000000000..2ea8c2b51ad --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/DynamoDBCommitOwnerClientSuite.scala @@ -0,0 +1,241 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.managedcommit + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.amazonaws.services.dynamodbv2.AbstractAmazonDynamoDB +import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ConditionalCheckFailedException, CreateTableRequest, CreateTableResult, DescribeTableResult, GetItemRequest, GetItemResult, PutItemRequest, PutItemResult, ResourceInUseException, ResourceNotFoundException, TableDescription, UpdateItemRequest, UpdateItemResult} +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.util.FileNames +import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClient +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession + +/** + * An in-memory implementation of DynamoDB client for testing. Only the methods used by + * `DynamoDBCommitOwnerClient` are implemented. + */ +class InMemoryDynamoDBClient extends AbstractAmazonDynamoDB { + /** + * The db has multiple tables (outer map). Each table has multiple entries (inner map). + */ + val db: mutable.Map[String, mutable.Map[String, PerEntryData]] = mutable.Map.empty + case class PerEntryData( + lock: ReentrantReadWriteLock, + data: mutable.Map[String, AttributeValue]) + + private def getTableData(tableName: String): mutable.Map[String, PerEntryData] = { + db.getOrElse(tableName, throw new ResourceNotFoundException("table does not exist")) + } + + override def createTable(createTableRequest: CreateTableRequest): CreateTableResult = { + val tableName = createTableRequest.getTableName + if (db.contains(tableName)) { + throw new ResourceInUseException("Table already exists") + } + db.getOrElseUpdate(tableName, mutable.Map.empty) + new CreateTableResult().withTableDescription( + new TableDescription().withTableName(tableName)); + } + + override def describeTable(tableName: String): DescribeTableResult = { + if (!db.contains(tableName)) { + throw new ResourceNotFoundException("table does not exist") + } + val tableDesc = + new TableDescription().withTableName(tableName).withTableStatus("ACTIVE") + new DescribeTableResult().withTable(tableDesc) + } + + override def getItem(getItemRequest: GetItemRequest): GetItemResult = { + val table = getTableData(getItemRequest.getTableName) + val tableId = getItemRequest.getKey.values().iterator().next(); + val entry = table.getOrElse(tableId.getS, + throw new ResourceNotFoundException("table does not exist")) + val lock = entry.lock.readLock() + try { + lock.lock() + val result = new GetItemResult() + getItemRequest.getAttributesToGet.forEach(attr => { + entry.data.get(attr).foreach(result.addItemEntry(attr, _)) + }) + result + } finally { + lock.unlock() + } + } + + override def putItem(putItemRequest: PutItemRequest): PutItemResult = { + val table = getTableData(putItemRequest.getTableName) + val item = putItemRequest.getItem + val tableId = item.get("tableId").getS + if (table.contains(tableId)) { + throw new ResourceInUseException("table already exists") + } + val entry = PerEntryData(new ReentrantReadWriteLock(), item.asScala) + // This is not really safe, but tableId is a UUID, so it should be fine. 
+ table.put(tableId, entry) + new PutItemResult() + } + + override def updateItem(request: UpdateItemRequest): UpdateItemResult = { + val table = getTableData(request.getTableName) + val tableId = request.getKey.values().iterator().next(); + val entry = table.getOrElse(tableId.getS, + throw new ResourceNotFoundException("table does not exist")) + val lock = entry.lock.writeLock() + try { + lock.lock() + request.getExpected.forEach((attr, expectedVal) => { + val actualVal = entry.data.getOrElse(attr, + throw new ConditionalCheckFailedException("Expected attr not found")) + if (actualVal != expectedVal.getValue) { + throw new ConditionalCheckFailedException("Value does not match") + } + }) + request.getAttributeUpdates.forEach((attr, update) => { + if (attr != "commits") { + entry.data.put(attr, update.getValue) + } else { + val commits = update.getValue.getL.asScala + if (update.getAction == "ADD") { + val existingCommits = + entry.data.get("commits").map(_.getL.asScala).getOrElse(List()) + entry.data.put( + "commits", new AttributeValue().withL((existingCommits ++ commits).asJava)) + } else if (update.getAction == "PUT") { + entry.data.put("commits", update.getValue) + } else { + throw new IllegalArgumentException("Unsupported action") + } + } + }) + new UpdateItemResult() + } finally { + lock.unlock() + } + } +} + +case class TestDynamoDBCommitOwnerBuilder(batchSize: Long) extends CommitOwnerBuilder { + override def getName: String = "test-dynamodb" + override def build(spark: SparkSession, config: Map[String, String]): CommitOwnerClient = { + new DynamoDBCommitOwnerClient( + "testTable", + "test-endpoint", + new InMemoryDynamoDBClient(), + batchSize) + } +} + +abstract class DynamoDBCommitOwnerClientSuite(batchSize: Long) + extends CommitOwnerClientImplSuiteBase { + + override protected def createTableCommitOwnerClient( + deltaLog: DeltaLog) + : TableCommitOwnerClient = { + val cs = TestDynamoDBCommitOwnerBuilder(batchSize = batchSize).build(spark, Map.empty) + val tableConf = cs.registerTable( + deltaLog.logPath, + currentVersion = -1L, + Metadata(), + Protocol(1, 1)) + TableCommitOwnerClient(cs, deltaLog, tableConf) + } + + override protected def registerBackfillOp( + tableCommitOwnerClient: TableCommitOwnerClient, + deltaLog: DeltaLog, + version: Long): Unit = { + tableCommitOwnerClient.backfillToVersion(version) + } + + override protected def validateBackfillStrategy( + tableCommitOwnerClient: TableCommitOwnerClient, + logPath: Path, + version: Long): Unit = { + val lastExpectedBackfilledVersion = (version - (version % batchSize)).toInt + val unbackfilledCommitVersionsAll = tableCommitOwnerClient + .getCommits().getCommits.map(_.getVersion) + val expectedVersions = lastExpectedBackfilledVersion + 1 to version.toInt + + assert(unbackfilledCommitVersionsAll == expectedVersions) + (0 to lastExpectedBackfilledVersion).foreach { v => + assertBackfilled(v, logPath, Some(v)) + } + } + + protected def validateGetCommitsResult( + result: GetCommitsResponse, + startVersion: Option[Long], + endVersion: Option[Long], + maxVersion: Long): Unit = { + val commitVersions = result.getCommits.map(_.getVersion) + val lastExpectedBackfilledVersion = (maxVersion - (maxVersion % batchSize)).toInt + val expectedVersions = lastExpectedBackfilledVersion + 1 to maxVersion.toInt + assert(commitVersions == expectedVersions) + assert(result.getLatestTableVersion == maxVersion) + } + + for (skipPathCheck <- Seq(true, false)) + test(s"skipPathCheck should work correctly [skipPathCheck = $skipPathCheck]") { + 
withTempTableDir { tempDir => + val log = DeltaLog.forTable(spark, tempDir.toString) + val logPath = log.logPath + writeCommitZero(logPath) + val dynamoDB = new InMemoryDynamoDBClient(); + val commitOwner = new DynamoDBCommitOwnerClient( + "testTable", + "test-endpoint", + dynamoDB, + batchSize, + 1, // readCapacityUnits + 1, // writeCapacityUnits + skipPathCheck) + val tableConf = commitOwner.registerTable( + logPath, + -1L, + Metadata(), + Protocol(1, 1)) + val wrongTablePath = new Path(logPath.getParent, "wrongTable") + val wrongLogPath = new Path(wrongTablePath, logPath.getName) + val fs = wrongLogPath.getFileSystem(log.newDeltaHadoopConf()) + fs.mkdirs(wrongTablePath) + fs.mkdirs(FileNames.commitDirPath(wrongLogPath)) + val wrongTablePathTableCommitOwner = new TableCommitOwnerClient( + commitOwner, wrongLogPath, tableConf, log.newDeltaHadoopConf(), log.store) + if (skipPathCheck) { + // This should succeed because we are skipping the path check. + val resp = commit(1L, 1L, wrongTablePathTableCommitOwner) + assert(resp.getVersion == 1L) + } else { + val e = intercept[CommitFailedException] { + commit(1L, 1L, wrongTablePathTableCommitOwner) + } + assert(e.getMessage.contains("while the table is registered at")) + } + } + } +} + +class DynamoDBCommitOwnerClient5BackfillSuite extends DynamoDBCommitOwnerClientSuite(5) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala index 7ba4265926d..be1fd909e70 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala @@ -25,13 +25,19 @@ abstract class InMemoryCommitOwnerSuite(batchSize: Int) extends CommitOwnerClien override protected def createTableCommitOwnerClient( deltaLog: DeltaLog): TableCommitOwnerClient = { val cs = InMemoryCommitOwnerBuilder(batchSize).build(spark, Map.empty) - TableCommitOwnerClient(cs, deltaLog, Map.empty[String, String]) + val conf = cs.registerTable( + deltaLog.logPath, + currentVersion = -1L, + initMetadata, + Protocol(1, 1)) + TableCommitOwnerClient(cs, deltaLog, conf) } override protected def registerBackfillOp( - commitOwnerClient: CommitOwnerClient, + tableCommitOwnerClient: TableCommitOwnerClient, deltaLog: DeltaLog, version: Long): Unit = { + val commitOwnerClient = tableCommitOwnerClient.commitOwnerClient val inMemoryCS = commitOwnerClient.asInstanceOf[InMemoryCommitOwner] inMemoryCS.registerBackfill(deltaLog.logPath, version) } @@ -91,8 +97,6 @@ abstract class InMemoryCommitOwnerSuite(batchSize: Int) extends CommitOwnerClien val log = DeltaLog.forTable(spark, tempDir.toString) val logPath = log.logPath val tcs = createTableCommitOwnerClient(log) - tcs.commitOwnerClient.registerTable( - logPath, currentVersion = -1L, initMetadata, Protocol(1, 1)) // Anything other than version-0 or version-1 should be rejected as the first commit // version-0 will be directly backfilled and won't be recorded in InMemoryCommitOwner.