Skip to content

Commit

Permalink
Merge remote-tracking branch 'oss/main' into nested-data-skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
scovich committed Nov 22, 2024
2 parents 9eab3f3 + 4a0fad2 commit 4e5c6b4
Show file tree
Hide file tree
Showing 15 changed files with 567 additions and 638 deletions.
8 changes: 4 additions & 4 deletions derive-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn parse_column_name(input: proc_macro::TokenStream) -> proc_macro::TokenStr
err.into_compile_error().into()
}

/// Derive a `delta_kernel::schemas::ToDataType` implementation for the annotated struct. The actual
/// Derive a `delta_kernel::schemas::ToSchema` implementation for the annotated struct. The actual
/// field names in the schema (and therefore of the struct members) are all mandated by the Delta
/// spec, and so the user of this macro is responsible for ensuring that
/// e.g. `Metadata::schema_string` is the snake_case-ified version of `schemaString` from [Delta's
Expand All @@ -45,10 +45,10 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream
let schema_fields = gen_schema_fields(&input.data);
let output = quote! {
#[automatically_derived]
impl crate::actions::schemas::ToDataType for #struct_ident {
fn to_data_type() -> crate::schema::DataType {
impl crate::actions::schemas::ToSchema for #struct_ident {
fn to_schema() -> crate::schema::StructType {
use crate::actions::schemas::{ToDataType, GetStructField, GetNullableContainerStructField};
crate::schema::DataType::struct_type([
crate::schema::StructType::new([
#schema_fields
])
}
Expand Down
210 changes: 93 additions & 117 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
AddVisitor, CdcVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::actions::{
get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::{GetData, TypedGetData};
use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _};
use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{DataType, SchemaRef, StructField};
use delta_kernel::{DataVisitor, DeltaResult, Table};
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, Table};

use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use clap::{Parser, Subcommand};

Expand Down Expand Up @@ -40,9 +44,9 @@ enum Commands {
ScanData,
/// Show each action from the log-segments
Actions {
/// Show the log in forward order (default is to show it going backwards in time)
/// Show the log in reverse order (default is log replay order -- newest first)
#[arg(short, long)]
forward: bool,
oldest_first: bool,
},
}

Expand All @@ -58,117 +62,94 @@ fn main() -> ExitCode {
}

enum Action {
Metadata(delta_kernel::actions::Metadata, usize),
Protocol(delta_kernel::actions::Protocol, usize),
Remove(delta_kernel::actions::Remove, usize),
Add(delta_kernel::actions::Add, usize),
SetTransaction(delta_kernel::actions::SetTransaction, usize),
Metadata(delta_kernel::actions::Metadata),
Protocol(delta_kernel::actions::Protocol),
Remove(delta_kernel::actions::Remove),
Add(delta_kernel::actions::Add),
SetTransaction(delta_kernel::actions::SetTransaction),
Cdc(delta_kernel::actions::Cdc),
}

impl Action {
fn row(&self) -> usize {
match self {
Action::Metadata(_, row) => *row,
Action::Protocol(_, row) => *row,
Action::Remove(_, row) => *row,
Action::Add(_, row) => *row,
Action::SetTransaction(_, row) => *row,
}
}
}

fn fields_in(field: &StructField) -> usize {
if let DataType::Struct(ref inner) = field.data_type {
inner.fields().map(fields_in).sum()
} else {
1
}
}
// Lazily computed (on first access) leaf column names and types of the full log schema, obtained
// from `get_log_schema().leaves(None)`. `LogVisitor::new` walks the column names to find each
// top-level column's range of leaves, and `selected_column_names_and_types` returns this value.
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| get_log_schema().leaves(None));

struct LogVisitor {
actions: Vec<Action>,
add_offset: usize,
remove_offset: usize,
protocol_offset: usize,
metadata_offset: usize,
set_transaction_offset: usize,
actions: Vec<(Action, usize)>,
offsets: HashMap<String, (usize, usize)>,
previous_rows_seen: usize,
}

impl LogVisitor {
fn new(log_schema: &SchemaRef) -> LogVisitor {
let mut offset = 0;
let mut add_offset = 0;
let mut remove_offset = 0;
let mut protocol_offset = 0;
let mut metadata_offset = 0;
let mut set_transaction_offset = 0;
for field in log_schema.fields() {
match field.name().as_str() {
"add" => add_offset = offset,
"remove" => remove_offset = offset,
"protocol" => protocol_offset = offset,
"metaData" => metadata_offset = offset,
"txn" => set_transaction_offset = offset,
_ => {}
fn new() -> LogVisitor {
// Grab the start offset for each top-level column name, then compute the end offset by
// skipping the rest of the leaves for that column.
let mut offsets = HashMap::new();
let mut it = NAMES_AND_TYPES.as_ref().0.iter().enumerate().peekable();
while let Some((start, col)) = it.next() {
let mut end = start + 1;
while it.next_if(|(_, other)| col[0] == other[0]).is_some() {
end += 1;
}
offset += fields_in(field);
offsets.insert(col[0].clone(), (start, end));
}
LogVisitor {
actions: vec![],
add_offset,
remove_offset,
protocol_offset,
metadata_offset,
set_transaction_offset,
offsets,
previous_rows_seen: 0,
}
}
}

impl DataVisitor for LogVisitor {
impl RowVisitor for LogVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
if getters.len() != 55 {
return Err(Error::InternalError(format!(
"Wrong number of LogVisitor getters: {}",
getters.len()
)));
}
let (add_start, add_end) = self.offsets[ADD_NAME];
let (remove_start, remove_end) = self.offsets[REMOVE_NAME];
let (metadata_start, metadata_end) = self.offsets[METADATA_NAME];
let (protocol_start, protocol_end) = self.offsets[PROTOCOL_NAME];
let (txn_start, txn_end) = self.offsets[SET_TRANSACTION_NAME];
let (cdc_start, cdc_end) = self.offsets[CDC_NAME];
for i in 0..row_count {
if let Some(path) = getters[self.add_offset].get_opt(i, "add.path")? {
self.actions.push(Action::Add(
AddVisitor::visit_add(i, path, &getters[self.add_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? {
self.actions.push(Action::Remove(
RemoveVisitor::visit_remove(i, path, &getters[self.remove_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? {
self.actions.push(Action::Metadata(
MetadataVisitor::visit_metadata(i, id, &getters[self.metadata_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(min_reader_version) =
getters[self.protocol_offset].get_opt(i, "protocol.min_reader_version")?
let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? {
let add = AddVisitor::visit_add(i, path, &getters[add_start..add_end])?;
Action::Add(add)
} else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? {
let remove =
RemoveVisitor::visit_remove(i, path, &getters[remove_start..remove_end])?;
Action::Remove(remove)
} else if let Some(id) = getters[metadata_start].get_opt(i, "metadata.id")? {
let metadata =
MetadataVisitor::visit_metadata(i, id, &getters[metadata_start..metadata_end])?;
Action::Metadata(metadata)
} else if let Some(min_reader_version) =
getters[protocol_start].get_opt(i, "protocol.min_reader_version")?
{
self.actions.push(Action::Protocol(
ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[self.protocol_offset..],
)?,
self.previous_rows_seen + i,
));
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
self.actions.push(Action::SetTransaction(
SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
)?,
self.previous_rows_seen + i,
));
}
let protocol = ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[protocol_start..protocol_end],
)?;
Action::Protocol(protocol)
} else if let Some(app_id) = getters[txn_start].get_opt(i, "txn.appId")? {
let txn =
SetTransactionVisitor::visit_txn(i, app_id, &getters[txn_start..txn_end])?;
Action::SetTransaction(txn)
} else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? {
let cdc = CdcVisitor::visit_cdc(i, path, &getters[cdc_start..cdc_end])?;
Action::Cdc(cdc)
} else {
// TODO: Add CommitInfo support (tricky because all fields are optional)
continue;
};
self.actions.push((action, self.previous_rows_seen + i));
}
self.previous_rows_seen += row_count;
Ok(())
Expand Down Expand Up @@ -214,7 +195,7 @@ fn try_main() -> DeltaResult<()> {

let snapshot = table.snapshot(&engine, None)?;

match &cli.command {
match cli.command {
Commands::TableVersion => {
println!("Latest table version: {}", snapshot.version());
}
Expand All @@ -237,7 +218,7 @@ fn try_main() -> DeltaResult<()> {
)?;
}
}
Commands::Actions { forward } => {
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
let actions = snapshot._log_segment().replay(
&engine,
Expand All @@ -246,27 +227,22 @@ fn try_main() -> DeltaResult<()> {
None,
)?;

let mut visitor = LogVisitor::new(log_schema);
let mut visitor = LogVisitor::new();
for action in actions {
action?.0.extract(log_schema.clone(), &mut visitor)?;
visitor.visit_rows_of(action?.0.as_ref())?;
}

if *forward {
visitor
.actions
.sort_by(|a, b| a.row().partial_cmp(&b.row()).unwrap());
} else {
visitor
.actions
.sort_by(|a, b| b.row().partial_cmp(&a.row()).unwrap());
if oldest_first {
visitor.actions.reverse();
}
for action in visitor.actions.iter() {
for (action, row) in visitor.actions.iter() {
match action {
Action::Metadata(md, row) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p, row) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r, row) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a, row) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t, row) => println!("\nAction {row}:\n{:#?}", t),
Action::Metadata(md) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t) => println!("\nAction {row}:\n{:#?}", t),
Action::Cdc(c) => println!("\nAction {row}:\n{:#?}", c),
}
}
}
Expand Down
23 changes: 12 additions & 11 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error};
use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor};
use crate::{DeltaResult, EngineData, Error, RowVisitor as _};
use visitors::{MetadataVisitor, ProtocolVisitor};

use delta_kernel_derive::Schema;
use serde::{Deserialize, Serialize};
Expand All @@ -30,16 +30,24 @@ pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

// Names of the top-level action columns in the Delta log schema (e.g. `ADD_NAME` is the field name
// used when building the log schema's `add` column). Each constant is `pub(crate)` by default; the
// `cfg_attr` asks `visibility::make(pub)` to widen it when the `developer-visibility` feature is on.
// NOTE(review): `COMMIT_INFO_NAME` is not used in the visible code -- presumably it names the
// `commitInfo` column in the same way; confirm against the full `LOG_SCHEMA` definition.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const ADD_NAME: &str = "add";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const REMOVE_NAME: &str = "remove";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const METADATA_NAME: &str = "metaData";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";

// Lazily built schema with a single top-level field, named `ADD_NAME` ("add"), whose struct field
// is produced by `Option::<Add>::get_struct_field`. The `StructType` is converted into a
// `SchemaRef` once, on first access.
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());

static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new([
Option::<Add>::get_struct_field(ADD_NAME),
Expand Down Expand Up @@ -115,7 +123,7 @@ pub struct Metadata {
impl Metadata {
pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
let mut visitor = MetadataVisitor::default();
data.extract(get_log_schema().project(&[METADATA_NAME])?, &mut visitor)?;
visitor.visit_rows_of(data)?;
Ok(visitor.metadata)
}

Expand Down Expand Up @@ -184,7 +192,7 @@ impl Protocol {
/// a Protocol instance. If no protocol row is found, returns Ok(None).
pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Protocol>> {
let mut visitor = ProtocolVisitor::default();
data.extract(get_log_schema().project(&[PROTOCOL_NAME])?, &mut visitor)?;
visitor.visit_rows_of(data)?;
Ok(visitor.protocol)
}

Expand Down Expand Up @@ -379,13 +387,6 @@ pub struct Add {
}

impl Add {
/// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> {
let mut visitor = AddVisitor::default();
data.extract(get_log_add_schema().clone(), &mut visitor)?;
Ok(visitor.adds)
}

pub fn dv_unique_id(&self) -> Option<String> {
self.deletion_vector.as_ref().map(|dv| dv.unique_id())
}
Expand Down
12 changes: 11 additions & 1 deletion kernel/src/actions/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@
use std::collections::{HashMap, HashSet};

use crate::schema::{ArrayType, DataType, MapType, StructField};
use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};

/// A type that can describe itself as a struct schema.
///
/// Any implementor also gets `ToDataType` through the blanket impl in this module.
pub(crate) trait ToSchema {
/// Returns the `StructType` describing `Self`.
fn to_schema() -> StructType;
}

/// A type that can report the `DataType` used to represent it in a schema.
pub(crate) trait ToDataType {
/// Returns the `DataType` describing `Self`.
fn to_data_type() -> DataType;
}

impl<T: ToSchema> ToDataType for T {
fn to_data_type() -> DataType {
T::to_schema().into()
}
}

/// A type that can report a `DataType` for its "nullable container" form.
// NOTE(review): presumably implemented for container types (e.g. maps/arrays) whose contained
// values may be null -- the impls are not visible in this hunk; confirm before relying on this.
pub(crate) trait ToNullableContainerType {
/// Returns the nullable-container `DataType` for `Self`.
fn to_nullable_container_type() -> DataType;
}
Expand Down
Loading

0 comments on commit 4e5c6b4

Please sign in to comment.