
feat: optimistic transaction protocol #632

Merged: 87 commits, Apr 7, 2023
Conversation

@roeap (Collaborator) commented Jun 9, 2022

Description

This PR adds a ConflictChecker struct for conflict resolution when concurrent commits fail. The implementation is heavily inspired by the reference implementation. So far we cover most of the Spark tests that specifically target conflict resolution.

Working on this, I thought a bit about what we may consider going forward as we move through the protocol versions :). In the end we could end up with three main structs involved in validating a commit:

  • The existing DataChecker, which validates and potentially mutates data when writing data files to disk. (Currently supports invariants.)
  • The upcoming ConflictChecker, which checks whether a commit can be retried in case of commit conflicts.
  • A new CommitChecker, which does a-priori validation of the commit itself (e.g. append-only and other rules covered by tests in Spark). A rough sketch of how the three could fit together follows below.
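A minimal sketch of that split, assuming hypothetical types and signatures throughout; only the three checker names come from this PR, nothing below is the actual API:

```rust
// Hedged sketch: all helper types/signatures are assumptions, not delta-rs code.

struct RecordBatch;     // stand-in for arrow's RecordBatch
struct PreparedCommit;  // stand-in for the prepared actions of one commit

#[derive(Debug)]
struct ValidationError;

struct DataChecker;
impl DataChecker {
    /// Runs while data files are written; may validate or mutate batches
    /// (today: invariants).
    fn check_batch(&self, _batch: &mut RecordBatch) -> Result<(), ValidationError> {
        Ok(())
    }
}

struct CommitChecker;
impl CommitChecker {
    /// A-priori validation of the commit itself, e.g. append-only rules.
    fn check_commit(&self, _commit: &PreparedCommit) -> Result<(), ValidationError> {
        Ok(())
    }
}

struct ConflictChecker;
impl ConflictChecker {
    /// Consulted only after a commit loses the race for its target version.
    fn can_retry(&self, _commit: &PreparedCommit, _winning_version: i64) -> bool {
        true
    }
}
```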

My hope is to get this PR merged right after we release 0.8.0, so there is some time to fill some holes and fully leverage the new feature for 0.9.0.

If folks agree, I would open some issues and start work on some follow-ups.

Follow-ups

  • Extend ConflictChecker to support conflict resolution for streaming transactions.
  • Implement CommitChecker.
  • Deprecate the old commit function.
  • Extend DataChecker.
  • Consolidate record batch writer implementations.

Related Issue(s)

part of #593


@houqp (Member) commented Jun 13, 2022

Thank you @roeap for picking up this work! I will take a closer look at it this weekend. 👍 from me for including datafusion-expr if needed.

@houqp (Member) commented Jun 19, 2022

The structure looks good to me 👍 When we introduce a proper expression handling abstraction, we should upgrade what we have in partition handling with it as well:

pub enum PartitionValue<T> {
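The quoted body is truncated above; for orientation, the enum expresses typed partition comparisons roughly along these lines (the variant set shown here is illustrative, not the actual definition; consult the source for the real variants):

```rust
// Illustrative reconstruction only.
pub enum PartitionValue<T> {
    Equal(T),
    NotEqual(T),
    In(Vec<T>),
    NotIn(Vec<T>),
}
```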

@roeap dismissed a stale review via aedecde June 27, 2022 22:20
@roeap force-pushed the conflict-checker branch from cd8609b to aedecde June 27, 2022 22:20
(Review threads on rust/src/conflict_checker.rs and rust/src/delta.rs were marked outdated and resolved.)
@roeap requested review from rtyler and mosyp as code owners March 11, 2023 21:11
@roeap requested a review from wjones127 March 11, 2023 21:12
@roeap changed the title from "Add ConflictChecker for concurrency control" to "feat: optimistic transaction protocol" Mar 11, 2023
@wjones127 (Collaborator) commented:

FYI I'm planning on reviewing this weekend. :)

chitralverma pushed a commit to chitralverma/delta-rs that referenced this pull request Mar 17, 2023
# Description

This PR refactors and extends the table configuration handling. The
approach is analogous to how we and object_store handle configuration
via storage properties. The idea is to provide a somewhat typed layer
over the untyped configuration keys.
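As a rough illustration of that typed layer (not the PR's code; `delta.appendOnly` is a standard Delta table property, everything else here is a stand-in):

```rust
use std::collections::HashMap;

/// A thin typed view over raw string configuration keys.
struct TableConfig<'a>(&'a HashMap<String, String>);

impl<'a> TableConfig<'a> {
    /// Typed accessor for `delta.appendOnly`, falling back to the
    /// protocol default when the key is missing or unparsable.
    fn append_only(&self) -> bool {
        self.0
            .get("delta.appendOnly")
            .and_then(|v| v.parse::<bool>().ok())
            .unwrap_or(false)
    }
}
```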

There was one surprising thing along the way. From what I can tell, we
may have been omitting the `delta.` prefix on the config keys we parse.
So this would definitely be breaking behaviour, since we no longer
recognize keys we were parsing before. We can in principle handle
aliases for keys quite easily, but I was not sure what the desired
behaviour is.

cc @rtyler @xianwill - This change would probably affect
`kafka-delta-ingest`, so especially interested in your opinions!

# Related Issue(s)

part of delta-io#632

chitralverma pushed a commit to chitralverma/delta-rs that referenced this pull request Mar 17, 2023
# Description

Another PR on the road to delta-io#632 - ~~keeping it a draft, as it is based on
delta-io#1206~~

While the `commitInfo` action is defined as completely optional, Spark and
delta-rs write at the very least interesting, and often quite helpful,
information into it. To make this easier to work with and to centralize
some conventions, we introduce a `CommitInfo` struct that exposes some of
the fields at the top level. Additionally, we harmonize a bit between
Spark and delta-rs conventions.
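Illustratively, the struct could look something like this (the field selection is an assumption for this sketch, not the definition added by that PR):

```rust
use std::collections::HashMap;

/// Sketch only: well-known commitInfo fields become typed, while any
/// remaining fields stay in a free-form map.
struct CommitInfo {
    timestamp: Option<i64>,                                // epoch millis
    operation: Option<String>,                             // e.g. "WRITE"
    operation_parameters: Option<HashMap<String, String>>,
    info: HashMap<String, String>,                         // everything else
}
```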

# Related Issue(s)

part of delta-io#632 


---------

Co-authored-by: Will Jones <willjones127@gmail.com>
@roeap dismissed a stale review via 3d801e0 March 17, 2023 17:36
Comment on lines +659 to +666
pub fn read_whole_table(&self) -> bool {
match self {
// TODO just adding one operation example, as currently none of the
// implemented operations scan the entire table.
Self::Write { predicate, .. } if predicate.is_none() => false,
_ => false,
}
}
Collaborator (author):

The reference implementation allows a txn to "taint" the entire table, in which case we disregard analysing the specific actions. The conflict checker covers this behavior in tests, but I haven't investigated yet when this is actually set. Mainly leaving this fn as a "reminder" for subsequent PRs soon to come.

Collaborator:

Does overwrite read the entire table?

Collaborator:

Or could you provide examples of which operations will?

Collaborator (author):

Our operations should actually never use this directly right now. I went through the Spark code base, and it seems this is invoked mostly when a Spark plan that queries multiple tables has a delta table set as a sink. That path uses generic plans and does not invoke the delta operations directly; in other words, we are not too sure what actually happened.

I did not dig too deep though. Personally I am a bit conflicted on how to proceed with this: on one hand it does nothing useful right now; on the other hand, we can keep it as a reminder to be mindful of this. While datafusion does not yet have the concept of a sink in its plans, I believe one might be added fairly soon. At that point we may encounter situations where a plan wants to write to delta, but we have no way of knowing what was scanned.

That said, I do hope that we will be able to inspect the plan metrics and figure out what was read anyhow, especially since we report the scanned files in our scan operator.

&actions,
DeltaOperation::FileSystemCheck {},
snapshot,
// TODO pass through metadata
Collaborator (author):

I plan to do a pass through all our operations in the next PR, where I'll also focus on testing the commit function and addressing these kinds of TODOs.

Comment on lines +63 to +85
pub fn parse_predicate_expression(&self, expr: impl AsRef<str>) -> DeltaResult<Expr> {
let dialect = &GenericDialect {};
let mut tokenizer = Tokenizer::new(dialect, expr.as_ref());
let tokens = tokenizer
.tokenize()
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
})?;
let sql = Parser::new(dialect)
.with_tokens(tokens)
.parse_expr()
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
})?;

// TODO should we add the table name as qualifier when available?
let df_schema = DFSchema::try_from_qualified_schema("", self.arrow_schema()?.as_ref())?;
let context_provider = DummyContextProvider::default();
let sql_to_rel = SqlToRel::new(&context_provider);

Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
}
}
Collaborator (author):

Parsing predicate expressions is actually kind of broken: regardless of the schema we pass in, int fields are always recognized as i64. I hope that the abstractions in the core project will eventually help us out. Until then, I plan to either do a follow-up rewriting these expressions or, if this is a bug, address it upstream in datafusion.
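To make the symptom concrete: parsing `x > 10` against an Int32 column still yields an Int64 literal, so a follow-up rewrite would have to coerce literals back to the column's type. A toy, library-free sketch of that idea (this is not datafusion's Expr, nor code from this PR):

```rust
// Toy expression type standing in for a planned predicate.
#[derive(Debug)]
enum Expr {
    Column(String),
    Int64(i64),
    Int32(i32),
    Gt(Box<Expr>, Box<Expr>),
}

// Narrow Int64 literals to Int32 where the (assumed) schema says i32.
fn coerce_to_i32(expr: Expr) -> Expr {
    match expr {
        Expr::Int64(v) if i32::try_from(v).is_ok() => Expr::Int32(v as i32),
        Expr::Gt(l, r) => Expr::Gt(Box::new(coerce_to_i32(*l)), Box::new(coerce_to_i32(*r))),
        other => other,
    }
}
```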

Collaborator (author):

Yet another set of test utilities, and yet another TODO for a follow-up PR to consolidate our test helpers...

@wjones127 (Collaborator) left a comment:

Overall looks like a great improvement. I think we want to be more conservative in cases where there are multiple concurrent transactions that beat the candidate one. Also the error messages could be improved.

_ => None,
}
}

/// Denotes if the operation reads the entire table
pub fn read_whole_table(&self) -> bool {
Collaborator:

This method and changes_data make me wonder whether DeltaOperation should instead be a trait, so that we require each instance to implement them.

Collaborator (author):

Agreed! If OK with you, I would defer exploring that to a follow-up PR. To really get a feel for what that API should look like, I would like to work a bit more with operations that actually need the conflict resolution :).

Collaborator:

That sounds good to me!


Comment on lines +734 to +736
// add / read + no write
// transaction would have read file added by concurrent txn, but does not write data,
// so no real conflicting change even though data was added
Collaborator:

Does read + no write mean no data change? Like an optimize?

version_to_read += 1;
}

// TODO how to handle commit info for multiple read commits?
Collaborator:

Yeah, if there are multiple versions, should we just error for now? If there are multiple commits that beat our candidate, it seems like we should generate a WinningCommitSummary for each transaction after read_version, right? And then check our candidate commit against each of them?

Collaborator (author):

Good point. I changed the logic so that our commit loop only ever checks against the currently conflicting commit; as such, the WinningCommitSummary now summarizes exactly one commit.
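The loop shape that implies, as a hedged sketch (WinningCommitSummary is the PR's name; everything else here is an illustrative stand-in, not the crate's API):

```rust
struct WinningCommitSummary {
    version: u64, // exactly one winning commit is summarized per pass
}

#[derive(Debug)]
struct CommitConflictError(u64);

fn next_free_version(read_version: u64, latest_version: u64) -> Result<u64, CommitConflictError> {
    let mut version = read_version + 1;
    while version <= latest_version {
        // Summarize and check one conflicting commit at a time, instead of
        // folding all winners since read_version into a single summary.
        let winning = WinningCommitSummary { version };
        check_against(&winning)?;
        version += 1;
    }
    Ok(version) // first version we can attempt to commit at
}

fn check_against(_winning: &WinningCommitSummary) -> Result<(), CommitConflictError> {
    // e.g. fail if the winning commit added files our transaction read,
    // or removed files our transaction also removes
    Ok(())
}
```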


@roeap (Collaborator, author) commented Mar 28, 2023

@wjones127 - sorry for taking a long time to incorporate your feedback. I think it's mostly done now.

Mainly we now always check against the next version of commits. The questions around read_whole_table etc. I hope to address right after this.

As a plus, I re-enabled an existing test for optimize, which we can now support again.

@wjones127 (Collaborator) commented:

I am realizing we have another transaction implementation called DeltaTransaction, which is what we are currently using in Python. This improved implementation doesn't seem to have a low-level API where the user can provide the added and removed files, which is what we need for the PyArrow-based writer.

So I think we need to consolidate those into operations/transaction/mod.rs, make them public, and switch the Python implementation. Does that sound right to you?

@wjones127 (Collaborator) left a comment:

This is looking good now.

The PySpark integration tests will be fixed soon. I should take a look at the sporadic failures in the Azure tests.

@roeap (Collaborator, author) commented Apr 7, 2023

> So I think we need to consolidate those into operations/transaction/mod.rs, make them public, and switch the Python implementation. Does that sound right to you?

Yes it does. So far I have deliberately kept the implementation of commits used in the operations module separate from the existing one on the table. Where I am not sure is whether we need DeltaTransaction to be a struct at all; I think this is somewhat an artifact from the reference implementation, where the transaction is conceptually used kind of like a context manager in Python (it's been a long time since I wrote Scala :D). In the new transaction module we just have the commit function, which we should make public, and which I think will give us what we need in Python.

Labels: binding/rust (Issues for the Rust crate), rust
4 participants