Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support file scan a directory of parquet files #17811

Merged
merged 1 commit into from
Jul 25, 2024

Conversation

chenzl25
Copy link
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Example:

-- function signature file_scan(file_format, storage_type, s3_region, s3_access_key, s3_secret_key, file_directory)
select * from file_scan(
  'parquet',
  's3',
  'ap-southeast-2',
  'xxxxxxxxxx',
  'yyyyyyyy',
  's3://your-bucket/path/to/directory/'
);

@chenzl25 chenzl25 added the user-facing-changes Contains changes that are visible to users label Jul 25, 2024
@tabVersion tabVersion requested a review from wcy-fdu July 25, 2024 06:50
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM

Comment on lines +127 to +130
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {}, should start with {}", dir, prefix),
))?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's not an assertion? Because the code says the bucket was extracted from dir

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need to ensure the URL prefix is s3:// instead of something else like http://. The bucket is the URL's host and can't guarantee the protocol.

})?;

if let Some(files) = files {
// if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments
Copy link
Member

@fuyufjh fuyufjh Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit dirty to list files in binder, but acceptable to me

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest LGTM

complete_stages.insert(
stage.id,
Arc::new(stage.clone_with_exchange_info(exchange_info, Some(1))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we set the parallelism to 1 previously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it only supports reading one file in file_scan() previously.

@@ -61,7 +61,7 @@ impl Executor for S3FileScanExecutor {
impl S3FileScanExecutor {
pub fn new(
file_format: FileFormat,
location: String,
file_location: Vec<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont see an example of how to set multiple file_locations

Comment on lines +65 to +66
// The rest of the arguments are file locations
let file_location = eval_args[5..].iter().cloned().collect_vec();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabVersion We set file_location here.

@chenzl25 chenzl25 added this pull request to the merge queue Jul 25, 2024
Merged via the queue into main with commit 9be6c76 Jul 25, 2024
35 of 37 checks passed
@chenzl25 chenzl25 deleted the dylan/support_file_scan_a_directory branch July 25, 2024 11:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants