Commit 44777d0: Fix checkpoint bugs

xianwill committed Aug 11, 2021
1 parent 7f31207 commit 44777d0
Showing 9 changed files with 588 additions and 313 deletions.
654 changes: 375 additions & 279 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 6 additions & 7 deletions python/Cargo.toml
@@ -1,13 +1,12 @@
[package]
name = "deltalake-python"
version = "0.5.2"
version = "0.5.0"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
description = "Native Delta Lake Python binding based on delta-rs with Pandas integration"
description = "Python binding for delta-rs"
readme = "README.md"
edition = "2018"
keywords = ["deltalake", "delta", "datalake", "pandas", "arrow"]

[lib]
name = "deltalake"
@@ -20,10 +19,10 @@ env_logger = "0"
# for binary wheel best practice, statically link openssl
reqwest = { version = "*", features = ["native-tls-vendored"] }
serde_json = "1"
arrow = { git = "https://github.com/apache/arrow-rs.git", rev= "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }

[dependencies.pyo3]
version = "0.14"
version = "0.13"
features = ["extension-module", "abi3", "abi3-py36"]

[dependencies.deltalake]
@@ -39,13 +38,13 @@ classifier = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3 :: Only"
]
project-url = { Repo = "https://github.com/delta-io/delta-rs", Documentation = "https://delta-io.github.io/delta-rs/python/", "Bug Tracker" = "https://github.com/delta-io/delta-rs/issues" }
project-url = { Repo = "https://github.com/delta-io/delta-rs" }
requires-dist = [
"pyarrow>=4",
'numpy<1.20.0;python_version<="3.6"',
'dataclasses;python_version<="3.6"',
'types-dataclasses;python_version<="3.6" and extra == "pandas"',
"pandas; extra == 'pandas' or extra == 'devel'",
"pandas; extra == 'pandas'",
"mypy; extra == 'devel'",
"isort; extra == 'devel'",
"pytest; extra == 'devel'",
12 changes: 8 additions & 4 deletions rust/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake"
version = "0.4.1"
version = "0.4.0"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/delta-io/delta.rs"
license = "Apache-2.0"
@@ -43,11 +43,15 @@ maplit = { version = "1", optional = true }
# High-level writer
parquet-format = "~2.6.1"

arrow = { git = "https://github.com/apache/arrow-rs.git", rev= "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "a4f628221197030cd6d6ebbcb666478ece078e71", optional = true }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev= "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
parquet = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "4ddd2f5e7582ffe662aea27bbb74c58cd0715152", optional = true }

crossbeam = { version = "0", optional = true }

cfg-if = "1"
async-trait = "0.1"

# NOTE: disable rust-dataframe integration since it currently doesn't have a
# version published in crates.io
# rust-dataframe = {version = "0.*", optional = true }
10 changes: 6 additions & 4 deletions rust/src/checkpoints.rs
@@ -18,6 +18,7 @@ use super::open_table_with_version;
use super::schema::*;
use super::storage;
use super::storage::{StorageBackend, StorageError};
use super::writer::time_utils;
use super::{CheckPoint, DeltaTableError, DeltaTableState};

/// Error returned when the CheckPointWriter is unable to write a checkpoint.
@@ -194,7 +195,7 @@ impl CheckPointWriter {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
}))
// metadata
// metaData
.chain(std::iter::once(action::Action::metaData(
action::MetaData::try_from(current_metadata.clone())?,
)))
@@ -257,6 +258,8 @@ fn checkpoint_add_from_state(
) -> Result<Value, ArrowError> {
let mut v = serde_json::to_value(action::Action::add(add.clone()))?;

v["add"]["dataChange"] = Value::Bool(false);

if !add.partition_values.is_empty() {
let mut partition_values_parsed: HashMap<String, Value> = HashMap::new();

@@ -405,9 +408,8 @@ fn apply_stats_conversion(
if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| serde_json::Number::from(dt.timestamp_nanos()))
.map(Value::Number);
.and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
.map(|n| Value::Number(serde_json::Number::from(n)));

if let Some(ts) = ts {
*v = ts;
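The checkpoints.rs changes above do two things: they force `dataChange` to `false` on every add action written into a checkpoint, and they rewrite string timestamps in file stats to epoch microseconds (the Delta protocol's timestamp resolution) instead of the nanoseconds used previously. A minimal sketch of both fixes, assuming `time_utils::timestamp_micros_from_stats_string` behaves roughly like the chrono-based stand-in below (the helper body here is illustrative, not the crate's actual implementation):

```rust
use chrono::DateTime;
use serde_json::{json, Value};

// Illustrative stand-in for writer::time_utils::timestamp_micros_from_stats_string:
// parse an RFC 3339 stats string into epoch microseconds.
fn timestamp_micros_from_stats_string(s: &str) -> Result<i64, chrono::ParseError> {
    Ok(DateTime::parse_from_rfc3339(s)?.timestamp_micros())
}

fn main() {
    // A checkpointed add action must not replay as a data change.
    let mut v = json!({ "add": { "path": "part-0.parquet", "dataChange": true } });
    v["add"]["dataChange"] = Value::Bool(false);

    // Stats timestamps become i64 microseconds rather than RFC 3339 strings.
    let micros = timestamp_micros_from_stats_string("2021-08-11T12:00:00Z").unwrap();
    assert_eq!(micros, 1_628_683_200_000_000);
    println!("{} {}", v, micros);
}
```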
85 changes: 80 additions & 5 deletions rust/src/delta_arrow.rs
@@ -109,12 +109,12 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
}
"date" => {
// A calendar date, represented as a year-month-day triple without a
// timezone. Stored as 4 bytes integer representing days since 1970-01-01
// timezone.
Ok(ArrowDataType::Date32)
}
"timestamp" => {
// Issue: https://github.com/delta-io/delta/issues/643
Ok(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None))
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
s => Err(ArrowError::SchemaError(format!(
"Invalid data type for Arrow: {}",
@@ -185,6 +185,21 @@ pub(crate) fn delta_log_schema_for_table(
))),
true
),
ArrowField::new(
"configuration",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
),
ArrowField::new(
"format",
ArrowDataType::Struct(vec![
@@ -311,19 +326,33 @@ pub(crate) fn delta_log_schema_for_table(
.fields()
.iter()
.map(|f| f.to_owned())
.partition(|field| partition_columns.contains(field.name()));
.partition(|field| partition_columns.contains(&field.name()));

let mut stats_parsed_fields: Vec<ArrowField> =
vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)];

if !non_partition_fields.is_empty() {
stats_parsed_fields.extend(["minValues", "maxValues", "nullCount"].iter().map(|name| {
let mut max_min_vec = Vec::new();
non_partition_fields
.iter()
.for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f));

stats_parsed_fields.extend(["minValues", "maxValues"].iter().map(|name| {
ArrowField::new(
name,
ArrowDataType::Struct(non_partition_fields.clone()),
ArrowDataType::Struct(max_min_vec.clone()),
true,
)
}));

let mut null_count_vec = Vec::new();
non_partition_fields
.iter()
.for_each(|f| null_count_schema_for_fields(&mut null_count_vec, f));
let null_count_struct =
ArrowField::new("nullCount", ArrowDataType::Struct(null_count_vec), true);

stats_parsed_fields.push(null_count_struct);
}

let mut add_fields = ADD_FIELDS.clone();
@@ -354,6 +383,52 @@
std::sync::Arc::new(arrow_schema)
}

fn max_min_schema_for_fields(dest: &mut Vec<ArrowField>, f: &ArrowField) {
match f.data_type() {
ArrowDataType::Struct(struct_fields) => {
let mut child_dest = Vec::new();

for f in struct_fields {
max_min_schema_for_fields(&mut child_dest, f);
}

dest.push(ArrowField::new(
f.name(),
ArrowDataType::Struct(child_dest),
true,
));
}
// don't compute min or max for list or map types
ArrowDataType::List(_) | ArrowDataType::Map(_, _) => { /* noop */ }
_ => {
let f = f.clone();
dest.push(f);
}
}
}

fn null_count_schema_for_fields(dest: &mut Vec<ArrowField>, f: &ArrowField) {
match f.data_type() {
ArrowDataType::Struct(struct_fields) => {
let mut child_dest = Vec::new();

for f in struct_fields {
null_count_schema_for_fields(&mut child_dest, f);
}

dest.push(ArrowField::new(
f.name(),
ArrowDataType::Struct(child_dest),
true,
));
}
_ => {
let f = ArrowField::new(f.name(), ArrowDataType::Int64, true);
dest.push(f);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
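The two new helpers in delta_arrow.rs build the `stats_parsed` schema by walking each field recursively: struct fields recurse into their children, list and map fields are dropped from minValues/maxValues, and every leaf in the nullCount schema becomes an Int64. A self-contained sketch of the same recursion over toy types (`Field`/`Kind` are illustrative stand-ins for the arrow crate's `ArrowField`/`ArrowDataType`):

```rust
// Toy mirror of max_min_schema_for_fields / null_count_schema_for_fields.
#[derive(Clone, Debug)]
enum Kind {
    Int64,
    Utf8,
    List,
    Struct(Vec<Field>),
}

#[derive(Clone, Debug)]
struct Field {
    name: String,
    kind: Kind,
}

fn max_min_fields(dest: &mut Vec<Field>, f: &Field) {
    match &f.kind {
        Kind::Struct(children) => {
            let mut child_dest = Vec::new();
            for c in children {
                max_min_fields(&mut child_dest, c);
            }
            dest.push(Field { name: f.name.clone(), kind: Kind::Struct(child_dest) });
        }
        // min/max are not collected for list (or map) types
        Kind::List => {}
        _ => dest.push(f.clone()),
    }
}

fn null_count_fields(dest: &mut Vec<Field>, f: &Field) {
    match &f.kind {
        Kind::Struct(children) => {
            let mut child_dest = Vec::new();
            for c in children {
                null_count_fields(&mut child_dest, c);
            }
            dest.push(Field { name: f.name.clone(), kind: Kind::Struct(child_dest) });
        }
        // every non-struct leaf counts nulls as an Int64
        _ => dest.push(Field { name: f.name.clone(), kind: Kind::Int64 }),
    }
}

fn main() {
    let schema = Field {
        name: "user".into(),
        kind: Kind::Struct(vec![
            Field { name: "id".into(), kind: Kind::Int64 },
            Field { name: "tags".into(), kind: Kind::List },
            Field { name: "name".into(), kind: Kind::Utf8 },
        ]),
    };

    let (mut min_max, mut null_count) = (Vec::new(), Vec::new());
    max_min_fields(&mut min_max, &schema);
    null_count_fields(&mut null_count, &schema);

    // min/max drops `tags`; nullCount keeps it, but as Int64.
    println!("{:?}", min_max);
    println!("{:?}", null_count);
}
```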
9 changes: 6 additions & 3 deletions rust/src/storage/file/mod.rs
@@ -114,18 +114,21 @@ impl StorageBackend for FileStorageBackend {
f.sync_all().await?;
drop(f);

match self.rename_obj(tmp_path, path).await {
// atomic rename with swap=true is only possible if both paths exist.
let swap = Path::new(path).exists();

match rename::atomic_rename(tmp_path, path, swap) {
Ok(_) => Ok(()),
Err(e) => {
// If rename failed, clean up the temp file.
self.delete_obj(tmp_path).await?;
//self.delete_obj(tmp_path).await?;
Err(e)
}
}
}

async fn rename_obj(&self, src: &str, dst: &str) -> Result<(), StorageError> {
rename::atomic_rename(src, dst)
rename::atomic_rename(src, dst, false)
}

async fn delete_obj(&self, path: &str) -> Result<(), StorageError> {
60 changes: 49 additions & 11 deletions rust/src/storage/file/rename.rs
@@ -4,7 +4,7 @@ use crate::StorageError;
mod imp {
use super::*;

pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
pub fn atomic_rename(from: &str, to: &str, _swap: bool) -> Result<(), StorageError> {
// doing best effort on Windows since there is no native atomic rename support
if std::fs::metadata(to).is_ok() {
return Err(StorageError::AlreadyExists(to.to_string()));
@@ -24,10 +24,10 @@ mod imp {
CString::new(p).map_err(|e| StorageError::Generic(format!("{}", e)))
}

pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
pub fn atomic_rename(from: &str, to: &str, swap: bool) -> Result<(), StorageError> {
let cs_from = to_c_string(from)?;
let cs_to = to_c_string(to)?;
let ret = unsafe { platform_specific_rename(cs_from.as_ptr(), cs_to.as_ptr()) };
let ret = unsafe { platform_specific_rename(cs_from.as_ptr(), cs_to.as_ptr(), swap) };

if ret != 0 {
let e = errno::errno();
@@ -45,29 +45,39 @@ }
}

#[allow(unused_variables)]
unsafe fn platform_specific_rename(from: *const libc::c_char, to: *const libc::c_char) -> i32 {
unsafe fn platform_specific_rename(
from: *const libc::c_char,
to: *const libc::c_char,
swap: bool,
) -> i32 {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", target_env = "gnu"))] {
cfg_if::cfg_if! {
if #[cfg(glibc_renameat2)] {
libc::renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, libc::RENAME_NOREPLACE)
let flag = if swap { libc::RENAME_EXCHANGE } else { libc::RENAME_NOREPLACE };
libc::renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, flag)
} else {
// target has old glibc (< 2.28), we would need to invoke syscall manually
unimplemented!()
}
}
} else if #[cfg(target_os = "macos")] {
libc::renamex_np(from, to, libc::RENAME_EXCL)
let flag = if swap { libc::RENAME_SWAP } else { libc::RENAME_EXCL };
libc::renamex_np(from, to, flag)
} else {
unimplemented!()
}
}
}
}

/// Atomically renames `from` to `to`.
/// - if `swap` is `true` then both `from` and `to` have to exist;
/// - if `swap` is `false` then `from` has to exist, but `to` must not;
/// otherwise the operation will fail.
#[inline]
pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
imp::atomic_rename(from, to)
pub fn atomic_rename(from: &str, to: &str, swap: bool) -> Result<(), StorageError> {
imp::atomic_rename(from, to, swap)
}

#[cfg(test)]
@@ -85,7 +95,7 @@ mod tests {
let c = &tmp_dir.path().join("c");

// unsuccessful move not_exists to C, not_exists is missing
match atomic_rename("not_exists", c.to_str().unwrap()) {
match atomic_rename("not_exists", c.to_str().unwrap(), false) {
Err(StorageError::Io { source: e }) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
@@ -114,18 +124,46 @@
// successful move A to C
assert!(a.exists());
assert!(!c.exists());
atomic_rename(a.to_str().unwrap(), c.to_str().unwrap()).unwrap();
atomic_rename(a.to_str().unwrap(), c.to_str().unwrap(), false).unwrap();
assert!(!a.exists());
assert!(c.exists());

// unsuccessful move B to C, C already exists, B is not deleted
assert!(b.exists());
match atomic_rename(b.to_str().unwrap(), c.to_str().unwrap()) {
match atomic_rename(b.to_str().unwrap(), c.to_str().unwrap(), false) {
Err(StorageError::AlreadyExists(p)) => assert_eq!(p, c.to_str().unwrap()),
_ => panic!("unexpected"),
}
assert!(b.exists());
assert_eq!(std::fs::read_to_string(c).unwrap(), "a");

// skip the swap assertions on Windows until https://github.com/delta-io/delta-rs/issues/377 is resolved
cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
if true {
return;
}
}
}

// successful swap of B and C
atomic_rename(b.to_str().unwrap(), c.to_str().unwrap(), true).unwrap();
assert!(b.exists());
assert!(c.exists());
assert_eq!(std::fs::read_to_string(b).unwrap(), "a");
assert_eq!(std::fs::read_to_string(c).unwrap(), "b");

// unsuccessful swap C to D, D does not exist
let d = &tmp_dir.path().join("d");
assert!(!d.exists());
match atomic_rename(c.to_str().unwrap(), d.to_str().unwrap(), true) {
Err(StorageError::Io { source }) => {
assert!(source.to_string().starts_with("failed to rename"));
}
_ => panic!("unexpected"),
}
assert!(c.exists());
assert!(!d.exists());
}

fn create_file(dir: &Path, name: &str) -> PathBuf {

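For context on the storage changes: `put_obj` in file/mod.rs now picks `swap = true` when the destination already exists, which maps to `RENAME_EXCHANGE` (glibc `renameat2`) on Linux and `RENAME_SWAP` (`renamex_np`) on macOS, so an existing checkpoint file can be replaced atomically. A rough usage sketch of the two modes, assuming the crate-private `atomic_rename` shown above is in scope and that `StorageError` implements `std::error::Error` (the `demo` function is illustrative, not part of the crate):

```rust
use std::fs;
use std::path::Path;

// Illustrative walkthrough of both rename modes.
fn demo(dir: &Path) -> Result<(), Box<dyn std::error::Error>> {
    let a = dir.join("a");
    let b = dir.join("b");
    fs::write(&a, "a")?;
    fs::write(&b, "b")?;

    // swap = false: destination must not exist (RENAME_NOREPLACE / RENAME_EXCL).
    let c = dir.join("c");
    atomic_rename(a.to_str().unwrap(), c.to_str().unwrap(), false)?;
    assert!(!a.exists() && c.exists());

    // swap = true: both paths must exist; their contents are exchanged in one
    // atomic step, so concurrent readers never observe a missing file.
    atomic_rename(b.to_str().unwrap(), c.to_str().unwrap(), true)?;
    assert_eq!(fs::read_to_string(&b)?, "a");
    assert_eq!(fs::read_to_string(&c)?, "b");
    Ok(())
}
```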