Commit

Merge branch 'main' into partition-cols-2494

rtyler authored Jul 11, 2024
2 parents c58733f + 197a474 commit 4e88459

Showing 14 changed files with 314 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1,4 +1,4 @@
crates/ @wjones127 @roeap @rtyler
crates/ @wjones127 @roeap @rtyler @hntd187
delta-inspect/ @wjones127 @rtyler
proofs/ @houqp
python/ @wjones127 @fvaleye @roeap @ion-elgreco
1 change: 0 additions & 1 deletion .github/workflows/build.yml
@@ -59,7 +59,6 @@ jobs:
matrix:
os:
- ubuntu-latest
- macos-11
- windows-latest
runs-on: ${{ matrix.os }}
env:
2 changes: 1 addition & 1 deletion crates/azure/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.1.2"
version = "0.1.3"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -73,7 +73,7 @@ tokio = { workspace = true, features = [

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
dashmap = "5"
dashmap = "6"
errno = "0.3"
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
115 changes: 101 additions & 14 deletions crates/core/src/delta_datafusion/mod.rs
@@ -392,14 +392,13 @@ impl DeltaScanConfigBuilder {

/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
let input_schema = snapshot.input_schema()?;
let mut file_column_name = None;
let mut column_names: HashSet<&String> = HashSet::new();
for field in input_schema.fields.iter() {
column_names.insert(field.name());
}
let file_column_name = if self.include_file_column {
let input_schema = snapshot.input_schema()?;
let mut column_names: HashSet<&String> = HashSet::new();
for field in input_schema.fields.iter() {
column_names.insert(field.name());
}

if self.include_file_column {
match &self.file_column_name {
Some(name) => {
if column_names.contains(name) {
@@ -409,7 +408,7 @@
)));
}

file_column_name = Some(name.to_owned())
Some(name.to_owned())
}
None => {
let prefix = PATH_COLUMN;
@@ -421,10 +420,12 @@
name = format!("{}_{}", prefix, idx);
}

file_column_name = Some(name);
Some(name)
}
}
}
} else {
None
};

Ok(DeltaScanConfig {
file_column_name,
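
As a side note on the `None` branch above: the builder keeps appending a numeric suffix until the generated file column no longer collides with an existing column. A minimal, standalone sketch of that loop, with `prefix` standing in for `PATH_COLUMN` (whose value is defined elsewhere in the crate):

```rust
use std::collections::HashSet;

// Illustrative sketch of the suffixing loop above; not the crate's exact code.
fn unique_column_name(existing: &HashSet<String>, prefix: &str) -> String {
    let mut name = prefix.to_string();
    let mut idx = 0;
    while existing.contains(&name) {
        idx += 1;
        name = format!("{}_{}", prefix, idx);
    }
    // If the table already has a column equal to `prefix`, this returns
    // "<prefix>_1", "<prefix>_2", and so on until no clash remains.
    name
}
```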
@@ -454,7 +455,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
projection: Option<&'a Vec<usize>>,
limit: Option<usize>,
files: Option<&'a [Add]>,
config: DeltaScanConfig,
config: Option<DeltaScanConfig>,
schema: Option<SchemaRef>,
}

@@ -472,7 +473,7 @@ impl<'a> DeltaScanBuilder<'a> {
files: None,
projection: None,
limit: None,
config: DeltaScanConfig::default(),
config: None,
schema: None,
}
}
@@ -498,7 +499,7 @@ impl<'a> DeltaScanBuilder<'a> {
}

pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self {
self.config = config;
self.config = Some(config);
self
}

@@ -509,7 +510,11 @@
}

pub async fn build(self) -> DeltaResult<DeltaScan> {
let config = self.config;
let config = match self.config {
Some(config) => config,
None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
};

let schema = match self.schema {
Some(schema) => schema,
None => {
@@ -1749,7 +1754,10 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor};
use datafusion_expr::lit;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use object_store::path::Path;
@@ -2187,4 +2195,83 @@

df.collect().await.unwrap();
}

#[tokio::test]
async fn test_delta_scan_builder_no_scan_config() {
use crate::datafusion::prelude::SessionContext;
let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let ctx = SessionContext::new();
let scan =
DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &ctx.state())
.with_filter(Some(col("a").eq(lit("s"))))
.build()
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
assert_eq!(
visitor.pruning_predicate.unwrap().orig_expr().to_string(),
"a@0 = s"
);
}

#[tokio::test]
async fn test_delta_scan_builder_scan_config_disable_pushdown() {
use crate::datafusion::prelude::SessionContext;
let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let ctx = SessionContext::new();
let snapshot = table.snapshot().unwrap();
let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
.with_filter(Some(col("a").eq(lit("s"))))
.with_scan_config(
DeltaScanConfigBuilder::new()
.with_parquet_pushdown(false)
.build(snapshot)
.unwrap(),
)
.build()
.await
.unwrap();

let mut visitor = ParquetPredicateVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();

assert!(visitor.predicate.is_none());
assert!(visitor.pruning_predicate.is_none());
}

#[derive(Default)]
struct ParquetPredicateVisitor {
predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
}

impl ExecutionPlanVisitor for ParquetPredicateVisitor {
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
self.predicate = parquet_exec.predicate().cloned();
self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
}
Ok(true)
}
}
}
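
Taken together with the `config: Option<DeltaScanConfig>` change above, `with_scan_config` is now optional: when it is omitted, `build` derives a default config from the snapshot rather than using `DeltaScanConfig::default()`. A minimal sketch of both call paths, assuming the same in-memory table and `SessionContext` setup as the two tests above (the `setup_table_and_ctx` helper is hypothetical and stands in for that setup):

```rust
// Sketch only — mirrors the usage exercised by the new tests in this diff.
#[tokio::test]
async fn scan_config_is_now_optional_sketch() {
    // Hypothetical helper: creates the table and SessionContext as in the tests above.
    let (table, ctx) = setup_table_and_ctx().await;
    let snapshot = table.snapshot().unwrap();

    // 1. No explicit config: build() derives one via
    //    DeltaScanConfigBuilder::new().build(snapshot), so parquet predicate
    //    pushdown stays enabled by default.
    let _scan = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
        .with_filter(Some(col("a").eq(lit("s"))))
        .build()
        .await
        .unwrap();

    // 2. Explicit config: callers can still opt out of pushdown.
    let _scan_no_pushdown = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
        .with_filter(Some(col("a").eq(lit("s"))))
        .with_scan_config(
            DeltaScanConfigBuilder::new()
                .with_parquet_pushdown(false)
                .build(snapshot)
                .unwrap(),
        )
        .build()
        .await
        .unwrap();
}
```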
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -373,13 +373,13 @@ struct CheckpointMetadata {
#[allow(unreachable_pub)] // used by acceptance tests (TODO make an fn accessor?)
pub version: i64,
/// The number of actions that are stored in the checkpoint.
pub(crate) size: i32,
pub(crate) size: i64,
/// The number of fragments if the last checkpoint was written in multiple parts.
pub(crate) parts: Option<i32>,
/// The number of bytes of the checkpoint.
pub(crate) size_in_bytes: Option<i32>,
pub(crate) size_in_bytes: Option<i64>,
/// The number of AddFile actions in the checkpoint.
pub(crate) num_of_add_files: Option<i32>,
pub(crate) num_of_add_files: Option<i64>,
/// The schema of the checkpoint file.
pub(crate) checkpoint_schema: Option<Schema>,
/// The checksum of the last checkpoint JSON.
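Widening these counters from i32 to i64 matters once a checkpoint describes more rows, bytes, or add actions than fit in an i32. A hedged sketch of the effect, using an illustrative serde struct rather than the crate's exact `CheckpointMetadata` (the field names assume the camelCase keys of the `_last_checkpoint` file):

```rust
use serde::Deserialize;

// Illustrative only: a trimmed-down stand-in for CheckpointMetadata.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct LastCheckpointHint {
    version: i64,
    size: i64,
    parts: Option<i32>,
    size_in_bytes: Option<i64>,
    num_of_add_files: Option<i64>,
}

fn main() -> Result<(), serde_json::Error> {
    // A checkpoint covering ~3 billion actions no longer overflows the size field.
    let raw = r#"{"version":10,"size":3000000000,"sizeInBytes":52428800,"numOfAddFiles":2500000000}"#;
    let hint: LastCheckpointHint = serde_json::from_str(raw)?;
    assert_eq!(hint.size, 3_000_000_000);
    assert_eq!(hint.num_of_add_files, Some(2_500_000_000));
    println!("{hint:?}");
    Ok(())
}
```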
2 changes: 1 addition & 1 deletion crates/gcp/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-gcp"
version = "0.2.1"
version = "0.2.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
16 changes: 8 additions & 8 deletions docs/how-delta-lake-works/architecture-of-delta-table.md
@@ -27,7 +27,7 @@ tmp/some-table
└── 00000000000000000000.json
```

The Parquet file stores the data that was written. The `_delta_log` directory stores metadata about the transactions. Let's inspect the `_delta_log/00000000000000000000.json` file.

```json
{
@@ -78,9 +78,9 @@ The Parquet file stores the data that was written. The `_delta_log` directory s

The transaction log file contains the following information:

* the files added to the Delta table
* schema of the files
* column level metadata including the min/max value for each file
- the files added to the Delta table
- schema of the files
- column level metadata including the min/max value for each file
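
For readers who want to verify those three points directly, each commit file under `_delta_log/` is newline-delimited JSON, so the actions can be inspected with a plain line reader. A minimal, illustrative Rust sketch (not part of this commit); the `tmp/some-table` path matches the example table above:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() -> std::io::Result<()> {
    // Each line of a commit file is one standalone JSON action.
    let log = File::open("tmp/some-table/_delta_log/00000000000000000000.json")?;
    for line in BufReader::new(log).lines() {
        let action: serde_json::Value =
            serde_json::from_str(&line?).expect("every log line is a JSON action");
        // The top-level key names the action: "add", "metaData", "protocol", ...
        if let Some(object) = action.as_object() {
            for key in object.keys() {
                println!("action: {key}");
            }
        }
    }
    Ok(())
}
```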

Create another pandas DataFrame and append it to the Delta table to see how this transaction is recorded.

@@ -194,18 +194,18 @@ Here are the contents of the `_delta_log/0002.json` file:
}
```

This transaction adds a data file and marks the two existing data files for removal. Marking a file for removal in the transaction log is known as "tombstoning the file" or a "logical delete". This is different from a "physical delete", which actually removes the data file from storage.

## How Delta table operations differ from data lakes

Data lakes consist of data files persisted in storage. They don't have a transaction log that retains metadata about the transactions.

Data lakes perform transactions differently than Delta tables.

When you perform an overwrite transaction with a Delta table, you logically delete the existing data without physically removing it.

Data lakes don't support logical deletes, so you have to physically delete the data from storage.

Logical data operations are safer because they can be rolled back if they don't complete successfully. Physically removing data from storage can be dangerous, especially if it happens before a transaction is complete.

We're now ready to look into Delta Lake ACID transactions in more detail.
We're now ready to look into [Delta Lake ACID transactions](../how-delta-lake-works/delta-lake-acid-transactions.md) in more detail.