Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify log replay visitor and avoid materializing Add/Remove actions #494

Merged
merged 21 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions derive-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn parse_column_name(input: proc_macro::TokenStream) -> proc_macro::TokenStr
err.into_compile_error().into()
}

/// Derive a `delta_kernel::schemas::ToDataType` implementation for the annotated struct. The actual
/// Derive a `delta_kernel::schemas::ToSchema` implementation for the annotated struct. The actual
/// field names in the schema (and therefore of the struct members) are all mandated by the Delta
/// spec, and so the user of this macro is responsible for ensuring that
/// e.g. `Metadata::schema_string` is the snake_case-ified version of `schemaString` from [Delta's
Expand All @@ -45,10 +45,10 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream
let schema_fields = gen_schema_fields(&input.data);
let output = quote! {
#[automatically_derived]
impl crate::actions::schemas::ToDataType for #struct_ident {
fn to_data_type() -> crate::schema::DataType {
impl crate::actions::schemas::ToSchema for #struct_ident {
fn to_schema() -> crate::schema::StructType {
use crate::actions::schemas::{ToDataType, GetStructField, GetNullableContainerStructField};
crate::schema::DataType::struct_type([
crate::schema::StructType::new([
#schema_fields
])
}
Expand Down
108 changes: 61 additions & 47 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::actions::{
get_log_schema, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::{GetData, TypedGetData};
use delta_kernel::engine_data::{GetData, RowVisitorBase, TypedGetData};
use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{DataType, SchemaRef, StructField};
use delta_kernel::{DataVisitor, DeltaResult, Table};
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, RowVisitor as _, Table};

use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use clap::{Parser, Subcommand};

Expand Down Expand Up @@ -77,97 +81,107 @@ impl Action {
}
}

fn fields_in(field: &StructField) -> usize {
if let DataType::Struct(ref inner) = field.data_type {
inner.fields().map(fields_in).sum()
} else {
1
}
}

struct LogVisitor {
actions: Vec<Action>,
add_offset: usize,
remove_offset: usize,
protocol_offset: usize,
metadata_offset: usize,
set_transaction_offset: usize,
commit_info_offset: usize,
previous_rows_seen: usize,
}

impl LogVisitor {
fn new(log_schema: &SchemaRef) -> LogVisitor {
let mut offset = 0;
let mut add_offset = 0;
let mut remove_offset = 0;
let mut protocol_offset = 0;
let mut metadata_offset = 0;
let mut set_transaction_offset = 0;
for field in log_schema.fields() {
match field.name().as_str() {
"add" => add_offset = offset,
"remove" => remove_offset = offset,
"protocol" => protocol_offset = offset,
"metaData" => metadata_offset = offset,
"txn" => set_transaction_offset = offset,
_ => {}
}
offset += fields_in(field);
}
fn new() -> LogVisitor {
// NOTE: Each `position` call consumes the first item of the searched-for batch. So we have
// to add one to each `prev_offset`. But the first call starts from actual first, so we
// manually skip the first entry to make the first call behavior match the other calls.
let mut names = NAMES_AND_TYPES.as_ref().0.iter();
names.next();

let mut next_offset =
|prev_offset, name| prev_offset + 1 + names.position(|n| n[0] == name).unwrap();
let add_offset = 0;
let remove_offset = next_offset(add_offset, REMOVE_NAME);
let metadata_offset = next_offset(remove_offset, METADATA_NAME);
let protocol_offset = next_offset(metadata_offset, PROTOCOL_NAME);
let set_transaction_offset = next_offset(protocol_offset, SET_TRANSACTION_NAME);
let commit_info_offset = next_offset(set_transaction_offset, COMMIT_INFO_NAME);
LogVisitor {
actions: vec![],
add_offset,
remove_offset,
protocol_offset,
metadata_offset,
set_transaction_offset,
commit_info_offset,
previous_rows_seen: 0,
}
}
}
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| get_log_schema().leaves(None));

impl DataVisitor for LogVisitor {
impl RowVisitorBase for LogVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
if getters.len() != 50 {
return Err(Error::InternalError(format!(
"Wrong number of LogVisitor getters: {}",
getters.len()
)));
}
for i in 0..row_count {
if let Some(path) = getters[self.add_offset].get_opt(i, "add.path")? {
self.actions.push(Action::Add(
AddVisitor::visit_add(i, path, &getters[self.add_offset..])?,
AddVisitor::visit_add(i, path, &getters[self.add_offset..self.remove_offset])?,
self.previous_rows_seen + i,
));
}
if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? {
} else if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? {
self.actions.push(Action::Remove(
RemoveVisitor::visit_remove(i, path, &getters[self.remove_offset..])?,
RemoveVisitor::visit_remove(
i,
path,
&getters[self.remove_offset..self.metadata_offset],
)?,
self.previous_rows_seen + i,
));
}
if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? {
} else if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? {
self.actions.push(Action::Metadata(
MetadataVisitor::visit_metadata(i, id, &getters[self.metadata_offset..])?,
MetadataVisitor::visit_metadata(
i,
id,
&getters[self.metadata_offset..self.protocol_offset],
)?,
self.previous_rows_seen + i,
));
}
if let Some(min_reader_version) =
} else if let Some(min_reader_version) =
getters[self.protocol_offset].get_opt(i, "protocol.min_reader_version")?
{
self.actions.push(Action::Protocol(
ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[self.protocol_offset..],
&getters[self.protocol_offset..self.set_transaction_offset],
)?,
self.previous_rows_seen + i,
));
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
} else if let Some(app_id) =
getters[self.set_transaction_offset].get_opt(i, "txn.appId")?
{
self.actions.push(Action::SetTransaction(
SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
&getters[self.set_transaction_offset..self.commit_info_offset],
)?,
self.previous_rows_seen + i,
));
} else {
// TODO: Add CommitInfo support (tricky because all fields are optional)
}
}
self.previous_rows_seen += row_count;
Expand Down Expand Up @@ -246,9 +260,9 @@ fn try_main() -> DeltaResult<()> {
None,
)?;

let mut visitor = LogVisitor::new(log_schema);
let mut visitor = LogVisitor::new();
for action in actions {
action?.0.extract(log_schema.clone(), &mut visitor)?;
visitor.visit_rows_of(action?.0.as_ref())?;
}

if *forward {
Expand Down
14 changes: 10 additions & 4 deletions kernel/src/actions/deletion_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,16 @@ pub struct DeletionVectorDescriptor {

impl DeletionVectorDescriptor {
pub fn unique_id(&self) -> String {
if let Some(offset) = self.offset {
format!("{}{}@{offset}", self.storage_type, self.path_or_inline_dv)
} else {
format!("{}{}", self.storage_type, self.path_or_inline_dv)
Self::unique_id_from_parts(&self.storage_type, &self.path_or_inline_dv, self.offset)
}
pub(crate) fn unique_id_from_parts(
storage_type: &str,
path_or_inline_dv: &str,
offset: Option<i32>,
) -> String {
match offset {
Some(offset) => format!("{storage_type}{path_or_inline_dv}@{offset}"),
None => format!("{storage_type}{path_or_inline_dv}"),
}
}

Expand Down
42 changes: 21 additions & 21 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error};
use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor};
use crate::{DeltaResult, EngineData, Error, RowVisitor as _};
use visitors::{MetadataVisitor, ProtocolVisitor};

use delta_kernel_derive::Schema;
use serde::{Deserialize, Serialize};
Expand All @@ -30,15 +30,22 @@ pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const ADD_NAME: &str = "add";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const REMOVE_NAME: &str = "remove";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const METADATA_NAME: &str = "metaData";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());

static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new([
Option::<Add>::get_struct_field(ADD_NAME),
Expand Down Expand Up @@ -114,7 +121,7 @@ pub struct Metadata {
impl Metadata {
pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
let mut visitor = MetadataVisitor::default();
data.extract(get_log_schema().project(&[METADATA_NAME])?, &mut visitor)?;
visitor.visit_rows_of(data)?;
Ok(visitor.metadata)
}

Expand Down Expand Up @@ -181,7 +188,7 @@ impl Protocol {

pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Protocol>> {
let mut visitor = ProtocolVisitor::default();
data.extract(get_log_schema().project(&[PROTOCOL_NAME])?, &mut visitor)?;
visitor.visit_rows_of(data)?;
Ok(visitor.protocol)
}

Expand All @@ -193,15 +200,21 @@ impl Protocol {
self.min_writer_version
}

pub fn reader_features(&self) -> Option<&[String]> {
self.reader_features.as_deref()
}

pub fn writer_features(&self) -> Option<&[String]> {
self.writer_features.as_deref()
}

pub fn has_reader_feature(&self, feature: &ReaderFeatures) -> bool {
self.reader_features
.as_ref()
self.reader_features()
.is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
}

pub fn has_writer_feature(&self, feature: &WriterFeatures) -> bool {
self.writer_features
.as_ref()
self.writer_features()
.is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
}

Expand Down Expand Up @@ -364,13 +377,6 @@ pub struct Add {
}

impl Add {
/// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> {
let mut visitor = AddVisitor::default();
data.extract(get_log_add_schema().clone(), &mut visitor)?;
Ok(visitor.adds)
}

pub fn dv_unique_id(&self) -> Option<String> {
self.deletion_vector.as_ref().map(|dv| dv.unique_id())
}
Expand Down Expand Up @@ -418,12 +424,6 @@ struct Remove {
pub(crate) default_row_commit_version: Option<i64>,
}

impl Remove {
pub(crate) fn dv_unique_id(&self) -> Option<String> {
self.deletion_vector.as_ref().map(|dv| dv.unique_id())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct SetTransaction {
/// A unique identifier for the application performing the transaction.
Expand Down
12 changes: 11 additions & 1 deletion kernel/src/actions/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@

use std::collections::{HashMap, HashSet};

use crate::schema::{ArrayType, DataType, MapType, StructField};
use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};

pub(crate) trait ToSchema {
fn to_schema() -> StructType;
}

pub(crate) trait ToDataType {
fn to_data_type() -> DataType;
}

impl<T: ToSchema> ToDataType for T {
fn to_data_type() -> DataType {
T::to_schema().into()
}
}

pub(crate) trait ToNullableContainerType {
fn to_nullable_container_type() -> DataType;
}
Expand Down
6 changes: 4 additions & 2 deletions kernel/src/actions/set_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::sync::{Arc, LazyLock};
use crate::actions::visitors::SetTransactionVisitor;
use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, Expression as Expr, ExpressionRef, SchemaRef};
use crate::{
DeltaResult, Engine, EngineData, Expression as Expr, ExpressionRef, RowVisitor as _, SchemaRef,
};

pub use crate::actions::visitors::SetTransactionMap;
pub struct SetTransactionScanner {
Expand All @@ -27,7 +29,7 @@ impl SetTransactionScanner {
// found. If all ids are requested then we are forced to replay the entire log.
for maybe_data in self.replay_for_app_ids(engine, schema.clone())? {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
visitor.visit_rows_of(txns.as_ref())?;
// if a specific id is requested and a transaction was found, then return
if application_id.is_some() && !visitor.set_transactions.is_empty() {
break;
Expand Down
Loading
Loading