[Spark] Managed Commits: add a DynamoDB-based commit owner #3107
assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, -1))
var expectedVersion = -1
if (tableCommitOwnerClient.commitOwnerClient.isInstanceOf[DynamoDBCommitOwnerClient]) {
  // DynamoDBCommitOwnerClient stores attemptVersion as the current table version when
DynamoDBCommitOwnerClient.getCommits starts advertising attemptVersion as the current commit version even without knowing whether the commit has gone through. Practically this won't cause any issue because, until the commit is done, no client will invoke DynamoDBCommitOwnerClient.getCommits, since the table was a FS table. But just for the sake of semantics, we should fix it in a follow-up PR.
Yes, for DynamoDB, we treat registerTable as a valid commit and store the attemptVersion instead of the current Delta table version. This is not risky because the DynamoDB table entry is only treated as valid if the corresponding filesystem commit goes through (which means that the attemptVersion is in fact the Delta table version). In case of a filesystem conflict, it is expected that registerTable will be called again. However, for consistency, we can use another bool to track whether any commits have gone through DynamoDB and return -1 until one has.
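To make the proposed follow-up concrete, here is a minimal in-memory sketch of the "extra bool" idea. It is not the PR's code: RegisteredTableSketch, commit, and advertisedVersion are hypothetical names, and the sketch assumes that registration stores currentTableVersion + 1 as the attemptVersion and that the first commit through the owner is attemptVersion + 1.

```java
// Hypothetical in-memory model of one registered table entry.
// None of these names come from the PR; they only illustrate the idea.
final class RegisteredTableSketch {
    // Sentinel meaning "no commit has gone through the commit owner yet".
    static final long NO_OWNER_COMMITS = -1L;

    private long storedVersion;        // seeded with attemptVersion at registration
    private boolean hasAcceptedCommit; // the extra flag proposed in the review

    // Assumption: registration stores currentTableVersion + 1 (the attemptVersion).
    RegisteredTableSketch(long currentTableVersion) {
        this.storedVersion = currentTableVersion + 1;
        this.hasAcceptedCommit = false;
    }

    // Accepts a commit only if it is the next version after the stored one.
    synchronized boolean commit(long version) {
        if (version != storedVersion + 1) {
            return false;
        }
        storedVersion = version;
        hasAcceptedCommit = true;
        return true;
    }

    // Version advertised to readers: the sentinel until a commit is accepted.
    synchronized long advertisedVersion() {
        return hasAcceptedCommit ? storedVersion : NO_OWNER_COMMITS;
    }
}
```

With this shape, a freshly registered entry would report -1 regardless of the attemptVersion it stores, which is what the test assertion at the top of this thread expects.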
5 /* readCapacityUnits */,
5 /* writeCapacityUnits */,
false /* skipPathCheck */);
tryEnsureTableExists();
We are calling the other constructor here, so won't it take care of invoking tryEnsureTableExists?
You are right, will fix
return tableConf;
}

// Copied from DynamoDbLogStore. TODO: add the logging back.
What logging needs to be added back?
- That the table exists/does not exist
- Table creation attempt
- Table creation failure
I will add these, along with logs in other parts of this class, in a follow-up PR.
String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
    throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
});
String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
- String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
+ String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse(() ->
String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
    throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
});
String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
Should AWS_CREDENTIALS_PROVIDER_KEY come from the commitOwnerConf from DeltaLog? Isn't this more of an authentication config, which should be configured at the JVM level, i.e. via SparkSession?
Yes, once the changes around passing sparkSession to the CommitOwnerBuilder have baked in, I will update this to read it directly from sparkSession. For now, I will hardcode this to the default credentials provider.
private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName";
private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint";
private static final String AWS_CREDENTIALS_PROVIDER_KEY = "awsCredentialsProvider";
Add comments for these properties.
 * signature. This method wraps the write method and declares the exception to ensure that the
 * caller is aware of the exception.
 */
private void writeActionsToFile(
Should we call this writeActionsToBackfilledFile?
"The requested backfill version " + version + " is greater than the latest " + | ||
"version " + resp.getLatestTableVersion() + " for the table."); | ||
} | ||
// If partial writes a visible in this filesystem, we should not try to overwrite existing |
- // If partial writes a visible in this filesystem, we should not try to overwrite existing
+ // If partial writes are visible in this filesystem, we should not try to overwrite existing
@@ -43,7 +43,7 @@ case class Commit(
 case class CommitFailedException(
     private val retryable: Boolean,
     private val conflict: Boolean,
-    private val message: String) extends Exception(message) {
+    private val message: String) extends RuntimeException(message) {
Why is this change needed? Isn't that just making it more restrictive?
This is needed because the CommitOwnerClient trait does not declare that commit can throw CommitFailedException in the signature (Scala doesn't let us do that). The Java code won't compile if I try to throw an exception of type Exception if it has not been declared in the function signature. RuntimeExceptions can be thrown though.
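The Java rule behind this can be shown with a small standalone sketch. The names below (CommitOwnerSketch, CommitFailedSketch, RejectingOwnerSketch) are hypothetical stand-ins rather than the PR's classes; the interface plays the role of a method that, seen from Java, has no throws clause.

```java
// Hypothetical stand-in for an interface whose method declares no checked exceptions.
interface CommitOwnerSketch {
    void commit(long version);
}

// Unchecked, because it extends RuntimeException rather than Exception.
final class CommitFailedSketch extends RuntimeException {
    final boolean retryable;
    final boolean conflict;

    CommitFailedSketch(boolean retryable, boolean conflict, String message) {
        super(message);
        this.retryable = retryable;
        this.conflict = conflict;
    }
}

final class RejectingOwnerSketch implements CommitOwnerSketch {
    @Override
    public void commit(long version) {
        // This compiles only because CommitFailedSketch is unchecked. If it
        // extended Exception, javac would reject the throw here, and the
        // override could not add a throws clause the interface lacks.
        throw new CommitFailedSketch(true, true, "version " + version + " already exists");
    }
}
```

Callers can still catch the exception by type and inspect its fields; the difference is only that the compiler no longer forces them to.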
Which Delta project/connector is this regarding?
Description
Taking inspiration from #339, this PR adds a Commit Owner Client which uses DynamoDB as the backend. Each Delta table managed by a DynamoDB instance will have one corresponding entry in a DynamoDB table. The table schema is as follows:
For a table to be managed by DynamoDB, registerTable must be called for that Delta table. This will create a new entry in the db for this Delta table. Every commit invocation appends the UUID delta file status to the commits list in the table entry. commit is performed through a conditional write in DynamoDB.
How was this patch tested?
Added a new suite called DynamoDBCommitOwnerClient5BackfillSuite which uses a mock DynamoDB client, plus manual testing against a DynamoDB instance.
Does this PR introduce any user-facing changes?