
Extend insert into support to include Json backed tables #7212

Merged
merged 6 commits into apache:main on Aug 8, 2023

Conversation

devinjdangelo
Contributor

Which issue does this PR close?

None, but it progresses towards the goals of #5076 and #7079. Follow-on to #7141.

Rationale for this change

Adds support for insert into <table> for tables backed by JSON files.

What changes are included in this PR?

  • Implements JsonSink in a similar fashion to CsvSink (see the sketch after this list)
  • Minor refactor of CsvSink to support code reuse with JsonSink
  • Generalized the existing insert into tests so they are easily extensible to additional FileFormats and options
  • Added test coverage for appending to existing JSON files and for appending new JSON files to a ListingTable
  • Added checks that raise an error when attempting to insert into a sorted or compressed table, since that is not yet implemented
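
For illustration, here is a minimal sketch of the kind of per-batch serialization a JsonSink performs, using arrow's line-delimited JSON writer. This is not the PR's actual code, and the helper name serialize_batch_to_json is made up for this example.

use arrow::error::ArrowError;
use arrow::json::LineDelimitedWriter;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: serialize a single RecordBatch into newline-delimited
/// JSON bytes. Each batch is handled independently of the others, which is
/// what lets a shared, stateless write path serve both CSV and JSON sinks.
fn serialize_batch_to_json(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let mut buf = Vec::new();
    {
        // LineDelimitedWriter emits one JSON object per row, one row per line.
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write(batch)?;
        writer.finish()?;
    }
    Ok(buf)
}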

Are these changes tested?

Yes

Are there any user-facing changes?

Insert into a JSON-backed table will now work.
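
As a rough usage illustration (the table name and file path below are hypothetical, and this is not taken from the PR's tests), appending to a JSON-backed listing table through the SQL interface might look like this:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an existing newline-delimited JSON file as a table
    // ("/tmp/my_table/data.json" is a placeholder path).
    ctx.register_json("my_table", "/tmp/my_table/data.json", NdJsonReadOptions::default())
        .await?;

    // With this change, INSERT INTO appends the new rows to the JSON-backed table.
    let df = ctx.sql("INSERT INTO my_table VALUES (1, 2), (3, 4)").await?;
    df.collect().await?;

    Ok(())
}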

github-actions bot added the core (Core DataFusion crate) label on Aug 6, 2023
@metesynnada (Contributor) left a comment

Overall looks good. I appreciate your hard work. However, I have made some comments regarding the changes.

/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
async fn stateless_serialize_and_write_files(
Contributor

It makes sense to consolidate these into a unified approach.

Contributor

I agree -- and I think that will mean when we parallelize the logic more all the writers will benefit
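
As a loose sketch of what such a unified approach could look like (the trait name and signature here are assumptions for illustration, not necessarily what DataFusion defines), a shared per-batch serializer trait would let CsvSink, JsonSink, and future sinks reuse one write loop and later serialize batches in parallel:

use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::error::Result;

/// Hypothetical shared abstraction: serialize one RecordBatch at a time,
/// with no state carried between batches, so the surrounding write loop
/// (and any future parallelization) stays format-agnostic.
#[async_trait]
trait BatchSerializer: Send {
    async fn serialize(&mut self, batch: RecordBatch) -> Result<Vec<u8>>;
}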

devinjdangelo and others added 2 commits August 7, 2023 06:45
Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>
@@ -608,17 +592,17 @@ impl DataSink for CsvSink {
))
}
FileWriterMode::PutMultipart => {
//currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
Contributor

@alamb If this is OK for you, overall LGTM.

@alamb (Contributor) commented on Aug 7, 2023

Thank you -- I quickly skimmed this PR and it looks great @devinjdangelo -- thank you for the review @metesynnada. I will take a closer look tomorrow morning.

@alamb (Contributor) left a comment

The code looks great to me -- thank you @devinjdangelo and @metesynnada for the review. I tried it out locally and it was 👌 very nice.

It is somewhat awkward at the moment to use this feature, as you can't create new tables, only append to existing ones:

$ mkdir /tmp/my_table
❯ create external table my_table(x int, y int) stored as JSON location '/tmp/my_table';
0 rows in set. Query took 0.002 seconds.

❯ insert into my_table values (1,2), (3, 4);
Error during planning: Cannot append 1 partitions to 0 files!

I filed #7228 to track improving this


.map_err(|e| DataFusionError::Internal(e.to_string()))?;

// Read the records in the table
let batches = session_ctx.sql("select * from t").await?.collect().await?;
Contributor

👍

@alamb (Contributor) commented on Aug 8, 2023

Thanks again!

alamb merged commit 3d917a0 into apache:main on Aug 8, 2023
@alamb (Contributor) commented on Aug 8, 2023

Here is a small follow-on to reduce some duplication: #7229
