refactor: a little more API harmonization
roeap authored and rtyler committed May 26, 2024
1 parent 5fcc4c6 commit e52bd29
Showing 8 changed files with 31 additions and 23 deletions.
6 changes: 2 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
@@ -544,8 +544,7 @@ impl<'a> DeltaScanBuilder<'a> {
                         PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
                     let files_to_prune = pruning_predicate.prune(self.snapshot)?;
                     self.snapshot
-                        .file_actions()?
-                        .iter()
+                        .file_actions_iter()?
                         .zip(files_to_prune.into_iter())
                         .filter_map(
                             |(action, keep)| {
@@ -1517,8 +1516,7 @@ pub(crate) async fn find_files_scan<'a>(
     expression: Expr,
 ) -> DeltaResult<Vec<Add>> {
     let candidate_map: HashMap<String, Add> = snapshot
-        .file_actions()?
-        .iter()
+        .file_actions_iter()?
         .map(|add| (add.path.clone(), add.to_owned()))
         .collect();
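
The change above is the recurring pattern in this commit: call sites that previously materialized a full Vec<Add> via file_actions() and then iterated by reference now consume the iterator returned by file_actions_iter() directly. A minimal sketch of the migration for external callers (the `snapshot` binding and the surrounding error handling are assumptions, not part of this diff):

    // Before: allocate the whole Vec<Add>, then iterate by reference.
    let paths_before: Vec<String> = snapshot
        .file_actions()?               // DeltaResult<Vec<Add>>
        .iter()
        .map(|add| add.path.clone())
        .collect();

    // After: stream owned Add actions without the intermediate Vec.
    let paths_after: Vec<String> = snapshot
        .file_actions_iter()?          // DeltaResult<impl Iterator<Item = Add> + '_>
        .map(|add| add.path)
        .collect();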
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
@@ -29,6 +29,7 @@ use object_store::ObjectStore;
 use self::log_segment::{LogSegment, PathExt};
 use self::parse::{read_adds, read_removes};
 use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
+use self::visitors::*;
 use super::{
     Action, Add, AddCDCFile, CommitInfo, DataType, Metadata, Protocol, Remove, StructField,
     Transaction,
@@ -41,7 +42,6 @@ use crate::table::config::TableConfig;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 
 pub use self::log_data::*;
-pub(crate) use self::visitors::*;
 
 mod log_data;
 mod log_segment;
2 changes: 1 addition & 1 deletion crates/core/src/operations/filesystem_check.rs
@@ -105,7 +105,7 @@ impl FileSystemCheckBuilder {
             HashMap::with_capacity(self.snapshot.file_actions()?.len());
         let log_store = self.log_store.clone();
 
-        for active in self.snapshot.file_actions()? {
+        for active in self.snapshot.file_actions_iter()? {
             if is_absolute_path(&active.path)? {
                 return Err(DeltaTableError::Generic(
                     "Filesystem check does not support absolute paths".to_string(),
19 changes: 14 additions & 5 deletions crates/core/src/operations/transaction/state.rs
@@ -31,11 +31,20 @@ impl DeltaTableState {
         &self,
         object_store: Arc<dyn ObjectStore>,
     ) -> DeltaResult<ArrowSchemaRef> {
-        if let Some(add) = self
-            .file_actions()?
-            .iter()
-            .max_by_key(|obj| obj.modification_time)
-        {
+        self.snapshot.physical_arrow_schema(object_store).await
+    }
+}
+
+impl EagerSnapshot {
+    /// Get the physical table schema.
+    ///
+    /// This will construct a schema derived from the parquet schema of the latest data file,
+    /// and fields for partition columns from the schema defined in table meta data.
+    pub async fn physical_arrow_schema(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> DeltaResult<ArrowSchemaRef> {
+        if let Some(add) = self.file_actions()?.max_by_key(|obj| obj.modification_time) {
             let file_meta = add.try_into()?;
             let file_reader = ParquetObjectReader::new(object_store, file_meta);
             let file_schema = ParquetRecordBatchStreamBuilder::new_with_options(
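
The schema helper now lives on EagerSnapshot, with DeltaTableState::physical_arrow_schema reduced to a delegating call, so the same behavior is reachable from either level. A rough usage sketch, assuming an already-loaded table and that DeltaTable::snapshot() and DeltaTable::object_store() are available in this version:

    // Hypothetical caller; names outside this diff are assumptions.
    let object_store = table.object_store();
    let physical_schema = table
        .snapshot()?                            // &DeltaTableState
        .physical_arrow_schema(object_store)    // delegates to EagerSnapshot::physical_arrow_schema
        .await?;
    println!("physical columns: {}", physical_schema.fields().len());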
3 changes: 1 addition & 2 deletions crates/core/src/protocol/checkpoints.rs
@@ -302,8 +302,7 @@ fn parquet_bytes_from_state(
             state
                 .app_transaction_version()
                 .map_err(|_| CheckpointError::MissingActionType("txn".to_string()))?
-                .into_iter()
-                .map(|(_, txn)| Action::Txn(txn.clone())),
+                .map(|txn| Action::Txn(txn)),
         )
         // removes
         .chain(tombstones.iter().map(|r| {
1 change: 1 addition & 0 deletions crates/core/src/table/mod.rs
@@ -487,6 +487,7 @@ impl DeltaTable {
         self.state
             .as_ref()
             .and_then(|s| s.app_transaction_version().ok())
+            .map(|it| it.map(|t| (t.app_id.clone(), t)).collect())
             .unwrap_or_default()
     }
 
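
At the DeltaTable level the public return type is unchanged: the accessor enclosing this hunk (not shown here) still hands back a HashMap, it just rebuilds the map from the iterator now produced by the state. Downstream code that relies on the map therefore keeps working; a sketch, assuming the enclosing accessor is DeltaTable::get_app_transaction_version and that the table is already loaded:

    use std::collections::HashMap;

    // Unchanged surface: a map from app id to its last recorded transaction.
    let versions: HashMap<String, Transaction> = table.get_app_transaction_version();
    if let Some(txn) = versions.get("my-streaming-app") {
        println!("last version committed by app: {}", txn.version);
    }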
18 changes: 10 additions & 8 deletions crates/core/src/table/state.rs
@@ -137,14 +137,20 @@ impl DeltaTableState {
         Ok(self.snapshot.file_actions()?.collect())
     }
 
+    /// Full list of add actions representing all parquet files that are part of the current
+    /// delta table state.
+    pub fn file_actions_iter(&self) -> DeltaResult<impl Iterator<Item = Add> + '_> {
+        self.snapshot.file_actions()
+    }
+
     /// Get the number of files in the current table state
     pub fn files_count(&self) -> usize {
         self.snapshot.files_count()
     }
 
     /// Full list of all of the CDC files added as part of the changeDataFeed feature
-    pub fn cdc_files(&self) -> DeltaResult<Vec<AddCDCFile>> {
-        Ok(self.snapshot.cdc_files()?.collect())
+    pub fn cdc_files(&self) -> DeltaResult<impl Iterator<Item = AddCDCFile> + '_> {
+        self.snapshot.cdc_files()
     }
 
     /// Returns an iterator of file names present in the loaded state
@@ -156,12 +162,8 @@
     }
 
     /// HashMap containing the last transaction stored for every application.
-    pub fn app_transaction_version(&self) -> DeltaResult<HashMap<String, Transaction>> {
-        Ok(self
-            .snapshot
-            .transactions()?
-            .map(|t| (t.app_id.clone(), t))
-            .collect())
+    pub fn app_transaction_version(&self) -> DeltaResult<impl Iterator<Item = Transaction> + '_> {
+        self.snapshot.transactions()
     }
 
     /// The most recent protocol of the table.
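
After this change, file_actions() keeps returning an eager Vec, while file_actions_iter(), cdc_files() and app_transaction_version() yield iterators from the underlying snapshot. A small sketch of the new iterator surface (the `state` binding and the import paths for Add, AddCDCFile and Transaction are assumptions):

    use std::collections::HashMap;

    // Walk all add actions lazily, e.g. to sum file sizes without an intermediate Vec.
    let total_bytes: i64 = state.file_actions_iter()?.map(|add| add.size).sum();

    // CDC files are an iterator as well now.
    let cdc_paths: Vec<String> = state.cdc_files()?.map(|f| f.path).collect();

    // Per-application transactions stream as Transaction values; collect when a map is needed.
    let txns: HashMap<String, Transaction> = state
        .app_transaction_version()?
        .map(|t| (t.app_id.clone(), t))
        .collect();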
3 changes: 1 addition & 2 deletions crates/core/src/table/state_arrow.rs
@@ -397,8 +397,7 @@ impl DeltaTableState {
         flatten: bool,
     ) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
         let stats: Vec<Option<Stats>> = self
-            .file_actions()?
-            .iter()
+            .file_actions_iter()?
             .map(|f| {
                 f.get_stats()
                     .map_err(|err| DeltaTableError::InvalidStatsJson { json_err: err })
