Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimistic transaction protocol #632

Merged
merged 87 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
aedecde
feat: initial conflict checker methods
roeap Jun 27, 2022
224b826
Merge branch 'main' into conflict-checker
roeap Jun 28, 2022
3243ee5
initialize conflict checker
roeap Jun 28, 2022
bf55226
feat: collect more transaction infos
roeap Jun 29, 2022
595e91b
docs: add some comments
roeap Jul 4, 2022
9c4bb02
Merge branch 'main' into conflict-checker
roeap Jul 4, 2022
139c1a4
Merge branch 'main' into conflict-checker
roeap Jul 8, 2022
fcc7c90
chore: small fixes
roeap Jul 10, 2022
bb4cfeb
Merge branch 'main' into conflict-checker
roeap Jul 11, 2022
1460d91
Merge branch 'main' into conflict-checker
roeap Dec 30, 2022
eabb5bd
Merge branch 'main' into conflict-checker
roeap Jan 26, 2023
3d91509
Merge branch 'main' into conflict-checker
roeap Jan 27, 2023
d3f10e6
chore: clippy fix
roeap Feb 17, 2023
9fea909
chore: bump datqafusion and arrow
roeap Feb 17, 2023
40611aa
chore!: update pyo3 function signatures
roeap Feb 17, 2023
03ec7bc
feat: move and update optimize command
roeap Feb 17, 2023
dbb8be9
fix: add missing feature cfg
roeap Feb 17, 2023
9622ef5
fix: remove unwrap
roeap Feb 17, 2023
3bdcd13
docs: fix optimize documentation
roeap Feb 17, 2023
19c7064
Merge branch 'optimize-operation' into conflict-checker
roeap Feb 18, 2023
2cfc391
chore: move commit_uri_from_version
roeap Feb 20, 2023
e1e0b0b
chore: build with ConflictChecker
roeap Feb 20, 2023
dc96f19
feat: typed CommitInfo
roeap Feb 20, 2023
8b65628
feat: implement PruningStatistics for DeltaTableState
roeap Feb 21, 2023
59df7b0
feat: basic expression parsing
roeap Feb 21, 2023
466999e
test: add basic parser tests
roeap Feb 21, 2023
c8034b0
test: test files with predicate
roeap Feb 21, 2023
a048d67
chore: some error handling
roeap Feb 21, 2023
1f923ac
chore: going green
roeap Feb 21, 2023
58f23ea
fix: feature-gate df import
roeap Feb 21, 2023
a831d1a
fix: parquet2 tests
roeap Feb 21, 2023
fbb0083
feat: initial re-try loop
roeap Feb 22, 2023
5ac369a
fix: clippy
roeap Feb 22, 2023
bb5c4f0
Merge branch 'main' into conflict-checker
roeap Feb 23, 2023
a792dbe
chore: simplify lifetimes
roeap Feb 23, 2023
c1bd4ea
chore: flatten
roeap Feb 23, 2023
adbfc09
chore: cleanup
roeap Feb 23, 2023
a638b50
test: separate generating commit bytes
roeap Mar 2, 2023
99ed5a6
Merge branch 'main' into conflict-checker
roeap Mar 2, 2023
0562e17
test: try_commit_transaction
roeap Mar 2, 2023
2a2c34f
test: add action serilaization test
roeap Mar 2, 2023
0c1e5af
feat: improve table config access
roeap Mar 2, 2023
b8d40a7
chore: use table_config
roeap Mar 2, 2023
1f0f27f
feat: assure table config
roeap Mar 2, 2023
58b25aa
test: add sanity check for isolation level downgrade
roeap Mar 2, 2023
10486c5
refactor: move io out of conflict checker
roeap Mar 2, 2023
24cc38a
test: start scenario tests
roeap Mar 2, 2023
1825a23
fix: load command for local tables
roeap Mar 3, 2023
bd276b6
Merge branch 'fix-load' into conflict-checker
roeap Mar 3, 2023
7821ea2
chore: clippy
roeap Mar 3, 2023
a149c81
Merge branch 'fix-load' into conflict-checker
roeap Mar 3, 2023
cad5865
feat: extend configuration handling
roeap Mar 3, 2023
3c93e59
Merge branch 'main' into conflict-checker
roeap Mar 4, 2023
ad8141c
chore: clippy fix
roeap Mar 4, 2023
a844e79
feat: typed commit info
roeap Mar 4, 2023
b5c2eb0
Merge branch 'commit-info' into conflict-checker
roeap Mar 4, 2023
f69bfc7
chore: cleanup after merge
roeap Mar 4, 2023
93dcfaf
feat: propagate errors
roeap Mar 4, 2023
cee283f
test: start conflict checker tests
roeap Mar 4, 2023
711c59d
chore: cleanup
roeap Mar 5, 2023
c209fc9
feat: typed commit info
roeap Mar 4, 2023
b21c237
Merge branch 'commit-info' into conflict-checker
roeap Mar 6, 2023
af22900
Merge branch 'main' into conflict-checker
roeap Mar 7, 2023
3bf44f7
refactor: pass objectore as a ref to commit
roeap Mar 7, 2023
7cfdde3
test: check concurrent delete in fsck
roeap Mar 9, 2023
b4dc95e
chore: clippy
roeap Mar 9, 2023
906fa7c
chore: fmt
roeap Mar 9, 2023
d5eaa8c
Merge branch 'main' into conflict-checker
roeap Mar 10, 2023
2378123
style: import order
roeap Mar 10, 2023
f7ab749
chore: adopt fixed pruning approach
roeap Mar 10, 2023
4c61b1e
test: add delete-delete test
roeap Mar 10, 2023
88072e9
refactor: make ConflictChecker independent of operation
roeap Mar 11, 2023
576f5db
refactor: move transaction info creation out of conflict checker
roeap Mar 11, 2023
1ad09ad
test: disjoint delete-read
roeap Mar 11, 2023
fd9da63
test: add disallowed transaction tests
roeap Mar 11, 2023
0dda8cb
chore: simplify tests
roeap Mar 11, 2023
207d65f
Merge branch 'main' into conflict-checker
roeap Mar 11, 2023
97d3405
chore: cleanup
roeap Mar 11, 2023
54b1a85
chore: cleanup
roeap Mar 17, 2023
3d801e0
Merge branch 'main' into conflict-checker
roeap Mar 17, 2023
3eb23b3
chore: clippy
roeap Mar 17, 2023
d390884
chore: parquet2 clippy
roeap Mar 17, 2023
68f1b6e
Apply suggestions from code review
roeap Mar 19, 2023
740eeff
fix: chack conflicts against next commit only
roeap Mar 28, 2023
bfcb393
test: re-enable ignored optimize test
roeap Mar 28, 2023
1b1e4ad
Merge branch 'main' into conflict-checker
roeap Apr 4, 2023
4cbd158
chore: rename PrefixObjectStore imports
roeap Apr 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "33.0.0", optional = true }
arrow = { version = "33", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
Expand All @@ -26,7 +26,7 @@ log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num-traits = "0.2.15"
object_store = "0.5.3"
object_store = "0.5.6"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "33", features = [
Expand Down Expand Up @@ -57,6 +57,8 @@ datafusion = { version = "19", optional = true }
datafusion-expr = { version = "19", optional = true }
datafusion-common = { version = "19", optional = true }
datafusion-proto = { version = "19", optional = true }
datafusion-sql = { version = "19", optional = true }
sqlparser = { version = "0.30", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
Expand Down Expand Up @@ -91,6 +93,8 @@ datafusion = [
"datafusion-expr",
"datafusion-common",
"datafusion-proto",
"datafusion-sql",
"sqlparser",
"arrow",
"parquet",
]
Expand Down
22 changes: 21 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,13 @@ pub enum DeltaOperation {
/// The predicate used during the write.
predicate: Option<String>,
},

/// Delete data matching predicate from delta table
Delete {
/// The condition the to be deleted data must match
predicate: Option<String>,
},

/// Represents a Delta `StreamingUpdate` operation.
#[serde(rename_all = "camelCase")]
StreamingUpdate {
Expand Down Expand Up @@ -580,6 +587,7 @@ impl DeltaOperation {
}
DeltaOperation::Create { .. } => "CREATE TABLE",
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
Expand Down Expand Up @@ -622,7 +630,8 @@ impl DeltaOperation {
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
| Self::Write { .. } => true,
| Self::Write { .. }
| Self::Delete { .. } => true,
}
}

Expand All @@ -641,9 +650,20 @@ impl DeltaOperation {
match self {
// TODO add more operations
Self::Write { predicate, .. } => predicate.clone(),
Self::Delete { predicate, .. } => predicate.clone(),
_ => None,
}
}

/// Denotes if the operation reads the entire table
pub fn read_whole_table(&self) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and changes_data makes me wonder whether DeltaOperation should instead be a trait so that we require each instance to implement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed! If OK with you, I would defer exploring that to a follow up PR. To really get a feel for how that API should look like, I would like to work a bit more with operations that actually need the conflict resolution :).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good to me!

match self {
// TODO just adding one operation example, as currently none of the
// implemented operations scan the entire table.
Self::Write { predicate, .. } if predicate.is_none() => false,
_ => false,
}
}
Comment on lines +659 to +666
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference implementaition allows for txn to "taint" the entire table, in which case we disregrad analysing the specific actions. The conflict checker covers this behavior in tests, but I haven't investigated yet, when this is actually set. Mainly leaving this fn, as a "reminder" for subsequent PRs soon to come.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does overwrite read the entire table?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or could you provide examples of which operations will?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our operations should actually never use this directly right now. I went through the spark code base, and it seems this is invoked mostly in cases when a plan querying multiple tables with spark has a delta table set as a sink.. This usually uses generic plans and does not invoke the delta operations directly, or rather when we are not too sure what actually happened.

Did not get too deep though. Personally I am a bit conflicted on how to proceed with this. on one hand it does nothing useful right now. On the other hand, we can keep it as a reminder to mindful of this. While right now I think datafusion does not yet have the concept of a sink in their plans, I believe they might be added fairly soon. At this point we may encounter situations, where a plan wants to write to delta, but we have no way of knowing what was scanned.

That said, I do hope that we will be able to inspect the plan metrics and figure out what was read anyhow. Especially since we report the scanned files in out scan operator.

}

/// The SaveMode used when performing a DeltaOperation
Expand Down
7 changes: 7 additions & 0 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::schema::*;
use super::table_state::DeltaTableState;
use crate::action::{Add, Stats};
use crate::delta_config::DeltaConfigError;
use crate::operations::transaction::TransactionError;
use crate::operations::vacuum::VacuumBuilder;
use crate::storage::{commit_uri_from_version, ObjectStoreRef};

Expand Down Expand Up @@ -216,6 +217,12 @@ pub enum DeltaTableError {
#[from]
source: std::io::Error,
},
/// Error raised while commititng transaction
#[error("Transaction failed: {source}")]
Transaction {
/// The source error
source: TransactionError,
},
/// Error returned when transaction is failed to be committed because given version already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(DeltaDataTypeVersion),
Expand Down
4 changes: 2 additions & 2 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl TableProvider for DeltaTable {
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(self)?;
let files_to_prune = pruning_predicate.prune(&self.state)?;
self.get_state()
.files()
.iter()
Expand Down Expand Up @@ -630,7 +630,7 @@ fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> {
}
}

fn to_correct_scalar_value(
pub(crate) fn to_correct_scalar_value(
stat_val: &serde_json::Value,
field_dt: &ArrowDataType,
) -> Option<ScalarValue> {
Expand Down
5 changes: 2 additions & 3 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl std::future::IntoFuture for CreateBuilder {

Box::pin(async move {
let mode = this.mode.clone();
let metadata = this.metadata.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
if table.object_store().is_delta_table_location().await? {
match mode {
Expand All @@ -302,10 +301,10 @@ impl std::future::IntoFuture for CreateBuilder {
}
let version = commit(
table.object_store().as_ref(),
0,
&actions,
operation,
metadata,
&table.state,
None,
)
.await?;
table.load_version(version).await?;
Expand Down
16 changes: 5 additions & 11 deletions rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use futures::future::BoxFuture;
use futures::StreamExt;
Expand Down Expand Up @@ -50,8 +49,6 @@ pub struct FileSystemCheckMetrics {
}

struct FileSystemCheckPlan {
/// Version of the snapshot provided
version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Files that no longer exists in undlying ObjectStore but have active add actions
Expand Down Expand Up @@ -88,7 +85,6 @@ impl FileSystemCheckBuilder {
async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<&str, &Add> =
HashMap::with_capacity(self.state.files().len());
let version = self.state.version();
let store = self.store.clone();

for active in self.state.files() {
Expand Down Expand Up @@ -118,14 +114,13 @@ impl FileSystemCheckBuilder {

Ok(FileSystemCheckPlan {
files_to_remove,
version,
store,
})
}
}

impl FileSystemCheckPlan {
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
pub async fn execute(self, snapshot: &DeltaTableState) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
Expand All @@ -135,8 +130,6 @@ impl FileSystemCheckPlan {

let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
let version = self.version;
let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
Expand All @@ -154,10 +147,11 @@ impl FileSystemCheckPlan {
}

commit(
store,
version + 1,
self.store.as_ref(),
&actions,
DeltaOperation::FileSystemCheck {},
snapshot,
// TODO pass through metadata
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to do a pass through all our operations in the next Pr, where also I'll be focussing on testing the commit function, and addressing these kinds of TODOs.

None,
)
.await?;
Expand Down Expand Up @@ -188,7 +182,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder {
));
}

let metrics = plan.execute().await?;
let metrics = plan.execute(&this.state).await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
Expand Down
10 changes: 7 additions & 3 deletions rust/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.target_size.to_owned(),
writer_properties,
)?;
let metrics = plan.execute(this.store.clone()).await?;
let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
table.update().await?;
Ok((table, metrics))
Expand Down Expand Up @@ -270,7 +270,11 @@ pub struct MergePlan {

impl MergePlan {
/// Peform the operations outlined in the plan.
pub async fn execute(self, object_store: ObjectStoreRef) -> Result<Metrics, DeltaTableError> {
pub async fn execute(
self,
object_store: ObjectStoreRef,
snapshot: &DeltaTableState,
) -> Result<Metrics, DeltaTableError> {
let mut actions = vec![];
let mut metrics = self.metrics;

Expand Down Expand Up @@ -368,9 +372,9 @@ impl MergePlan {

commit(
object_store.as_ref(),
self.read_table_version + 1,
&actions,
self.input_parameters.into(),
snapshot,
Some(metadata),
)
.await?;
Expand Down
Loading