Skip to content

Commit

Permalink
fix bug: write column metadata to the behind of the column chunk data
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Jun 28, 2022
1 parent 9f7b600 commit 82c5534
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 24 deletions.
31 changes: 18 additions & 13 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,24 @@ impl ColumnChunkMetaData {

/// Method to convert to Thrift.
pub fn to_thrift(&self) -> ColumnChunk {
let column_metadata = ColumnMetaData {
let column_metadata = self.to_column_metadata_thrift();

ColumnChunk {
file_path: self.file_path().map(|s| s.to_owned()),
file_offset: self.file_offset,
meta_data: Some(column_metadata),
offset_index_offset: self.offset_index_offset,
offset_index_length: self.offset_index_length,
column_index_offset: self.column_index_offset,
column_index_length: self.column_index_length,
crypto_metadata: None,
encrypted_column_metadata: None,
}
}

/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
ColumnMetaData {
type_: self.column_type.into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
path_in_schema: Vec::from(self.column_path.as_ref()),
Expand All @@ -597,18 +614,6 @@ impl ColumnChunkMetaData {
.as_ref()
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
bloom_filter_offset: self.bloom_filter_offset,
};

ColumnChunk {
file_path: self.file_path().map(|s| s.to_owned()),
file_offset: self.file_offset,
meta_data: Some(column_metadata),
offset_index_offset: self.offset_index_offset,
offset_index_length: self.offset_index_length,
column_index_offset: self.column_index_offset,
column_index_length: self.column_index_length,
crypto_metadata: None,
encrypted_column_metadata: None,
}
}
}
Expand Down
17 changes: 6 additions & 11 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,16 +434,6 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
}
Ok(self.sink.bytes_written() - start_pos)
}

/// Serializes column chunk into Thrift.
/// Returns Ok() if there are not errors serializing and writing data into the sink.
#[inline]
fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
chunk.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
Ok(())
}
}

impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
Expand Down Expand Up @@ -533,7 +523,12 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata
.to_column_metadata_thrift()
.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
Ok(())
}

fn close(&mut self) -> Result<()> {
Expand Down

0 comments on commit 82c5534

Please sign in to comment.