Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python, rust): use require files #2809

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ pub enum DeltaTableError {
#[error("Table has not yet been initialized")]
NotInitialized,

#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),

#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded { version: i64, start: i64, end: i64 },

Expand Down
19 changes: 17 additions & 2 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ impl Snapshot {
&self.protocol
}

/// Get the [`DeltaTableConfig`] this snapshot was loaded with.
///
/// Returns a borrow of the snapshot's own `config` field; nothing is cloned.
pub fn load_config(&self) -> &DeltaTableConfig {
&self.config
}

/// Get the table root of the snapshot
pub fn table_root(&self) -> Path {
Path::from(self.table_url.clone())
Expand Down Expand Up @@ -364,8 +369,13 @@ impl EagerSnapshot {
.iter()
.flat_map(get_visitor)
.collect::<Vec<_>>();
let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?;
let files = snapshot.files(store, &mut visitors)?.try_collect().await?;
let snapshot =
Snapshot::try_new(table_root, store.clone(), config.clone(), version).await?;

let files = match config.require_files {
true => snapshot.files(store, &mut visitors)?.try_collect().await?,
false => vec![],
};

let mut sn = Self {
snapshot,
Expand Down Expand Up @@ -530,6 +540,11 @@ impl EagerSnapshot {
self.snapshot.table_root()
}

/// Get the [`DeltaTableConfig`] this snapshot was loaded with.
///
/// Delegates to the inner snapshot's `load_config`, returning its borrow
/// unchanged.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The delegate already yields `&DeltaTableConfig`; re-borrowing it would
    // create a `&&DeltaTableConfig` that only compiles via deref coercion.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl std::future::IntoFuture for ConstraintBuilder {
let this = self;

Box::pin(async move {
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles(
"ADD CONSTRAINTS".into(),
));
}

let name = match this.name {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No name provided".to_string())),
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ async fn execute(
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("DELETE".into()));
}

let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ impl std::future::IntoFuture for LoadBuilder {

Box::pin(async move {
PROTOCOL.can_read_from(&this.snapshot.snapshot)?;
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("reading".into()));
}

let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
let schema = table.snapshot()?.arrow_schema()?;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ async fn execute(
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into()));
}

let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();
// Determining whether we should write change data once so that computation of change data can
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot.snapshot)?;
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into()));
}

let writer_properties = this.writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ async fn execute(
// For files that were identified, scan for records that match the predicate,
// perform update operations, and then commit add and remove actions to
// the log.
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into()));
}

let update_planner = DeltaPlanner::<UpdateMetricExtensionPlanner> {
extension_planner: UpdateMetricExtensionPlanner {},
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,11 @@ impl std::future::IntoFuture for VacuumBuilder {

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into()));
}

let plan = this.create_vacuum_plan().await?;
if this.dry_run {
return Ok((
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ impl std::future::IntoFuture for WriteBuilder {
if this.mode == SaveMode::Overwrite {
if let Some(snapshot) = &this.snapshot {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
}
}
if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite {
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl DeltaTableState {
self.snapshot.schema()
}

/// Get the [`DeltaTableConfig`] the table state's snapshot was loaded with.
///
/// Delegates to the underlying snapshot's `load_config`, returning its
/// borrow unchanged.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The delegate already yields `&DeltaTableConfig`; re-borrowing it would
    // create a `&&DeltaTableConfig` that only compiles via deref coercion.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class RawDeltaTable:
) -> bool: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def has_files(self) -> bool: ...
def get_add_file_sizes(self) -> Dict[str, int]: ...
def get_latest_version(self) -> int: ...
def get_num_index_cols(self) -> int: ...
Expand Down
5 changes: 4 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import os

from deltalake._internal import (
DeltaError,
PyMergeBuilder,
RawDeltaTable,
)
Expand Down Expand Up @@ -1138,6 +1139,9 @@ def to_pyarrow_dataset(
Returns:
the PyArrow dataset in PyArrow
"""
if not self._table.has_files():
raise DeltaError("Table is instantiated without files.")

table_protocol = self.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
Expand All @@ -1158,7 +1162,6 @@ def to_pyarrow_dataset(
raise DeltaProtocolError(
f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader."
)

if not filesystem:
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::storage::IORuntime;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, DeltaResult};
use error::DeltaError;
use futures::future::join_all;

use pyo3::exceptions::{PyRuntimeError, PyValueError};
Expand Down Expand Up @@ -166,6 +167,10 @@ impl RawDeltaTable {
Ok(self._table.version())
}

/// Report the `require_files` flag from the wrapped table's config.
///
/// Always returns `Ok`; the `PyResult` wrapper only matches the signature
/// style of the neighbouring methods.
pub fn has_files(&self) -> PyResult<bool> {
    let require_files = self._table.config.require_files;
    Ok(require_files)
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down Expand Up @@ -273,6 +278,9 @@ impl RawDeltaTable {
py: Python,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
py.allow_threads(|| {
if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Expand All @@ -298,6 +306,10 @@ impl RawDeltaTable {
&self,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self._table.config.require_files {
return Err(DeltaError::new_err("Table is initiated without files."));
}

if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Ok(self
Expand Down Expand Up @@ -1073,6 +1085,9 @@ impl RawDeltaTable {
}

pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
Ok(PyArrowType(
self._table
.snapshot()
Expand Down
Loading