feat: arrow backed log replay and table state (#2037)
# Description

This is still very much a work in progress, opening it up for visibility
and discussion.

I do hope that we can finally make the switch to arrow-based log
handling. Aside from the hoped-for advantages in memory footprint, I
also believe it opens us up to many future optimizations.

To make the transition we introduce two new structs (a rough sketch
follows the list):

- `Snapshot` - a semi-lazy snapshot, which only tries to get the
`Protocol` and `Metadata` actions ASAP. These drive all our planning
activities, and without them there is not much we can do.
- `EagerSnapshot` - an intermediary structure, which eagerly loads file
actions and performs log replay, serving as a compatibility layer for the
current `DeltaTable` APIs.
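
A rough sketch of how the two might relate (illustrative only; the field
names and the concrete representations are assumptions, not the exact
definitions in this PR):

```rust
use arrow_array::RecordBatch;
use deltalake_core::kernel::{Metadata, Protocol};
use object_store::path::Path;

/// Illustrative sketch: resolves `Protocol` and `Metadata` as early as
/// possible and keeps the remaining log segment around for lazy replay.
pub struct Snapshot {
    protocol: Protocol,
    metadata: Metadata,
    /// commit and checkpoint files that have not been replayed yet
    log_files: Vec<Path>,
}

/// Illustrative sketch: additionally replays the file actions into Arrow
/// batches, acting as a compatibility layer for existing `DeltaTable` APIs.
pub struct EagerSnapshot {
    snapshot: Snapshot,
    /// replayed `Add` actions, arrow-backed
    files: Vec<RecordBatch>,
}
```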

One conceptually larger change is related to how we view the
availability of information. Up until now `DeltaTableState` could be
initialized empty, containing no useful information for any code to work
with. State (snapshots) now always has to be created valid. The thing
that may not yet be initialized is the `DeltaTable`, which now only
carries the table configuration and the `LogStore`; the state / snapshot
is now optional. Consequently, all code that works against a snapshot no
longer needs to handle the case that metadata, schema, etc. may not be
available.
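
In code terms the new split looks roughly like the sketch below; the
placeholder types and the error variant name are assumptions, but the
`snapshot()` accessor is the one used throughout the diffs in this commit
(`table.snapshot()?`):

```rust
// Shape sketch only -- the real types live in deltalake-core; placeholders
// keep the example self-contained.
struct DeltaTableConfig; // placeholder
struct LogStoreRef; // placeholder for Arc<dyn LogStore>
struct DeltaTableState; // the (eager) snapshot, always valid once present

#[derive(Debug)]
enum DeltaTableError {
    NotInitialized, // variant name assumed
}

struct DeltaTable {
    config: DeltaTableConfig,
    log_store: LogStoreRef,
    /// the snapshot is the only part that may be absent
    state: Option<DeltaTableState>,
}

impl DeltaTable {
    /// Callers use `table.snapshot()?` instead of poking at a possibly
    /// empty state; an unloaded table is an error, not empty metadata.
    fn snapshot(&self) -> Result<&DeltaTableState, DeltaTableError> {
        self.state.as_ref().ok_or(DeltaTableError::NotInitialized)
    }
}
```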

This also has implications for the datafusion integration. We are
already working mostly against snapshots, but we should abolish most
traits implemented for `DeltaTable`, as it does not provide (and never
has provided) the information that is at least required to execute a
query.
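
As one example of that direction, query planning can start from a concrete
snapshot (a sketch under the `datafusion` feature, mirroring the merge
benchmark changed below; the module paths and `DeltaScanConfig::default()`
are assumptions):

```rust
use deltalake_core::delta_datafusion::{DeltaScanConfig, DeltaTableProvider};
use deltalake_core::{open_table, DeltaTableError};

/// Build a DataFusion provider from a snapshot rather than from the table.
async fn scan_provider(uri: &str) -> Result<DeltaTableProvider, DeltaTableError> {
    let table = open_table(uri).await?;
    let snapshot = table.snapshot()?; // errors if the table was never loaded
    DeltaTableProvider::try_new(
        snapshot.clone(),
        table.log_store(),
        DeltaScanConfig::default(),
    )
}
```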

Some larger notable changes include:

* remove `DeltaTableMetadata` and always use the `Metadata` action (see the
sketch after this list).
* arrow and parquet are now required, so the corresponding features were
removed. Personally I would also argue that if you cannot read checkpoints,
you cannot read delta tables :) - so hopefully users weren't using
arrow-free versions.
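
A hedged sketch of what the first bullet means for callers (the accessor
name on the snapshot is an assumption):

```rust
use deltalake_core::{open_table, DeltaTableError};

/// The `Metadata` action is now the single source of table metadata;
/// there is no separate `DeltaTableMetadata` to convert into.
async fn describe(uri: &str) -> Result<(), DeltaTableError> {
    let table = open_table(uri).await?;
    let metadata = table.snapshot()?.metadata(); // accessor name assumed
    println!("table id: {}", metadata.id);
    println!("partition columns: {:?}", metadata.partition_columns);
    Ok(())
}
```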

### Major follow-ups:

* (pre-0.17) review integration with `log_store` and `object_store`.
Currently we mostly use `ObjectStore` inside the state handling. What we
really use is `head` / `list_from` / `get` - my hope would be that we end
up with a single abstraction (a sketch of such an abstraction follows
this list)...
* test cleanup - we are currently dealing with test flakiness and have
several approaches to scaffolding tests. Since we now have the
`deltalake-test` crate, this can be reconciled.
* ...
* do more processing on borrowed data ...
* perform file-heavy operations on arrow data
* update checkpoint writing to leverage new state handling and arrow ...
* switch to exposing URL in public APIs
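
A sketch of what that single abstraction could look like (names and
signatures are assumptions derived from the three calls listed above, not
an existing trait in this commit):

```rust
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{path::Path, ObjectMeta, Result};

/// Hypothetical narrow interface: only the operations the log / state
/// handling actually needs from the underlying storage.
#[async_trait::async_trait]
pub trait LogReader: Send + Sync {
    /// metadata for a single commit or checkpoint file
    async fn head(&self, location: &Path) -> Result<ObjectMeta>;
    /// commit files at or after the given location, in log order
    fn list_from(&self, offset: &Path) -> BoxStream<'_, Result<ObjectMeta>>;
    /// raw bytes of one log or checkpoint file
    async fn get(&self, location: &Path) -> Result<Bytes>;
}
```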

## Questions

* should paths be percent-encoded when written to checkpoint? (see the
illustration below)
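
To illustrate what the question means in practice (illustration only, using
the `percent-encoding` crate; the character set shown is deliberately coarse
and not what a real implementation would pick):

```rust
use percent_encoding::{percent_decode_str, utf8_percent_encode, NON_ALPHANUMERIC};

fn main() {
    // A partition path containing characters that are awkward in URLs.
    let raw = "date=2024-01-23/part-00000 ä.parquet";
    let encoded = utf8_percent_encode(raw, NON_ALPHANUMERIC).to_string();
    println!("{encoded}"); // "date%3D2024%2D01%2D23%2F..."
    let decoded = percent_decode_str(&encoded).decode_utf8().unwrap();
    assert_eq!(decoded, raw); // round-trips losslessly
}
```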

# Related Issue(s)

supersedes: #454
supersedes: #1837
closes: #1776
closes: #425 (should also be addressed in the current implementation)
closes: #288 (multi-part checkpoints are deprecated)
related: #435

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
roeap and rtyler authored Jan 23, 2024
1 parent 61ca275 commit 1a984ce
Showing 164 changed files with 6,658 additions and 4,604 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -24,6 +24,8 @@ arrow-arith = { version = "49" }
arrow-array = { version = "49" }
arrow-buffer = { version = "49" }
arrow-cast = { version = "49" }
arrow-ipc = { version = "49" }
arrow-json = { version = "49" }
arrow-ord = { version = "49" }
arrow-row = { version = "49" }
arrow-schema = { version = "49" }
4 changes: 2 additions & 2 deletions crates/benchmarks/src/bin/merge.rs
@@ -193,10 +193,10 @@ async fn benchmark_merge_tpcds(
merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
let table = DeltaTableBuilder::from_uri(path).load().await?;
let file_count = table.state.files().len();
let file_count = table.snapshot()?.files_count();

let provider = DeltaTableProvider::try_new(
table.state.clone(),
table.snapshot()?.clone(),
table.log_store(),
DeltaScanConfig {
file_column_name: Some("file_path".to_string()),
1 change: 1 addition & 0 deletions crates/deltalake-aws/Cargo.toml
@@ -26,6 +26,7 @@ url = { workspace = true }
backoff = { version = "0.4", features = [ "tokio" ] }

[dev-dependencies]
deltalake-core = { path = "../deltalake-core", features = ["datafusion"] }
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../deltalake-test" }
10 changes: 5 additions & 5 deletions crates/deltalake-aws/tests/integration_s3_dynamodb.rs
@@ -267,12 +267,11 @@ fn add_action(name: &str) -> Action {
let ts = (SystemTime::now() - Duration::from_secs(1800))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
Action::Add(Add {
.as_millis();
Add {
path: format!("{}.parquet", name),
size: 396,
partition_values: HashMap::new(),
partition_values_parsed: None,
modification_time: ts as i64,
data_change: true,
stats: None,
@@ -282,7 +281,8 @@
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
})
}
.into()
}

async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
@@ -322,7 +322,7 @@ async fn append_to_table(
table.log_store().as_ref(),
&actions,
operation,
&table.state,
Some(table.snapshot()?),
metadata,
)
.await
43 changes: 15 additions & 28 deletions crates/deltalake-core/Cargo.toml
@@ -19,19 +19,22 @@ features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity

[dependencies]
# arrow
arrow = { workspace = true, optional = true }
arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true }
arrow-ord = { workspace = true, optional = true }
arrow-row = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true, features = ["serde"] }
arrow-select = { workspace = true, optional = true }
arrow = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ipc = { workspace = true }
arrow-json = { workspace = true }
arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
parquet = { workspace = true, features = [
"async",
"object_store",
], optional = true }
] }
pin-project-lite = "^0.2.7"

# datafusion
datafusion = { workspace = true, optional = true }
@@ -48,6 +51,7 @@ serde_json = { workspace = true }
# "stdlib"
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "*"
regex = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
@@ -111,18 +115,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"

[features]
arrow = [
"dep:arrow",
"arrow-arith",
"arrow-array",
"arrow-cast",
"arrow-ord",
"arrow-row",
"arrow-schema",
"arrow-select",
"arrow-buffer",
]
default = ["arrow", "parquet"]
default = []
datafusion = [
"dep:datafusion",
"datafusion-expr",
@@ -131,14 +124,8 @@ datafusion = [
"datafusion-physical-expr",
"datafusion-sql",
"sqlparser",
"arrow",
"parquet",
]
datafusion-ext = ["datafusion"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
unity-experimental = ["reqwest", "hyper"]

[[bench]]
name = "read_checkpoint"
harness = false
29 changes: 0 additions & 29 deletions crates/deltalake-core/benches/read_checkpoint.rs

This file was deleted.

6 changes: 4 additions & 2 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
@@ -575,7 +575,8 @@ mod test {
.cast_to::<DFSchema>(
&arrow_schema::DataType::Utf8,
&table
.state
.snapshot()
.unwrap()
.input_schema()
.unwrap()
.as_ref()
@@ -612,7 +613,8 @@
assert_eq!(test.expected, actual);

let actual_expr = table
.state
.snapshot()
.unwrap()
.parse_predicate_expression(actual, &session.state())
.unwrap();

