Skip to content

Commit

Permalink
feat: raise if files not loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 24, 2024
1 parent e657b65 commit ff8617f
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 5 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ pub enum DeltaTableError {
#[error("Table has not yet been initialized")]
NotInitialized,

#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),

#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded { version: i64, start: i64, end: i64 },

Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ impl Snapshot {
&self.protocol
}

/// Get the [`DeltaTableConfig`] this snapshot was loaded with.
///
/// The returned reference borrows from the snapshot; nothing is cloned.
pub fn load_config(&self) -> &DeltaTableConfig {
&self.config
}

/// Get the table root of the snapshot
pub fn table_root(&self) -> Path {
Path::from(self.table_url.clone())
Expand Down Expand Up @@ -535,6 +540,11 @@ impl EagerSnapshot {
self.snapshot.table_root()
}

/// Get the [`DeltaTableConfig`] the underlying snapshot was loaded with.
///
/// Delegates to the wrapped snapshot's `load_config`.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The inner accessor already yields a reference; borrowing it again would
    // only build a `&&DeltaTableConfig` that must be auto-dereferenced.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl std::future::IntoFuture for ConstraintBuilder {
let this = self;

Box::pin(async move {
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles(
"ADD CONSTRAINTS".into(),
));
}

let name = match this.name {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No name provided".to_string())),
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ async fn execute(
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("DELETE".into()));
}

let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ impl std::future::IntoFuture for LoadBuilder {

Box::pin(async move {
PROTOCOL.can_read_from(&this.snapshot.snapshot)?;
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("reading".into()));
}

let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
let schema = table.snapshot()?.arrow_schema()?;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,10 @@ async fn execute(
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into()));
}

let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();
// Determining whether we should write change data once so that computation of change data can
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot.snapshot)?;
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into()));
}

let writer_properties = this.writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ async fn execute(
// For files that were identified, scan for records that match the predicate,
// perform update operations, and then commit add and remove actions to
// the log.
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into()));
}

let update_planner = DeltaPlanner::<UpdateMetricExtensionPlanner> {
extension_planner: UpdateMetricExtensionPlanner {},
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,11 @@ impl std::future::IntoFuture for VacuumBuilder {

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into()));
}

let plan = this.create_vacuum_plan().await?;
if this.dry_run {
return Ok((
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,9 @@ impl std::future::IntoFuture for WriteBuilder {
if this.mode == SaveMode::Overwrite {
if let Some(snapshot) = &this.snapshot {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
}
}
if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite {
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl DeltaTableState {
self.snapshot.schema()
}

/// Get the [`DeltaTableConfig`] the underlying snapshot was loaded with.
///
/// Delegates to the wrapped snapshot's `load_config`.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The inner accessor already yields a reference; borrowing it again would
    // only build a `&&DeltaTableConfig` that must be auto-dereferenced.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class RawDeltaTable:
) -> bool: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def has_files(self) -> bool: ...
def get_add_file_sizes(self) -> Dict[str, int]: ...
def get_latest_version(self) -> int: ...
def get_num_index_cols(self) -> int: ...
Expand Down
8 changes: 4 additions & 4 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
if TYPE_CHECKING:
import os

from deltalake._internal import (
RawDeltaTable,
)
from deltalake._internal import DeltaError, RawDeltaTable
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
Expand Down Expand Up @@ -1043,6 +1041,9 @@ def to_pyarrow_dataset(
Returns:
the PyArrow dataset in PyArrow
"""
if not self._table.has_files():
raise DeltaError("Table is instantiated without files.")

table_protocol = self.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
Expand All @@ -1063,7 +1064,6 @@ def to_pyarrow_dataset(
raise DeltaProtocolError(
f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader."
)

if not filesystem:
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::storage::IORuntime;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, DeltaResult};
use error::DeltaError;
use futures::future::join_all;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -168,6 +169,10 @@ impl RawDeltaTable {
Ok(self._table.version())
}

/// Return the `require_files` flag from the table's config.
///
/// This reads the configuration only; it does not inspect the table's
/// contents. Other methods in this file treat a `false` value as "table is
/// instantiated without files" and return an error instead of running.
/// This method itself always returns `Ok`.
pub fn has_files(&self) -> PyResult<bool> {
Ok(self._table.config.require_files)
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down Expand Up @@ -275,6 +280,9 @@ impl RawDeltaTable {
py: Python,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
py.allow_threads(|| {
if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Expand All @@ -300,6 +308,10 @@ impl RawDeltaTable {
&self,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self._table.config.require_files {
return Err(DeltaError::new_err("Table is initiated without files."));
}

if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Ok(self
Expand Down Expand Up @@ -1209,6 +1221,9 @@ impl RawDeltaTable {
}

pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
Ok(PyArrowType(
self._table
.snapshot()
Expand Down

0 comments on commit ff8617f

Please sign in to comment.