Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to new data passing API #109

Merged
merged 113 commits into from
Mar 4, 2024
Merged

Conversation

nicklan
Copy link
Collaborator

@nicklan nicklan commented Jan 27, 2024

WIP of the "big bang" switch to the new data passing API. Note this is still based on when I named things "EngineClient" and not "EngineInterface" so that term is used throughout. I will rebase to main and rename everything once we're agreed on the major pieces.

This implements:

  • The traits for the API as discussed
    • including list/map
  • Switching the defs in lib to use it
  • Parsing of Metadata, Protocol, and Add
  • A SimpleClient which is a simple EngineClient that uses arrow but isn't async
  • Has the default client return EngineData
  • working scan
  • Remove any mention of arrow in lib.rs

Potential things still to do:

  • Probably move SimpleData to ArrowData and not in the simple_client mod since it can be used by both Default and Simple client

Copy link
Contributor

@ryan-johnson-databricks ryan-johnson-databricks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! We're sooo close to banishing the last bits of arrow from core kernel code. Just that one spot in data skipping that messes with selection vectors.

(bunch of unresolved comments from before, plus several new ones)

data-skipping still uses arrow for now, we can change that after #83 merges

It merged.

if metadata.is_none() {
return Ok(Box::new(std::iter::empty()));
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we eventually need to use the engine's json parser for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's internal so we don't have to I don't think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal and a single value that won't hog MB/GB memory. Fair.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do remember we discussed some related topic. In general the main issue here I think is that we cannot tell the engine what schema we are expecting which right now is required. IIRC the question was mainly trying to avoid serde_json as a dependency, personally I would vote for accepting as a dependency and doing it as implemented here.

Another place whas trying to parse the _last_cehckpoint file, which may also contain the schema of the checkpoint files - I think. At that time we would also have no way of telling eninge the schema of the data we want. In this case though I belive that using that schema was not eneven doen internally at databricks?

Comment on lines +278 to +279
let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]);
RecordBatch::try_new(Arc::new(schema), vec![array_ref])?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity -- do you know what code currently triggers this non-struct path?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can tell from DataSkippingFilter::new. For instance the STATS_EXPR is a column expression, so it'll eval to just an Array, so the select_stats_evaluator will trigger this path (I think, haven't tested exactly what does but I do know the unit tests hit both paths)

kernel/src/lib.rs Outdated Show resolved Hide resolved
kernel/src/lib.rs Show resolved Hide resolved
kernel/src/scan/data_skipping.rs Outdated Show resolved Hide resolved

impl EngineInterface for SimpleClient {
fn get_expression_handler(&self) -> Arc<dyn ExpressionHandler> {
unimplemented!();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just needs us to factor out the expression handler that the default client uses?
Or is there some bigger blocker?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, that's all it would take yeah. Just would prefer to do that as a follow-up

kernel/src/actions/deletion_vector.rs Show resolved Hide resolved

pub(crate) fn treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
fn combine(high_bits: u32, low_bits: u32) -> usize {
((u64::from(high_bits) << 32) | u64::from(low_bits)) as usize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rescuing #109 (comment) from github oblivion...

this would silently lose information if sizeof(usize) < sizeof(u64); we should add a static assertion to make it fail at compile time instead.

(even if we move it from kernel to engine, the same issue would remain in our default clients)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I've filed #124 since we'd need to add a dependency

kernel/src/scan/mod.rs Show resolved Hide resolved
kernel/src/scan/data_skipping.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@ryan-johnson-databricks ryan-johnson-databricks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

/// Any type that an engine wants to return as "data" needs to implement this trait. The bulk of the
/// work is in the [`EngineData::extract`] method. See the docs for that method for more details.
/// ```rust
/// # use std::any::Any;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does the # mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because cargo doc actually builds the code in the comments to make sure the examples are correct, you have to import stuff. But if you don't want it to show up in the docs because it's not relevant, you can just # it. So it means the example will just show like:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, thanks!

if metadata.is_none() {
return Ok(Box::new(std::iter::empty()));
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal and a single value that won't hog MB/GB memory. Fair.

Comment on lines +104 to +106
return Ok(Box::new(SimpleData::new(RecordBatch::new_empty(
output_schema,
))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that's not better.

(I'm still building intuition for when type inference works vs. doesn't, thanks for your patience)

@ryan-johnson-databricks
Copy link
Contributor

I believe this is "mergable" (once conflicts are resolved), and we can work to fully migrate in follow-up PRs.

Conflicts are resolved now, correct?

@nicklan
Copy link
Collaborator Author

nicklan commented Feb 27, 2024

I believe this is "mergable" (once conflicts are resolved), and we can work to fully migrate in follow-up PRs.

Conflicts are resolved now, correct?

Yep, removed from the description. Just waiting for one more review now, I've pinged people

Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome stuff nick LGTM I just left a few random nits/questions/comments

use crate::{DeltaResult, Error, FileSystemClient};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletionVectorDescriptor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want this to be pub? Could it just be pub(crate)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do want actions to be pub, since we expect external systems might use them. at the very least, all of the action stuff is pub for now, so I'll leave it for a follow-up discussion :)


use super::DeletionVectorDescriptor;

fn dv_relateive() -> DeletionVectorDescriptor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
fn dv_relateive() -> DeletionVectorDescriptor {
fn dv_relative() -> DeletionVectorDescriptor {

Ok(Box::new(zipped.flatten().map(Action::Add)))
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Add {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at risk of being repetitive: do we want actions to be pub? I was hoping we could hide more of the implementation details instead of exposing a large API with internal details like standalone did..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yea just noticed below - Remove is pub(crate)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a good question which I'll leave for a later discussion :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could hide it behind the visibility crate as well?

I think right now there is a files list method exposed externally, but my hope was anyhow to make that in terms of data...

handler.parse_json(json_strings, output_schema).unwrap()
impl Add {
/// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome how these turned out :)

.collect::<Vec<_>>();
println!("{:?}", actions)
impl Remove {
// _try_new_from_data for now, to avoid warning, probably will need at some point
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we'll want this at some point, so I'm just going to leave it so it's less work then. If it hangs out for a while and we end up not needing it, I will remove it.

@@ -19,6 +20,35 @@ struct LogReplayScanner {
seen: HashSet<(String, Option<String>)>,
}

#[derive(Default)]
struct AddRemoveVisitor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just for visiting a mix of add/removes for replay?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it just handles visiting one or the other so we only have to traverse the data once.

kernel/src/snapshot.rs Outdated Show resolved Hide resolved
@@ -462,14 +438,14 @@ mod tests {
assert!(cp.is_none())
}

#[test]
#[test_log::test]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have many other tests using test_log?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably should, this was just a really useful one for debugging so I left it in. i think we should actually just always have it, which is maybe just a matter of always importing it or something, but again, future work :)

pub mod arrow_conversion;

#[cfg(feature = "simple-client")]
pub mod simple_client;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow awesome example/implementation of a client

nicklan and others added 2 commits March 4, 2024 12:42
Co-authored-by: Zach Schuermann <zachary.zvs@gmail.com>
@nicklan nicklan merged commit 4adb294 into delta-io:main Mar 4, 2024
3 checks passed
Copy link
Collaborator

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very sorry I did not get to it last friday - dropping my pending comments in case someone is interested :).

Ok(adds) => Either::Left(adds.into_iter().map(Ok)),
Err(err) => Either::Right(std::iter::once(Err(err))),
}
}
Err(err) => Either::Right(std::iter::once(Err(err))),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main reason for this is that rust wants us to return exactly one specific kind of iterator for the impl Iterator<Item = DeltaResult<Add>>, but Vec::Iter and the Once iterator are different things. Eiher helps us to conveniently normalise this. Without collecting the all actions internally, I see no clear path to avoid this. This does not mean there is none though :)

/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: Option<i64>,
/// Configuration options for the metadata action
pub configuration: HashMap<String, Option<String>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been the situation as I found it when first joining with delta-rs, and I remember this bothering me a bunch of times, but I don't recall exactly what the reason was we could not get rid of it.

I'll try to remember, but if we can move to just String, this will allow he consuming code to avoid a bunch of None checks...

if metadata.is_none() {
return Ok(Box::new(std::iter::empty()));
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do remember we discussed some related topic. In general the main issue here I think is that we cannot tell the engine what schema we are expecting which right now is required. IIRC the question was mainly trying to avoid serde_json as a dependency, personally I would vote for accepting as a dependency and doing it as implemented here.

Another place whas trying to parse the _last_cehckpoint file, which may also contain the schema of the checkpoint files - I think. At that time we would also have no way of telling eninge the schema of the data we want. In this case though I belive that using that schema was not eneven doen internally at databricks?

Ok(Box::new(zipped.flatten().map(Action::Add)))
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Add {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could hide it behind the visibility crate as well?

I think right now there is a files list method exposed externally, but my hope was anyhow to make that in terms of data...

let output_schema = Arc::new(log_schema().clone());
handler.parse_json(json_strings, output_schema).unwrap()
impl Add {
/// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit this feels more like a comment (//) then a docstring (///)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

Successfully merging this pull request may close these issues.

4 participants