diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index e8ce4ca1dbed..eaed0d995220 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -66,47 +66,55 @@ impl Args {
 
         let output = File::create(&self.output)?;
 
-        let inputs = self
-            .input
-            .iter()
-            .map(|x| {
-                let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
-                Ok((reader, metadata))
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let expected = inputs[0].1.file_metadata().schema();
-        for (_, metadata) in inputs.iter().skip(1) {
-            let actual = metadata.file_metadata().schema();
-            if expected != actual {
-                return Err(ParquetError::General(format!(
-                    "inputs must have the same schema, {expected:#?} vs {actual:#?}"
-                )));
+        let schema = {
+            let inputs = self
+                .input
+                .iter()
+                .map(|x| {
+                    let reader = File::open(x)?;
+                    let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
+                    Ok(metadata)
+                })
+                .collect::<Result<Vec<_>>>()?;
+            let expected = inputs[0].file_metadata().schema();
+            for metadata in inputs.iter().skip(1) {
+                let actual = metadata.file_metadata().schema();
+                if expected != actual {
+                    return Err(ParquetError::General(format!(
+                        "inputs must have the same schema, {expected:#?} vs {actual:#?}"
+                    )));
+                }
             }
-        }
+            inputs[0].file_metadata().schema_descr().root_schema_ptr()
+        };
 
         let props = Arc::new(WriterProperties::builder().build());
-        let schema = inputs[0].1.file_metadata().schema_descr().root_schema_ptr();
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
-        for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
-                let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
-                    let result = ColumnCloseResult {
-                        bytes_written: column.compressed_size() as _,
-                        rows_written: rg.num_rows() as _,
-                        metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
-                    };
-                    rg_out.append_column(&input, result)?;
+        self.input
+            .iter()
+            .map(|x| {
+                let input = File::open(x)?;
+                let metadata = ParquetMetaDataReader::new().parse_and_finish(&input)?;
+                for rg in metadata.row_groups() {
+                    let mut rg_out = writer.next_row_group()?;
+                    for column in rg.columns() {
+                        let result = ColumnCloseResult {
+                            bytes_written: column.compressed_size() as _,
+                            rows_written: rg.num_rows() as _,
+                            metadata: column.clone(),
+                            bloom_filter: None,
+                            column_index: None,
+                            offset_index: None,
+                        };
+                        rg_out.append_column(&input, result)?;
+                    }
+                    rg_out.close()?;
                 }
-                rg_out.close()?;
-            }
-        }
+                Ok(())
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         writer.close()?;
 