Skip to content

Commit

Permalink
Support to read/write customized metadata in ipc files (#4003)
Browse files Browse the repository at this point in the history
Test Plan: Pass CI
  • Loading branch information
framlog authored Apr 3, 2023
1 parent 877a3a0 commit 5a63a63
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 29 deletions.
52 changes: 23 additions & 29 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use flatbuffers::{
use std::collections::HashMap;
use std::sync::Arc;

use crate::{size_prefixed_root_as_message, CONTINUATION_MARKER};
use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
use DataType::*;

/// Serialize a schema in IPC format
Expand All @@ -38,6 +38,25 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
fbb
}

pub fn metadata_to_fb<'a>(
fbb: &mut FlatBufferBuilder<'a>,
metadata: &HashMap<String, String>,
) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
let custom_metadata = metadata
.iter()
.map(|(k, v)| {
let fb_key_name = fbb.create_string(k);
let fb_val_name = fbb.create_string(v);

let mut kv_builder = crate::KeyValueBuilder::new(fbb);
kv_builder.add_key(fb_key_name);
kv_builder.add_value(fb_val_name);
kv_builder.finish()
})
.collect::<Vec<_>>();
fbb.create_vector(&custom_metadata)
}

pub fn schema_to_fb_offset<'a>(
fbb: &mut FlatBufferBuilder<'a>,
schema: &Schema,
Expand All @@ -49,24 +68,8 @@ pub fn schema_to_fb_offset<'a>(
.collect::<Vec<_>>();
let fb_field_list = fbb.create_vector(&fields);

let fb_metadata_list = if !schema.metadata().is_empty() {
let custom_metadata = schema
.metadata()
.iter()
.map(|(k, v)| {
let fb_key_name = fbb.create_string(k);
let fb_val_name = fbb.create_string(v);

let mut kv_builder = crate::KeyValueBuilder::new(fbb);
kv_builder.add_key(fb_key_name);
kv_builder.add_value(fb_val_name);
kv_builder.finish()
})
.collect::<Vec<_>>();
Some(fbb.create_vector(&custom_metadata))
} else {
None
};
let fb_metadata_list =
(!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));

let mut builder = crate::SchemaBuilder::new(fbb);
builder.add_fields(fb_field_list);
Expand Down Expand Up @@ -440,16 +443,7 @@ pub(crate) fn build_field<'a>(
// Optional custom metadata.
let mut fb_metadata = None;
if !field.metadata().is_empty() {
let mut kv_vec = vec![];
for (k, v) in field.metadata() {
let kv_args = crate::KeyValueArgs {
key: Some(fbb.create_string(k.as_str())),
value: Some(fbb.create_string(v.as_str())),
};
let kv_offset = crate::KeyValue::create(fbb, &kv_args);
kv_vec.push(kv_offset);
}
fb_metadata = Some(fbb.create_vector(&kv_vec));
fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
};

let fb_field_name = fbb.create_string(field.name().as_str());
Expand Down
38 changes: 38 additions & 0 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,9 @@ pub struct FileReader<R: Read + Seek> {
/// Metadata version
metadata_version: crate::MetadataVersion,

/// User defined metadata
custom_metadata: HashMap<String, String>,

/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
}
Expand Down Expand Up @@ -862,6 +865,16 @@ impl<R: Read + Seek> FileReader<R> {
let ipc_schema = footer.schema().unwrap();
let schema = crate::convert::fb_to_schema(ipc_schema);

let mut custom_metadata = HashMap::new();
if let Some(fb_custom_metadata) = footer.custom_metadata() {
for kv in fb_custom_metadata.into_iter() {
custom_metadata.insert(
kv.key().unwrap().to_string(),
kv.value().unwrap().to_string(),
);
}
}

// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_id = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
Expand Down Expand Up @@ -926,10 +939,16 @@ impl<R: Read + Seek> FileReader<R> {
total_blocks,
dictionaries_by_id,
metadata_version: footer.version(),
custom_metadata,
projection,
})
}

/// Return user defined customized metadata
pub fn custom_metadata(&self) -> &HashMap<String, String> {
&self.custom_metadata
}

/// Return the number of batches in the file
pub fn num_batches(&self) -> usize {
self.total_blocks
Expand Down Expand Up @@ -1522,6 +1541,25 @@ mod tests {
reader.next().unwrap().unwrap()
}

#[test]
fn test_roundtrip_with_custom_metadata() {
let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
let mut test_metadata = HashMap::new();
test_metadata.insert("abc".to_string(), "abc".to_string());
test_metadata.insert("def".to_string(), "def".to_string());
for (k, v) in &test_metadata {
writer.write_metadata(k, v);
}
writer.finish().unwrap();
drop(writer);

let reader =
crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
assert_eq!(reader.custom_metadata(), &test_metadata);
}

#[test]
fn test_roundtrip_nested_dict() {
let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
Expand Down
12 changes: 12 additions & 0 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ pub struct FileWriter<W: Write> {
finished: bool,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
/// User level customized metadata
custom_metadata: HashMap<String, String>,

data_gen: IpcDataGenerator,
}
Expand Down Expand Up @@ -742,10 +744,15 @@ impl<W: Write> FileWriter<W> {
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new(true),
custom_metadata: HashMap::new(),
data_gen,
})
}

pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.custom_metadata.insert(key.into(), value.into());
}

/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
Expand Down Expand Up @@ -798,13 +805,18 @@ impl<W: Write> FileWriter<W> {
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
let fb_custom_metadata = (!self.custom_metadata.is_empty())
.then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

let root = {
let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
footer_builder.add_version(self.write_options.metadata_version);
footer_builder.add_schema(schema);
footer_builder.add_dictionaries(dictionaries);
footer_builder.add_recordBatches(record_batches);
if let Some(fb_custom_metadata) = fb_custom_metadata {
footer_builder.add_custom_metadata(fb_custom_metadata);
}
footer_builder.finish()
};
fbb.finish(root, None);
Expand Down

0 comments on commit 5a63a63

Please sign in to comment.