From 776a4e17afe4634968b2961ee028a229081e1821 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 19 Aug 2024 21:08:55 +0200 Subject: [PATCH] feat: get earliest version --- crates/aws/src/logstore/dynamodb_logstore.rs | 4 ++ crates/core/src/logstore/default_logstore.rs | 4 ++ crates/core/src/logstore/mod.rs | 52 +++++++++++++++++++- crates/core/src/table/mod.rs | 5 ++ python/deltalake/_internal.pyi | 1 + python/src/lib.rs | 8 +++ 6 files changed, 73 insertions(+), 1 deletion(-) diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index 5307040538..377199ffd5 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore { } } + async fn get_earliest_version(&self, current_version: i64) -> DeltaResult { + get_earliest_version(self, current_version).await + } + fn object_store(&self) -> ObjectStoreRef { self.storage.clone() } diff --git a/crates/core/src/logstore/default_logstore.rs b/crates/core/src/logstore/default_logstore.rs index 79a1c76653..c31280ef5d 100644 --- a/crates/core/src/logstore/default_logstore.rs +++ b/crates/core/src/logstore/default_logstore.rs @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore { super::get_latest_version(self, current_version).await } + async fn get_earliest_version(&self, current_version: i64) -> DeltaResult { + super::get_earliest_version(self, current_version).await + } + fn object_store(&self) -> Arc { self.storage.clone() } diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 51191d77d0..5b488392d9 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -1,11 +1,12 @@ //! Delta log store. +use std::cmp::min; use std::io::{BufRead, BufReader, Cursor}; use std::sync::OnceLock; use std::{cmp::max, collections::HashMap, sync::Arc}; use bytes::Bytes; use dashmap::DashMap; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use lazy_static::lazy_static; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use regex::Regex; @@ -212,6 +213,9 @@ pub trait LogStore: Sync + Send { /// Find latest version currently stored in the delta log. async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + /// Find earliest version currently stored in the delta log. + async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; + /// Get underlying object store. fn object_store(&self) -> Arc; @@ -440,6 +444,52 @@ pub async fn get_latest_version( Ok(version) } +/// Default implementation for retrieving the earliest version +pub async fn get_earliest_version( + log_store: &dyn LogStore, + current_version: i64, +) -> DeltaResult { + let version_start = match get_last_checkpoint(log_store).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint so start from current_version + current_version + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + // list files to find min version + let version = async { + let mut min_version: i64 = version_start; + let prefix = Some(log_store.log_path()); + let offset_path = commit_uri_from_version(version_start); + let object_store = log_store.object_store(); + + // Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274 + let mut files = object_store + .list(prefix) + .try_filter(move |f| futures::future::ready(f.location < offset_path)) + .boxed(); + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) { + min_version = min(min_version, log_version); + } + } + + if min_version < 0 { + return Err(DeltaTableError::not_a_table(log_store.root_uri())); + } + + Ok::(min_version) + } + .await?; + Ok(version) +} + /// Read delta log for a specific version pub async fn read_commit_entry( storage: &dyn ObjectStore, diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index c7ee976eeb..53efcf2fb6 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -310,6 +310,11 @@ impl DeltaTable { self.log_store.get_latest_version(self.version()).await } + /// returns the earliest available version of the table + pub async fn get_earliest_version(&self) -> Result { + self.log_store.get_earliest_version(self.version()).await + } + /// Currently loaded version of the table pub fn version(&self) -> i64 { self.state.as_ref().map(|s| s.version()).unwrap_or(-1) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ceac16e7f8..e60a323d01 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -42,6 +42,7 @@ class RawDeltaTable: def version(self) -> int: ... def get_add_file_sizes(self) -> Dict[str, int]: ... def get_latest_version(self) -> int: ... + def get_earliest_version(self) -> int: ... def get_num_index_cols(self) -> int: ... def get_stats_columns(self) -> Optional[List[str]]: ... def metadata(self) -> RawDeltaTableMetaData: ... diff --git a/python/src/lib.rs b/python/src/lib.rs index aeb1b3c429..0feb55f503 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -237,6 +237,14 @@ impl RawDeltaTable { }) } + pub fn get_earliest_version(&mut self, py: Python) -> PyResult { + py.allow_threads(|| { + Ok(rt() + .block_on(self._table.get_earliest_version()) + .map_err(PythonError::from)?) + }) + } + pub fn get_num_index_cols(&mut self) -> PyResult { Ok(self ._table