Skip to content

Commit

Permalink
test: loading version 0 Delta table
Browse files Browse the repository at this point in the history
# Description
According to the issue test should fail to load table without snapshot (version 0) but test is written to test that it is possible to read and load Delta Table with version 0 into the Rust (functions `open_table` and `open_table_with_version` work)

# Related Issue(s)
- closes #1831
  • Loading branch information
Dmytro Suvorov committed Nov 20, 2023
2 parents 256970f + e48b8a7 commit 4064550
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 26 deletions.
27 changes: 14 additions & 13 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,21 +676,22 @@ mod tests {
);
}

#[tokio::test()]
async fn test_version_zero_table_load() {
let path = "./tests/data/COVID-19_NYT";
let mut latest_table: DeltaTable = crate::open_table(path).await.unwrap();
#[tokio::test()]
async fn test_version_zero_table_load() {
let path = "./tests/data/COVID-19_NYT";
let mut latest_table: DeltaTable = crate::open_table(path).await.unwrap();

let mut version_0_table = crate::open_table_with_version(path, 0).await.unwrap();
let mut version_0_table = crate::open_table_with_version(path, 0).await.unwrap();

let version_0_history = version_0_table.history(None).await.expect("Cannot get table history");
let latest_table_history = latest_table
.history(None)
.await
.expect("Cannot get table history");
let version_0_history = version_0_table
.history(None)
.await
.expect("Cannot get table history");
let latest_table_history = latest_table
.history(None)
.await
.expect("Cannot get table history");

assert_eq!(latest_table_history, version_0_history);
}
assert_eq!(latest_table_history, version_0_history);
}

}
16 changes: 15 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use serde_json::{Map, Value};
use std::{
collections::{HashMap, HashSet},
num::TryFromIntError,
str::Utf8Error,
str::{FromStr, Utf8Error},
sync::Arc,
};

Expand Down Expand Up @@ -82,6 +82,20 @@ pub enum PartitionStrategy {
Hive,
}

impl FromStr for PartitionStrategy {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"hive" => Ok(PartitionStrategy::Hive),
_ => Err(DeltaTableError::Generic(format!(
"Invalid partition strategy provided {}",
s
))),
}
}
}

/// Build an operation to convert a Parquet table to a [`DeltaTable`] in place
pub struct ConvertToDeltaBuilder {
log_store: Option<LogStoreRef>,
Expand Down
20 changes: 19 additions & 1 deletion crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;

use crate::errors::DeltaResult;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove};
use crate::logstore::LogStore;
use crate::table::CheckPoint;
Expand Down Expand Up @@ -589,6 +590,23 @@ pub enum SaveMode {
Ignore,
}

impl FromStr for SaveMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(DeltaTableError::Generic(format!(
"Invalid save mode provided: {}, only these are supported: ['append', 'overwrite', 'error', 'ignore']",
s
))),
}
}
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum OutputMode {
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
from .schema import Schema as Schema
from .table import DeltaTable as DeltaTable
from .table import Metadata as Metadata
from .writer import convert_to_deltalake as convert_to_deltalake
from .writer import write_deltalake as write_deltalake
10 changes: 10 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],
partition_strategy: Optional[Literal["hive"]],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

# Can't implement inheritance (see note in src/schema.rs), so this is next
Expand Down
55 changes: 55 additions & 0 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from ._internal import DeltaDataChecker as _DeltaDataChecker
from ._internal import batch_distinct
from ._internal import convert_to_deltalake as _convert_to_deltalake
from ._internal import write_new_deltalake as _write_new_deltalake
from .exceptions import DeltaProtocolError, TableNotFoundError
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable
Expand Down Expand Up @@ -391,6 +392,60 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
table.update_incremental()


def convert_to_deltalake(
uri: Union[str, Path],
mode: Literal["error", "ignore"] = "error",
partition_by: Optional[pa.Schema] = None,
partition_strategy: Optional[Literal["hive"]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
storage_options: Optional[Dict[str, str]] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""
`Convert` parquet tables `to delta` tables.
Currently only HIVE partitioned tables are supported. `Convert to delta` creates
a transaction log commit with add actions, and additional properties provided such
as configuration, name, and description.
Args:
uri: URI of a table.
partition_by: Optional partitioning schema if table is partitioned.
partition_strategy: Optional partition strategy to read and convert
mode: How to handle existing data. Default is to error if table already exists.
If 'ignore', will not convert anything if table already exists.
name: User-provided identifier for this table.
description: User-provided description for this table.
configuration: A map containing configuration options for the metadata action.
storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
custom_metadata: custom metadata that will be added to the transaction commit
"""
if partition_by is not None and partition_strategy is None:
raise ValueError("Partition strategy has to be provided with partition_by.")

if partition_strategy is not None and partition_strategy != "hive":
raise ValueError(
"Currently only `hive` partition strategy is supported to be converted."
)

if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None:
return

_convert_to_deltalake(
str(uri),
partition_by,
partition_strategy,
name,
description,
configuration,
storage_options,
custom_metadata,
)
return


def __enforce_append_only(
table: Optional[DeltaTable],
configuration: Optional[Mapping[str, Optional[str]]],
Expand Down
68 changes: 57 additions & 11 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{Action, Add, Invariant, Metadata, Remove, StructType};
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
Expand All @@ -43,6 +44,7 @@ use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
use crate::error::PythonError;
Expand Down Expand Up @@ -758,7 +760,8 @@ impl RawDeltaTable {
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<()> {
let mode = save_mode_from_str(mode)?;
let mode = mode.parse().map_err(PythonError::from)?;

let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
Expand Down Expand Up @@ -1088,16 +1091,6 @@ fn batch_distinct(batch: PyArrowType<RecordBatch>) -> PyResult<PyArrowType<Recor
))
}

fn save_mode_from_str(value: &str) -> PyResult<SaveMode> {
match value {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(PyValueError::new_err("Invalid save mode")),
}
}

fn current_timestamp() -> i64 {
let start = SystemTime::now();
let since_the_epoch = start
Expand Down Expand Up @@ -1180,6 +1173,58 @@ fn write_new_deltalake(
Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn convert_to_deltalake(
uri: String,
partition_schema: Option<PyArrowType<ArrowSchema>>,
partition_strategy: Option<String>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut builder = ConvertToDeltaBuilder::new().with_location(uri);

if let Some(part_schema) = partition_schema {
let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
builder = builder.with_partition_schema(schema.fields().clone());
}

if let Some(partition_strategy) = &partition_strategy {
let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?;
builder = builder.with_partition_strategy(strategy);
}

if let Some(name) = &name {
builder = builder.with_table_name(name);
}

if let Some(description) = &description {
builder = builder.with_comment(description);
}

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

if let Some(strg_options) = storage_options {
builder = builder.with_storage_options(strg_options);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
Ok(())
}

#[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]
struct PyDeltaDataChecker {
inner: DeltaDataChecker,
Expand Down Expand Up @@ -1225,6 +1270,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?;
m.add_class::<RawDeltaTable>()?;
m.add_class::<RawDeltaTableMetaData>()?;
Expand Down
Loading

0 comments on commit 4064550

Please sign in to comment.