Replace file format providers #2
Conversation
Force-pushed from dd7f599 to 5893b10
I think this is an epic PR @rdettai -- I like it a lot 👍 Really nice work; thank you so much.
I made it about halfway through reviewing but ran out of time for today -- I will try and finish this first thing tomorrow. So far it is looking great.
Given what I have seen so far, I think we should get this merged into apache#1010 and then get that merged onto master.
@@ -76,10 +76,10 @@ use datafusion::arrow::record_batch::RecordBatch;
 async fn main() -> datafusion::error::Result<()> {
     // register the table
     let mut ctx = ExecutionContext::new();
-    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
Yeah I think this kind of change is inevitable to support remote object stores.
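For context, here is a minimal runnable sketch of the new async registration flow (assuming the `tokio` runtime and the prelude paths of this era of DataFusion; not taken verbatim from the repo):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Registration is now async because schema inference may need to read from
// the (possibly remote) object store.
#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx.sql("SELECT * FROM example LIMIT 5").await?;
    let batches = df.collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```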
     Some(projection),
     scan.batch_size as usize,
-    None,
+    scan.limit.as_ref().map(|sl| sl.limit as usize),
 )?))
Is this a drive-by bug fix?
My goal here was to serialize the whole object; I wasn't aiming at any specific bugfix. I don't think this was a bug earlier, in the sense that the final result would still be correct because the limit is also enforced by the `LimitExec`. But it was indeed surprising that the `CsvExec` was different before and after serialization 😄
Ok(Arc::new(ParquetExec::new(
    partitions,
    Arc::new(LocalFileSystem {}),
Here is the use of the hard-coded `LocalFileSystem`.
Same question here: are we planning to incorporate object_store in both DataFusion and Ballista, or just in the DataFusion core to limit the scope? I came across your design doc on how to incorporate object_store into Ballista several days ago; is that in the scope of another PR, due to the separate registry-passing mechanism for object stores?
I am also guessing that generalizing to support all object stores would be handled as a follow-up PR after 1010 gets merged. I think hard-coding `LocalFileSystem` here is perfectly fine for this refactor because it doesn't break any existing feature.
For now, the simplest solution I have to handle `ObjectStore` in Ballista is to do exactly the same as for `TableProvider`: define the implementations we support in a predefined enum in the protobuf and serialize/deserialize to/from that (sketched below). I haven't created that enum for now as we have only 1 variant.
If we want to move away from that, we will need to:
- reference the ExecutionContext in the serde (requires some piping on the Ballista side)
- reorganize the ExecutionContext into tiers, as discussed in "Expose a static object store registry" apache/datafusion#1072 (comment), to avoid confusion
- find a way to dynamically load the same static configs across all components of Ballista (with the challenge of adding new object stores / table providers at build time)
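A hypothetical sketch of that predefined-enum approach (the `ObjectStoreProto` type and the mapping function are illustrative, not the PR's actual protobuf/serde code):

```rust
use std::sync::Arc;
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::ObjectStore;

// Mirrors a protobuf enum: each supported implementation gets an explicit
// variant that the serde code maps back to a concrete store.
enum ObjectStoreProto {
    LocalFileSystem,
}

fn object_store_from_proto(proto: &ObjectStoreProto) -> Arc<dyn ObjectStore> {
    match proto {
        // Only one variant exists today; e.g. an S3 store would be added
        // here once supported.
        ObjectStoreProto::LocalFileSystem => Arc::new(LocalFileSystem {}),
    }
}
```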
@@ -272,6 +278,10 @@ impl SchedulerGrpc for SchedulerServer {
         &self,
         request: Request<GetFileMetadataParams>,
     ) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
+        // TODO support multiple object stores
Is this still a TODO?
Looks like it, because it's hard-coded to `LocalFileSystem` below :)
};
let schema = Arc::new(get_schema(table));

let options = ListingOptions {
this is a cool interface 👍
@@ -31,7 +31,7 @@ async fn query(ctx: &mut ExecutionContext, sql: &str) {
     let rt = Runtime::new().unwrap();

     // execute the query
-    let df = ctx.sql(sql).unwrap();
+    let df = rt.block_on(ctx.sql(sql)).unwrap();
Does this change mean the benchmark time will now include the planning time (whereas before it did not)?
Given that `criterion::black_box` still only wraps the collect call, I think the SQL planning shouldn't be included?
This shouldn't change anything: to my knowledge, `rt.block_on` is independent of Criterion. Also, `sql()` only blocks for `CreateExternalTable` queries, so for normal queries it will actually never yield back to the runtime.
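For reference, a reconstruction of the benchmark shape under discussion (assumed, not the exact source file): `sql()` is driven with `block_on`, while `black_box` wraps only the collected results.

```rust
use criterion::black_box;
use datafusion::prelude::*;
use tokio::runtime::Runtime;

fn query(ctx: &mut ExecutionContext, sql: &str) {
    let rt = Runtime::new().unwrap();
    // plan the query (async since this PR)
    let df = rt.block_on(ctx.sql(sql)).unwrap();
    // execute the query; only the result batches are black_boxed
    black_box(rt.block_on(df.collect()).unwrap());
}
```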
I introduced:
- `PartitionedFile` -> a single file (for the moment) or part of a file (later, a subset of its row groups or rows); we may even extend this to include the partition values and partition schema (see below) to support partitioned tables.
- `FilePartition` -> the basic unit of parallel processing; each task is responsible for processing one `FilePartition`, which is composed of several `PartitionedFile`s.
I think …
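To make the two abstractions concrete, here is a minimal sketch (the field names are my own illustration, not the PR's exact definitions):

```rust
/// A single file to read, or later possibly a slice of one (a subset of its
/// row groups or rows). Could be extended with partition column values and
/// a partition schema to support partitioned tables.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    pub path: String,
    pub size: u64,
}

/// The basic unit of parallel processing: each task handles exactly one
/// `FilePartition`, which groups several `PartitionedFile`s.
#[derive(Debug, Clone)]
pub struct FilePartition {
    pub index: usize,
    pub files: Vec<PartitionedFile>,
}
```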
Let's merge it into apache#1010 and make this epic happen. Great work @rdettai! 👍👍
Truly amazing work @rdettai, ready to go from me 👍 I can't wait to see this land in master. The new listing table provider abstraction looks solid in practice :)
Schema schema = 3;

message CsvFormat {
  bool has_header = 1;
  string delimiter = 2;
Not a blocker, and something we can improve after everything gets merged into DataFusion master, but I just wanted to point out that I think it would be better to use the bytes type here, since string in Rust is more expensive than bytes due to UTF-8 validation overhead. The overhead is not going to matter for this particular use case, but it is a good practice to keep. It would also simplify the `byte_to_string` and `string_to_byte` logic you have in the plan ser/de code.
Good point! As it was a string already, I didn't even look into it 😄. Personally, what I find rather surprising is that the delimiter is bound to be a single char and can't be an arbitrary string 😅
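For illustration, a hedged sketch of what that single-byte round trip can look like (the helper names `byte_to_string` / `string_to_byte` come from the comment above; the bodies are assumptions, not the actual plan ser/de code):

```rust
fn string_to_byte(s: &str) -> Result<u8, String> {
    match s.as_bytes() {
        // the delimiter must be exactly one byte, hence the single-char limit
        [b] => Ok(*b),
        _ => Err(format!("delimiter must be a single byte, got {:?}", s)),
    }
}

fn byte_to_string(b: u8) -> Result<String, String> {
    // a lone byte >= 0x80 is not valid UTF-8, so the conversion can fail
    String::from_utf8(vec![b]).map_err(|e| e.to_string())
}
```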
    path: impl Into<String>,
    projection: Option<Vec<usize>>,
    target_partitions: usize,
    table_name: impl Into<String>,
) -> Result<Self> {
    let provider = Arc::new(ParquetTable::try_new(path, target_partitions)?);
    Self::scan(table_name, provider, projection)
    // TODO remove hard coded enable_pruning
👍 this is not a breaking change and preserves the current behavior.
Thank you all for reviewing this so quickly! 🚀
* [fix] replace file format providers in datafusion
* [lint] clippy
* [fix] replace file format providers in ballista
* [fix] await in python wrapper
This looks epic -- really nice work @rdettai 👍
pub fn get_statistics_with_limit(
    table_desc: &TableDescriptor,
/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
FWIW this appears to have been done
Cleaned up in apache#1010, sorry about that!
#[derive(Debug, Clone)]
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
again, this is done
Cleaned up in apache#1010, sorry about that!
#[derive(Debug, Clone)]
/// A collection of files that should be read in a single task
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
and here
Cleaned up in apache#1010, sorry about that!
))),
}?;

let options = ListingOptions {
this is so cool
/// CSV file read option
#[derive(Copy, Clone)]
pub struct CsvReadOptions<'a> {
I wonder if it would be better to put these files into the same modules as the file formats that use them: `file_format::avro::Avro`, etc.
Yes, I was kind of hesitant about this! These option structs are really meant to be specific to the `context.read_xxx` and `context.register_xxx` APIs, to serve as documentation and default holders. I will probably do a separate PR to propose something a bit more coherent.
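As an aside, a small sketch of that "documentation and default holder" role, using builder methods that exist on `CsvReadOptions` (the table name and file path are made up for the example):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn register(ctx: &mut ExecutionContext) -> Result<()> {
    // defaults spelled out explicitly, which is the documentation value
    // these option structs provide
    let options = CsvReadOptions::new()
        .has_header(true)
        .delimiter(b',')
        .file_extension(".csv");
    ctx.register_csv("example", "tests/example.csv", options).await?;
    Ok(())
}
```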
* Add Display for Expr::BinaryExpr (a combination of 3 commits:
  1. Add Display for Expr::BinaryExpr;
  2. Update logical_plan/operators tests;
  3. rebase and debug display for non binary expr)
  Update logical_plan/operators tests; rebase and debug display for non binary expr;
  updating tests; update aggregate display; updating tests without aggregate; more tests;
  working on agg/scalar functions; fix binary_expr in create_name function and attendant
  tests; doc tests; rebase and update new tests
* Submodule update
* Restore submodule references from master

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* [feat] stubs for provider re-organization
* [feat] implement infer_schema to make test pass
* [wip] trying to implement pruned_partition_list
* [typo]
* [fix] replace enum with trait for extensibility
* [fix] add partition cols to infered schema
* [feat] forked file format executors avro still missing
* [doc] comments about why we are flattening
* [test] migrated tests to file formats
* [test] improve listing test
* [feat] add avro to refactored format providers
* [fix] remove try from new when unnecessary
* [fix] remove try_ from ListingTable new
* [refacto] renamed format module to file_format also removed statistics from the PartitionedFile abstraction
* [fix] removed Ballista stubs
* [fix] rename create_executor
* [feat] added store
* [fix] Clippy
* [test] improve file_format tests with limit
* [fix] limit file system read size
* [fix] avoid fetching unnecessary stats after limit
* [fix] improve readability
* [doc] improve comments
* [refacto] keep async reader stub
* [doc] cleanup comments
* [test] test file listing
* [fix] add last_modified back
* [refacto] simplify csv reader exec
* [refacto] change SizedFile back to FileMeta
* [doc] comment clarification
* [fix] avoid keeping object store as field
* [refacto] grouped params to avoid too_many_arguments
* [fix] get_by_uri also returns path
* [fix] ListingTable at store level instead of registry
* [fix] builder take self and not ref to self
* Replace file format providers (#2)
  * [fix] replace file format providers in datafusion
  * [lint] clippy
  * [fix] replace file format providers in ballista
  * [fix] await in python wrapper
* [doc] clearer doc about why sql() is async
* [doc] typos and clarity
* [fix] missing await after rebase
Rationale for this change

As discussed in apache#1010, this PR replaces the old file format datasources with the new ones (both table providers and execution plans).

What changes are included in this PR?

These are mostly mechanical changes that come from the replacement of the file format datasources. A few changes are noteworthy:
- the `struct XxxReadOptions<'a>` objects: this PR mimics the current behavior as much as possible, but separate work should be conducted to make the configuration system more consistent and intuitive
- `ExecutionContext.read_xxx` and `ExecutionContext.register_xxx` are now async because they might require inferring the schema
- the `get_file_metadata` service in Ballista does not return the list of files anymore

Are there any user-facing changes?

- `ExecutionContext.read_xxx` and `ExecutionContext.register_xxx` are now async

What changes are left for later?

When possible, the behavior was left the same. For this reason, some todos were left:
- … (`get_file_metadata`) instead of locally
- the distinction between `PartitionedFile`, `ParquetPartition` and `FilePartition` is not very clear; these structures should be simplified a bit
- the `LocalFileSystem` `ObjectStore` is hardcoded in Ballista