
Add support for appending data to external tables - CSV #6526

Merged
merged 44 commits into apache:main from feature/listing_table_insert_into_support2 on Jun 6, 2023

Conversation

@mustafasrepo (Contributor)

Which issue does this PR close?

Closes #

Rationale for this change

This PR adds support for the following SQL queries:

CREATE EXTERNAL TABLE source_table (
    a1  VARCHAR NOT NULL,
    a2  INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
OPTIONS ('UNBOUNDED' 'TRUE')
LOCATION '{source}';

CREATE EXTERNAL TABLE sink_table (
    a1  VARCHAR NOT NULL,
    a2  INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
OPTIONS ('UNBOUNDED' 'TRUE')
LOCATION '{sink}';

INSERT INTO sink_table
SELECT a1, a2 FROM source_table;

This PR adds support for appending data to external tables; previously, appending was only supported for memory tables. It introduces new structs and modifications to existing structs, enabling users to efficiently work with file-based storage systems when appending data.

What changes are included in this PR?

  • Added the FileSinkConfig struct for the base configuration used when creating a physical plan for any given file format.
  • Added FileWriterExt to handle writing record batches to a file-like output.
  • Added the CsvSink struct, which implements DataSink to write results to a CSV file (a rough sketch of the write loop follows this list).
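
As a rough sketch, not the PR's exact code (the real DataSink trait and CsvSink differ in details), the core job of the sink is to drain a stream of record batches, serialize each one to CSV, and push the resulting bytes toward storage:

use arrow::record_batch::RecordBatch;
use futures::{stream::BoxStream, StreamExt};

// Hypothetical standalone function mirroring what a CSV sink must do.
async fn write_all_csv(
    mut batches: BoxStream<'static, RecordBatch>,
) -> std::io::Result<u64> {
    let mut row_count = 0u64;
    let mut buffer = Vec::new();
    {
        // The arrow CSV writer serializes each batch into `buffer`.
        let mut writer = arrow::csv::Writer::new(&mut buffer);
        while let Some(batch) = batches.next().await {
            row_count += batch.num_rows() as u64;
            writer
                .write(&batch)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        }
    } // writer dropped; `buffer` now holds the CSV bytes
    // Hand `buffer` to the object store (put / put_multipart / append) here.
    Ok(row_count)
}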

Are these changes tested?

Yes

Are there any user-facing changes?

This change allows users to append data to external tables, which was not possible before. Users can now work with file-based storage systems more efficiently, especially when appending data.

metesynnada and others added 30 commits April 14, 2023 10:51
@ozankabak (Contributor)

I have been looking forward to this for a while. @metesynnada did a great job starting this work and @mustafasrepo took it to the finish line!

@alamb added the api change label (Changes the API exposed to users of the crate) on Jun 4, 2023
@alamb (Contributor) left a comment

Thank you @mustafasrepo and @metesynnada and @ozankabak -- this is really nice

I went through it carefully, and while I had some small suggestions, I also think this could be merged as is and iterated on from there.

Prior to review, I did not appreciate that this PR lays the foundation for streaming (multi-part) writes to object store ❤️

I think the FileSinkConfig is an excellent idea and follows the existing FileScanConfig pattern 👍 It also seems able to extend nicely to writing multiple file partitions (eventually), which is excellent. A sketch of what such a config might carry follows.
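
For illustration only, a config of this kind might carry fields along these lines (the names here are guesses for exposition, not the PR's exact definition):

use arrow::datatypes::SchemaRef;

// Illustrative write modes; see the discussion later in this thread.
enum FileWriterMode {
    Put,
    PutMultipart,
    Append,
}

// Illustrative only; see the PR for the real FileSinkConfig.
struct FileSinkConfigSketch {
    /// URL of the object store to write to, e.g. "file://" or "s3://bucket"
    object_store_url: String,
    /// Output file path(s) within the store
    file_paths: Vec<String>,
    /// Schema of the data being written
    output_schema: SchemaRef,
    /// How bytes reach storage: a single put, a multipart upload, or an append
    writer_mode: FileWriterMode,
}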

Here is a list of potential follow-on work (some/all of which I would like to help with). If you agree, I can file tickets and help with this too, as we would love to have streaming write support in IOx as well.

  • End to end sql level tests in sqllogictests (I can do this)
  • Tests for streaming writes
  • Tests for abort behavior (making sure all writers are canceled and the correct error is returned)
  • Implement something similar for JSON and parquet.
  • Documentation / note that we support streaming multi-part writes

&mut self,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
loop {
Contributor

My reading of https://docs.rs/tokio/1.28.2/tokio/io/trait.AsyncWrite.html#tymethod.poll_shutdown implies that, by writing on shutdown, this writer as written will buffer everything before starting a write.

This is probably fine for the initial version, but it is something to improve in the future.
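
To make the observation concrete, here is a minimal sketch (assuming tokio's AsyncWrite; this is not the PR's code) of a writer whose poll_write only fills a memory buffer, so nothing reaches storage until poll_shutdown:

use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

struct BufferedPutWriter {
    buffer: Vec<u8>, // all writes accumulate here
    uploaded: bool,
}

impl AsyncWrite for BufferedPutWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        // Purely in-memory, so a write always completes immediately.
        self.buffer.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Poll::Ready(Ok(())) // nothing is sent before shutdown
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        // Only here would the whole buffer be handed to the store in one
        // `put`-style call; a real impl would poll the in-flight upload.
        if !self.uploaded {
            self.uploaded = true;
            // e.g. start and poll: object_store.put(&path, buffer_bytes)
        }
        Poll::Ready(Ok(()))
    }
}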

/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
/// It is specifically designed for the `object_store` crate's `put` method and sends
/// whole bytes at once when the buffer is flushed.
pub struct AsyncPutWriter {
Contributor

@tustvold and/or @crepererum (my go to people for rust async / stream expertise) I wonder if you have some time to review this code that adapts DataFusion to use the object_store put multi part features to do streaming writes

Pretty exciting!

Contributor

I'm confused to see this here; we already provide AsyncWrite from ObjectStore, so I'm not sure why we are re-implementing the buffering here? Am I missing something?

@metesynnada (Contributor) commented on Jun 6, 2023

We hypothesize that the consistent use of put_multipart for every put operation might adversely impact the cloud side, as it anticipates files exceeding a specific size (for example, 5MB for AWS). To mitigate this, we've developed a wrapper for the put operation that standardizes the write operation on AsyncWrite. Do you have a suggestion to simplify here?

@tustvold (Contributor) commented on Jun 6, 2023

Oh yes, put_multipart is 100% overkill for most use-cases. My suggestion was to type-erase at the level of the BatchSerializer and then have different impls for the different write modes. The async Read + abort interface feels a tad over-complicated, at least imo.
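
A rough illustration of that suggestion (names hypothetical, not DataFusion's actual API at the time): a serializer trait turns each RecordBatch into bytes, and each write mode then decides independently how those bytes reach storage.

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;

trait BatchSerializer: Send {
    /// Serialize one batch into raw bytes (e.g. CSV lines).
    fn serialize(&mut self, batch: &RecordBatch) -> Result<Bytes, ArrowError>;
}

struct CsvSerializerSketch;

impl BatchSerializer for CsvSerializerSketch {
    fn serialize(&mut self, batch: &RecordBatch) -> Result<Bytes, ArrowError> {
        // Note: a real impl would write the CSV header only once, not per batch.
        let mut buf = Vec::new();
        arrow::csv::Writer::new(&mut buf).write(batch)?;
        Ok(buf.into())
    }
}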

@ozankabak (Contributor)

> Here is a list of potential follow-on work (some/all of which I would like to help with). If you agree, I can file tickets and help with this too, as we would love to have streaming write support in IOx as well.

This sounds great! I checked in the comment improvements per your suggestions, @metesynnada and/or @mustafasrepo will shortly go through your other points. Thanks for the review.

@github-actions bot added the sqllogictest label (SQL Logic Tests (.slt)) on Jun 5, 2023
@tustvold (Contributor) left a comment

Had a quick review. I think this is probably fine, but type-erasing the writer mode seems a little peculiar to me.

This is because each of the methods has rather different characteristics and, imo, warrants writing in a different manner.

Put

The write is completely synchronous (it is writing to memory) and is then atomically flushed, with no need for abort behaviour or async write. All file formats can support this mode.

Put Multipart

The write is async with a final atomic close. Requires custom abort logic. All file formats can support this mode.

Append

Abort is fatal (not even entirely sure how to surface this), and it is only supported by row-oriented file formats. Even then it requires custom handling for things like CSV headers, etc.

Proposal

I guess my proposal would be to simply add a match block within DataSink for each of the various FileWriterModes. Over time I expect we will be able to extract common logic for each of the variants, e.g. a generic Put version using a RecordBatchWriter, etc., but I'm not sure that trying to unify all the writer modes is a good abstraction, and at this stage, where we only have one impl, it seems a touch premature perhaps?
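
A sketch of that dispatch (the enum variants mirror the three modes above; the bodies are placeholders, not the PR's implementation):

// Illustrative; the actual FileWriterMode lives in the PR.
enum FileWriterMode {
    /// Buffer in memory, then one atomic `put`; no abort logic needed.
    Put,
    /// Streaming upload with a final atomic close; needs custom abort logic.
    PutMultipart,
    /// Append to an existing file; abort is effectively fatal, and only
    /// row-oriented formats (e.g. CSV) can support it.
    Append,
}

fn create_writer(mode: &FileWriterMode) {
    // Inside the sink, dispatch once on the mode instead of type-erasing it:
    match mode {
        FileWriterMode::Put => { /* serialize to memory, flush via put */ }
        FileWriterMode::PutMultipart => { /* object_store put_multipart + abort */ }
        FileWriterMode::Append => { /* open for append, write incrementally */ }
    }
}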


impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPut<W> {}

/// An extension trait for `AsyncWrite` types that adds an `abort_writer` method.
Contributor

What do you think about, instead of using a trait to add abort, just having a struct like:

struct AbortableWrite<W> {
    write: W,
    abort: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>>>>
}

@mustafasrepo (Contributor, Author)

> Had a quick review. I think this is probably fine, but type-erasing the writer mode seems a little peculiar to me.
>
> This is because each of the methods has rather different characteristics and, imo, warrants writing in a different manner.
>
> Put
>
> The write is completely synchronous (it is writing to memory) and is then atomically flushed, with no need for abort behaviour or async write. All file formats can support this mode.
>
> Put Multipart
>
> The write is async with a final atomic close. Requires custom abort logic. All file formats can support this mode.
>
> Append
>
> Abort is fatal (not even entirely sure how to surface this), and it is only supported by row-oriented file formats. Even then it requires custom handling for things like CSV headers, etc.
>
> Proposal
>
> I guess my proposal would be to simply add a match block within DataSink for each of the various FileWriterModes. Over time I expect we will be able to extract common logic for each of the variants, e.g. a generic Put version using a RecordBatchWriter, etc., but I'm not sure that trying to unify all the writer modes is a good abstraction, and at this stage, where we only have one impl, it seems a touch premature perhaps?

Your proposal makes sense to me. I have removed FileWriterFactory; now the writer is created in the CsvSink. If in the future the need for a trait arises, we can do so.

@tustvold (Contributor) left a comment

Thank you. I would have gone further and removed FileWriterExt in favour of just a type-erased BatchSerializer, but we can always continue to iterate on this design.

@mustafasrepo (Contributor, Author)

> Thank you. I would have gone further and removed FileWriterExt in favour of just a type-erased BatchSerializer, but we can always continue to iterate on this design.

I have removed FileWriterExt. We now use AbortableWrite<W>, which increases code reuse. Thanks for the suggestion.

/// A wrapper around an `AsyncWrite` type that provides append functionality.
pub struct AsyncAppend<W: AsyncWrite + Unpin + Send> {
/// A wrapper struct with abort method and writer
struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
Contributor

👍 I think this construction makes the intent of the code much clearer. Thank you
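
For reference, a minimal sketch of how such a wrapper can expose abort (simplified; the PR's actual AbortableWrite differs in detail):

use futures::future::BoxFuture;
use tokio::io::AsyncWrite;

struct AbortableWriteSketch<W: AsyncWrite + Unpin + Send> {
    writer: W,
    // Cleanup to run if the write must be rolled back (e.g. aborting a
    // multipart upload); `None` means the mode has nothing to roll back.
    abort: Option<Box<dyn FnOnce() -> BoxFuture<'static, std::io::Result<()>> + Send>>,
}

impl<W: AsyncWrite + Unpin + Send> AbortableWriteSketch<W> {
    /// Run the abort action, if any; called when execution fails mid-write.
    async fn abort_writer(&mut self) -> std::io::Result<()> {
        match self.abort.take() {
            Some(abort) => abort().await,
            None => Ok(()), // e.g. append-style writes cannot be undone
        }
    }
}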

@alamb (Contributor) commented on Jun 6, 2023

I am going to merge this PR so we can continue work on main. I am really excited to see this progress. Thank you @mustafasrepo @metesynnada @ozankabak and @tustvold

@alamb merged commit 36292f6 into apache:main on Jun 6, 2023
@alamb mentioned this pull request on Jun 6, 2023
@alamb (Contributor) commented on Jun 6, 2023

I filed #6569 to start tracking next steps for the streaming write API support

///
/// Prints in the format:
/// ```text
/// [file1, file2,...]
Contributor

@mustafasrepo could you comment on why this doesn't match the implementation? I don't see the trailing "..." there. FileGroupsDisplay uses 5 as the largest number of groups shown; probably we should also limit this list to 5? Maybe having an explicit constant could be useful?
I found this conflicting with my attempt to implement #6383.

@mustafasrepo (Contributor, Author)

You are right; the functionality should match the documentation and display ... after a limit.
I think I lost this functionality during a refactor, but I am not sure. We can add a test for this use case so the functionality isn't lost again in the future.
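
As a sketch of the fix being discussed (the constant and helper names here are hypothetical; #6637 carries the actual change):

const MAX_FILES_SHOWN: usize = 5;

fn fmt_file_list(files: &[&str]) -> String {
    // Show at most MAX_FILES_SHOWN entries, then a trailing "..." marker.
    let head = files
        .iter()
        .take(MAX_FILES_SHOWN)
        .copied()
        .collect::<Vec<_>>()
        .join(", ");
    let tail = if files.len() > MAX_FILES_SHOWN { ", ..." } else { "" };
    format!("[{head}{tail}]") // e.g. "[f1, f2, f3, f4, f5, ...]"
}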

Contributor

Will you have time to do this, or should I try?

@mustafasrepo (Contributor, Author)

I won't be able to look into this for 2-3 days. In the meantime, if you want to do this, please go ahead.

Contributor

Thanks, I'll try to find a couple of hours over the weekend and will ping you if there is something to share.

Contributor

PR created - #6637

@mustafasrepo deleted the feature/listing_table_insert_into_support2 branch on June 13, 2023 at 05:41
Labels: api change (Changes the API exposed to users of the crate), core (Core DataFusion crate), sqllogictest (SQL Logic Tests (.slt))