Avro Table Provider #910
Conversation
I've added nested field access for pg/generic dialect for things like
@alamb Thanks
Thank you for this contribution @Igosuki. Sorry that took so long. It was quite a bit of code 😅 !
The code is really nicely done, seems very well tested, and was a pleasure to review.
High level things I think we need prior to merging this in:
- Make the new avro-rs dependency optional
- Commit changes to the arrow-testing repo (or maybe we need a new repo if they decide that having avro files in an arrow repo is not ideal)
- Remove the changes for GetFieldExpr (I think we should make that its own PR -- perhaps based off of "Added composite identifiers to get field of struct" #628, it just needed someone to push it over the line)
- Add one basic end-to-end test to sql.rs that reads one of the newly added avro files and does some simple query on it
I didn't look at the ballista changes carefully -- but I skimmed them and they looked reasonable to me
Also, FWIW I think this PR may be related to #811 (but I would say that PR is still in a bit of a WIP so we shouldn't hold this PR up).
.gitmodules
Outdated
@@ -3,4 +3,5 @@
url = https://github.com/apache/parquet-testing.git
[submodule "testing"]
path = testing
url = https://github.com/apache/arrow-testing
url = https://github.com/Igosuki/arrow-testing.git
prior to merging this PR we should request/merge the changes into apache/arrow-testing I think
This should be good to change back now that the PR is merged.
@@ -1403,6 +1424,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
BuiltinScalarFunction::ToTimestampMillis => {
The addition of ToTimestampMillis seems unrelated to this PR, but is a good addition
datafusion/Cargo.toml
Outdated
@@ -69,6 +69,8 @@ regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"] }
As the datafusion codebase already has a non-trivial number of dependencies I would personally prefer we did not add additional required ones.
What would you think about changing
avro-rs = { version = "0.13", features = ["snappy"] }
to
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
And then documenting it as a crate feature -- e.g. like regex_expressions (but not a default feature)?
I think most of the rest of the code in this PR could be left the same, and only the part that interacts with avro-rs could be #[cfg(...)]'d out
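Sketched out, the crate feature described here might look like this in datafusion/Cargo.toml (a sketch assembled from the review discussion; the exact feature wiring is an assumption, not the merged file):

```toml
[dependencies]
# only compiled when the "avro" feature is enabled
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }

[features]
# opt-in feature, deliberately not in "default"
avro = ["avro-rs", "num-traits"]
```

Code that touches avro-rs would then carry `#[cfg(feature = "avro")]`, while the rest of the crate compiles unchanged.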
@@ -140,6 +141,7 @@ impl DFSchema {
        return Ok(i);
    }
}
println!("{}", name);
perhaps a stray debugging leftover
datafusion/src/logical_plan/expr.rs
Outdated
@@ -244,6 +245,13 @@ pub enum Expr {
IsNull(Box<Expr>),
/// arithmetic negation of an expression, the operand must be of a signed numeric data type
Negative(Box<Expr>),
/// Returns the field of a [`StructArray`] by name
GetField {
This appears to be some part of #628 -- I think it would be cleaner if we revived that PR to get the GetField functionality separately rather than including it in a single large PR
n.into_iter()
    .map(|v| {
        resolve_string(&v)
// else if matches!(
perhaps this should return an error?
Done
datafusion/src/physical_plan/avro.rs
Outdated
Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit)))
} else {
    Err(DataFusionError::Execution(
        "Error reading CSV: Data can only be read a single time when the source is a reader"
Suggested change:
"Error reading CSV: Data can only be read a single time when the source is a reader"
"Error reading AVRO: Data can only be read a single time when the source is a reader"
datafusion/src/physical_plan/avro.rs
Outdated
Source::Reader(rdr) => {
    if partition != 0 {
        Err(DataFusionError::Internal(
            "Only partition 0 is valid when CSV comes from a reader"
Suggested change:
"Only partition 0 is valid when CSV comes from a reader"
"Only partition 0 is valid when AVRO comes from a reader"
Vec::new()
}

fn with_new_children(
This code seems to support creating a table backed by multiple avro files (as is supported by the csv and parquet readers), but I don't see a test for that functionality anywhere.
Maybe you could have a test in sql.rs that referred to the same test file twice and ensured we got back two copies of the data
Added a test for that in sql.rs
@alamb will address everything soon
PR in arrow-testing apache/arrow-testing#62
I think I handled all your comments, let me know if there's more I can do
datafusion/Cargo.toml
Outdated
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = "0.2"
I am not seeing num_traits being used in the code? NVM, I see it's used in the avro reader code :)
It looks like num-traits should be marked as optional and be included as part of the avro feature flag?
Suggested change:
num-traits = "0.2"
num-traits = { version = "0.2", optional = true }
@@ -17,6 +17,7 @@
//! DataFusion data sources
pub mod avro;
this should be gated by feature flag, no?
Yes, then the feature flags in the avro module can be removed
@@ -78,7 +78,7 @@ impl NthValue {
}

/// Create a new NTH_VALUE window aggregate function
pub fn nth_value(
pub fn value(
maybe first, last and nth would be better names here, cc @jimexist
@nevi-me I think having a reader in arrow-rs is definitely something to consider (I can definitely see other projects wanting to read from avro files to Arrow arrays). Given this PR is already fairly large I suggest we get it in as is and if there is interest / appetite to moving the avro reader to arrow-rs we do it as a follow on PR -- I filed apache/arrow-rs#727 to track this idea
(And welcome back @nevi-me ! )
I think this is looking great @Igosuki - thank you so much!
I think the needed items prior to merging this are:
- Merge arrow-testing#62 (Add basic AVRO files, translated copies of the parquet testing files) and update the references in this PR
- Get a clean CI run (looks like maybe a few more places need #[cfg(feature = "avro")] sprinkled), as well as adding some Apache license files
Again, thank you so much
datafusion/tests/sql.rs
Outdated
\n CoalescePartitionsExec\
\n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
\n ExecutionPlan(PlaceHolder)\
Note: the fact that this says PlaceHolder can be fixed by implementing fmt_as in the Avro ExecutionPlan
datafusion/tests/sql.rs
Outdated
#[cfg(feature = "avro")]
#[tokio::test]
async fn avro_explain_analyze() {
I don't think this particular test adds a lot of test coverage -- the non avro version is to ensure execution metrics are propagated correctly.
#[cfg(feature = "avro")]
#[tokio::test]
async fn avro_query_multiple_files() {
❤️
@@ -105,13 +105,14 @@
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
# run tests on all workspace members with default feature list
cargo test
# run tests on all workspace members with default feature list + avro
👍
Looks like I should have run the entire CI pipeline before committing, will fix
};
Ok(lit(utf8_val))
Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
let utf8_val = if utf8_val == "foo" {
maybe this isn't intended to be merged?
@jimexist I get a clippy failure on nightly:
error: unnecessary nested `if let` or `match`
--> datafusion/src/logical_plan/expr.rs:1915:21
|
1915 | / if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
1916 | | let utf8_val = if utf8_val == "foo" {
1917 | | "bar".to_string()
1918 | | } else {
... |
1923 | | Ok(Expr::Literal(scalar))
1924 | | }
| |_____________________^
|
= note: `-D clippy::collapsible-match` implied by `-D warnings`
help: the outer pattern can be modified to include the inner pattern
--> datafusion/src/logical_plan/expr.rs:1914:31
|
1914 | Expr::Literal(scalar) => {
| ^^^^^^ replace this binding
1915 | if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ with this pattern
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_match
Rebased
Rebased again after new conflict
This looks great. I only reviewed the avro changes in datafusion in detail, so skimmed through ballista.
The feature flag can be simplified before this is merged.
Avro -> Arrow related changes can be addressed when/if moving the code to arrow-rs (which I'd be happy to do)
//! This module contains utilities to manipulate avro metadata.

#[cfg(feature = "avro")]
It should be possible to move this feature flag to datafusion/src/lib.rs so you don't include it for each module here
I think this would also be ok to do as a follow on PR
datafusion/src/avro_to_arrow/mod.rs
Outdated
#[cfg(feature = "avro")]
/// Infer Avro schema given a reader
pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
nit: is it inference, or reading the schema from the file/input?
It is actually reading the schema from the input, not inferring it. I went that way initially because, in the Java world, there is a use case for inferring avro schemas from a stream of avro datums.
The use case would be streaming data into arrow from avro, but that's not the case here, so we can simply remove Seek and rename this.
name: Option<&str>,
nullable: bool,
) -> Result<Field> {
    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
You could pass None as the props, and then only set the metadata if there are actual properties. We can adjust this when moving this module to arrow-rs though
AvroSchema::Decimal {
    precision, scale, ..
} => DataType::Decimal(*precision, *scale),
AvroSchema::Uuid => DataType::Utf8,
This should ideally be a binary field, DataType::FixedSizeBinary, because you otherwise lose the type information (that you're dealing with a UUID).
Extension arrays will also be helpful in future for this. UUID is common enough across different data sources (SQL, Parquet) that we might want to preserve its properties.
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();

Ok(match array {
ColumnarValue::Scalar(scalar) => {
is this change related to the avro addition?
@nevi-me This is another clippy issue in nightly
@@ -78,7 +78,7 @@ impl NthValue {
}

/// Create a new NTH_VALUE window aggregate function
pub fn nth_value(
pub fn value(
If no decision is made here, we could defer the clippy changes until they land on stable
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
pub mod aggregates;
pub mod analyze;
pub mod array_expressions;
pub mod avro;
This should also be feature-flagged so that the flags inside its file can be removed. There's no use compiling the avro module if the flag is disabled.
That implies adding the avro feature flag in a number of other places, such as execution/context.rs -- is that what you are asking for implicitly?
I think @nevi-me is suggesting something like

#[cfg(feature = "avro")]
pub mod avro;

Which would then let you avoid so much #[cfg...] in #910 (comment)
I am not sure what other changes that entails
@alamb The physical plan is used everywhere in the codebase, so feature gating in the avro submod of the physical plan seemed to be the best thing to do.
(Compile time is obviously increased by quite a bit.)
However, those tests now fail somewhere else (extract), because the arrow test data is not available here:
Tests pass on my local environment setting RUST_MIN_STACK_SIZE without changing cargo, I put it in the github workflow to see if it passes
Epic work @Igosuki 👍 Thanks again for sticking with this
Thanks for merging, hope to contribute some more.
Thanks @Igosuki great contribution! Looking forward to future contributions 😁
AvroSchema::Enum { symbols, name, .. } => {
    return Ok(Field::new_dict(
        &name.fullname(None),
        index_type(symbols.len()),
Sorry for being late to the party. While porting this to arrow2 I noticed it: I think we want here
DataType::Dictionary(Box::new(index_type(symbols.len())), Box::new(DataType::Utf8))
Field::new_dict does not create a new dict datatype, it is only used to set the dict_id, order, etc.
@jorgecarleitao thanks for pointing that out, I should fix it
Which issue does this PR close?
Closes #903.
Rationale for this change
Enables loading avro data files through datafusion.
What changes are included in this PR?
Avro is added as a table provider and a supported file format.
Avro schemas can be translated into arrow schemas.
Are there any user-facing changes?
Yes, as one can now call register_avro on df and use 'STORED AS AVRO' in SQL.
N.B.:
Missing:
I find there is duplication between modules with these additions; I should probably do some refactoring.