76 changes: 42 additions & 34 deletions parquet/src/bin/parquet-concat.rs
@@ -66,47 +66,55 @@ impl Args {

         let output = File::create(&self.output)?;
 
-        let inputs = self
-            .input
-            .iter()
-            .map(|x| {
-                let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
-                Ok((reader, metadata))
-            })
-            .collect::<Result<Vec<_>>>()?;
+        let schema = {

Review comment (Contributor):
Since there are no tests, I think a comment here explaining the rationale for not keeping the files open is probably good

Suggested change:
-        let schema = {
+        // Check schemas in a first pass to make sure they all match
+        // and then do the work in a second pass after the validation
+        let schema = {

+            let inputs = self
+                .input
+                .iter()
+                .map(|x| {
+                    let reader = File::open(x)?;
+                    let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;

Review comment (Contributor):
This is still parsing the metadata from all the files into memory before checking the schema.

If you are looking to support the large-file use case better, it would require fewer resources (memory) to read the schema from the first file and then verify the schema of the remaining files one at a time, rather than reading the metadata for all files before validating.
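
For illustration only (not part of this PR): a minimal sketch of the approach this comment suggests, reusing the File, ParquetMetaDataReader, ParquetError, and root_schema_ptr() APIs already used in this file. It would sit inside the same run() method and assumes the earlier check that self.input is non-empty. The root schema is read from the first input, then each remaining input is opened and validated one at a time, so at most one file's metadata is held in memory during validation.

    let schema = {
        let first = File::open(&self.input[0])?;
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&first)?;
        // Keep only the root schema pointer; the rest of the first file's
        // metadata is dropped at the end of this block.
        metadata.file_metadata().schema_descr().root_schema_ptr()
    };

    for path in self.input.iter().skip(1) {
        let file = File::open(path)?;
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&file)?;
        let actual = metadata.file_metadata().schema();
        if schema.as_ref() != actual {
            return Err(ParquetError::General(format!(
                "inputs must have the same schema, {schema:#?} vs {actual:#?}"
            )));
        }
        // `metadata` is dropped here, before the next file is opened.
    }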

+                    Ok(metadata)
+                })
+                .collect::<Result<Vec<_>>>()?;

-        let expected = inputs[0].1.file_metadata().schema();
-        for (_, metadata) in inputs.iter().skip(1) {
-            let actual = metadata.file_metadata().schema();
-            if expected != actual {
-                return Err(ParquetError::General(format!(
-                    "inputs must have the same schema, {expected:#?} vs {actual:#?}"
-                )));
+            let expected = inputs[0].file_metadata().schema();
+            for metadata in inputs.iter().skip(1) {
+                let actual = metadata.file_metadata().schema();
+                if expected != actual {
+                    return Err(ParquetError::General(format!(
+                        "inputs must have the same schema, {expected:#?} vs {actual:#?}"
+                    )));
+                }
             }
-        }
 
+            inputs[0].file_metadata().schema_descr().root_schema_ptr()
+        };
         let props = Arc::new(WriterProperties::builder().build());
-        let schema = inputs[0].1.file_metadata().schema_descr().root_schema_ptr();
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
-        for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
-                let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
-                    let result = ColumnCloseResult {
-                        bytes_written: column.compressed_size() as _,
-                        rows_written: rg.num_rows() as _,
-                        metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
-                    };
-                    rg_out.append_column(&input, result)?;
+        self.input
+            .iter()
+            .map(|x| {
+                let input = File::open(x)?;
+                let metadata = ParquetMetaDataReader::new().parse_and_finish(&input)?;
+                for rg in metadata.row_groups() {
+                    let mut rg_out = writer.next_row_group()?;
+                    for column in rg.columns() {
+                        let result = ColumnCloseResult {
+                            bytes_written: column.compressed_size() as _,
+                            rows_written: rg.num_rows() as _,
+                            metadata: column.clone(),
+                            bloom_filter: None,
+                            column_index: None,
+                            offset_index: None,
+                        };
+                        rg_out.append_column(&input, result)?;
+                    }
+                    rg_out.close()?;
                 }
-                rg_out.close()?;
-            }
-        }
+                Ok(())
+            })
+            .collect::<Result<Vec<_>>>()?;

         writer.close()?;
