Skip to content

Commit

Permalink
feat: enable metadata import/export through C data interface (#3944)
Browse files Browse the repository at this point in the history
* feat: enable metadata export through C data interface

* chore: clippy warnings

* Update arrow-schema/src/ffi.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* make parsing more defensive.

* use IntoIterator

* handle integer overflow

---------

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
wjones127 and tustvold authored Mar 29, 2023
1 parent 6c13dd7 commit e919e99
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 7 deletions.
6 changes: 6 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ def test_field_roundtrip(pyarrow_type):
field = rust.round_trip_field(pyarrow_field)
assert field == pyarrow_field

def test_field_metadata_roundtrip():
metadata = {"hello": "World! 😊", "x": "2"}
pyarrow_field = pa.field("test", pa.int32(), metadata=metadata)
field = rust.round_trip_field(pyarrow_field)
assert field == pyarrow_field
assert field.metadata == pyarrow_field.metadata

def test_schema_roundtrip():
pyarrow_fields = zip(string.ascii_lowercase, _supported_pyarrow_types)
Expand Down
169 changes: 163 additions & 6 deletions arrow-schema/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
use crate::{ArrowError, DataType, Field, Schema, TimeUnit, UnionMode};
use bitflags::bitflags;
use std::ffi::{c_char, c_void, CStr, CString};
use std::{
collections::HashMap,
ffi::{c_char, c_void, CStr, CString},
};

bitflags! {
pub struct Flags: i64 {
Expand Down Expand Up @@ -74,6 +77,7 @@ pub struct FFI_ArrowSchema {
struct SchemaPrivateData {
children: Box<[*mut FFI_ArrowSchema]>,
dictionary: *mut FFI_ArrowSchema,
metadata: Option<Vec<u8>>,
}

// callback used to drop [FFI_ArrowSchema] when it is exported.
Expand Down Expand Up @@ -130,6 +134,7 @@ impl FFI_ArrowSchema {
let mut private_data = Box::new(SchemaPrivateData {
children: children_ptr,
dictionary: dictionary_ptr,
metadata: None,
});

// intentionally set from private_data (see https://github.com/apache/arrow-rs/issues/580)
Expand All @@ -152,6 +157,63 @@ impl FFI_ArrowSchema {
Ok(self)
}

pub fn with_metadata<I, S>(mut self, metadata: I) -> Result<Self, ArrowError>
where
I: IntoIterator<Item = (S, S)>,
S: AsRef<str>,
{
let metadata: Vec<(S, S)> = metadata.into_iter().collect();
// https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
let new_metadata = if !metadata.is_empty() {
let mut metadata_serialized: Vec<u8> = Vec::new();
let num_entries: i32 = metadata.len().try_into().map_err(|_| {
ArrowError::CDataInterface(format!(
"metadata can only have {} entries, but {} were provided",
i32::MAX,
metadata.len()
))
})?;
metadata_serialized.extend(num_entries.to_ne_bytes());

for (key, value) in metadata.into_iter() {
let key_len: i32 = key.as_ref().len().try_into().map_err(|_| {
ArrowError::CDataInterface(format!(
"metadata key can only have {} bytes, but {} were provided",
i32::MAX,
key.as_ref().len()
))
})?;
let value_len: i32 = value.as_ref().len().try_into().map_err(|_| {
ArrowError::CDataInterface(format!(
"metadata value can only have {} bytes, but {} were provided",
i32::MAX,
value.as_ref().len()
))
})?;

metadata_serialized.extend(key_len.to_ne_bytes());
metadata_serialized.extend_from_slice(key.as_ref().as_bytes());
metadata_serialized.extend(value_len.to_ne_bytes());
metadata_serialized.extend_from_slice(value.as_ref().as_bytes());
}

self.metadata = metadata_serialized.as_ptr() as *const c_char;
Some(metadata_serialized)
} else {
self.metadata = std::ptr::null_mut();
None
};

unsafe {
let mut private_data =
Box::from_raw(self.private_data as *mut SchemaPrivateData);
private_data.metadata = new_metadata;
self.private_data = Box::into_raw(private_data) as *mut c_void;
}

Ok(self)
}

pub fn empty() -> Self {
Self {
format: std::ptr::null_mut(),
Expand Down Expand Up @@ -212,6 +274,71 @@ impl FFI_ArrowSchema {
pub fn dictionary_ordered(&self) -> bool {
self.flags & 0b00000001 != 0
}

pub fn metadata(&self) -> Result<HashMap<String, String>, ArrowError> {
if self.metadata.is_null() {
Ok(HashMap::new())
} else {
let mut pos = 0;
let buffer: *const u8 = self.metadata as *const u8;

fn next_four_bytes(buffer: *const u8, pos: &mut isize) -> [u8; 4] {
let out = unsafe {
[
*buffer.offset(*pos),
*buffer.offset(*pos + 1),
*buffer.offset(*pos + 2),
*buffer.offset(*pos + 3),
]
};
*pos += 4;
out
}

fn next_n_bytes(buffer: *const u8, pos: &mut isize, n: i32) -> &[u8] {
let out = unsafe {
std::slice::from_raw_parts(buffer.offset(*pos), n.try_into().unwrap())
};
*pos += isize::try_from(n).unwrap();
out
}

let num_entries = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
if num_entries < 0 {
return Err(ArrowError::CDataInterface(
"Negative number of metadata entries".to_string(),
));
}

let mut metadata = HashMap::with_capacity(
num_entries.try_into().expect("Too many metadata entries"),
);

for _ in 0..num_entries {
let key_length = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
if key_length < 0 {
return Err(ArrowError::CDataInterface(
"Negative key length in metadata".to_string(),
));
}
let key = String::from_utf8(
next_n_bytes(buffer, &mut pos, key_length).to_vec(),
)?;
let value_length = i32::from_ne_bytes(next_four_bytes(buffer, &mut pos));
if value_length < 0 {
return Err(ArrowError::CDataInterface(
"Negative value length in metadata".to_string(),
));
}
let value = String::from_utf8(
next_n_bytes(buffer, &mut pos, value_length).to_vec(),
)?;
metadata.insert(key, value);
}

Ok(metadata)
}
}
}

impl Drop for FFI_ArrowSchema {
Expand Down Expand Up @@ -421,7 +548,8 @@ impl TryFrom<&FFI_ArrowSchema> for Field {

fn try_from(c_schema: &FFI_ArrowSchema) -> Result<Self, ArrowError> {
let dtype = DataType::try_from(c_schema)?;
let field = Field::new(c_schema.name(), dtype, c_schema.nullable());
let mut field = Field::new(c_schema.name(), dtype, c_schema.nullable());
field.set_metadata(c_schema.metadata()?);
Ok(field)
}
}
Expand All @@ -433,7 +561,7 @@ impl TryFrom<&FFI_ArrowSchema> for Schema {
// interpret it as a struct type then extract its fields
let dtype = DataType::try_from(c_schema)?;
if let DataType::Struct(fields) = dtype {
Ok(Schema::new(fields))
Ok(Schema::new(fields).with_metadata(c_schema.metadata()?))
} else {
Err(ArrowError::CDataInterface(
"Unable to interpret C data struct as a Schema".to_string(),
Expand Down Expand Up @@ -558,7 +686,8 @@ impl TryFrom<&Field> for FFI_ArrowSchema {

FFI_ArrowSchema::try_from(field.data_type())?
.with_name(field.name())?
.with_flags(flags)
.with_flags(flags)?
.with_metadata(field.metadata())
}
}

Expand All @@ -567,7 +696,8 @@ impl TryFrom<&Schema> for FFI_ArrowSchema {

fn try_from(schema: &Schema) -> Result<Self, ArrowError> {
let dtype = DataType::Struct(schema.fields().clone());
let c_schema = FFI_ArrowSchema::try_from(&dtype)?;
let c_schema =
FFI_ArrowSchema::try_from(&dtype)?.with_metadata(&schema.metadata)?;
Ok(c_schema)
}
}
Expand Down Expand Up @@ -655,7 +785,9 @@ mod tests {
Field::new("name", DataType::Utf8, false),
Field::new("address", DataType::Utf8, false),
Field::new("priority", DataType::UInt8, false),
]);
])
.with_metadata([("hello".to_string(), "world".to_string())].into());

round_trip_schema(schema);

// test that we can interpret struct types as schema
Expand Down Expand Up @@ -700,4 +832,29 @@ mod tests {
let arrow_schema = FFI_ArrowSchema::try_from(schema).unwrap();
assert!(arrow_schema.child(0).dictionary_ordered());
}

#[test]
fn test_set_field_metadata() {
let metadata_cases: Vec<HashMap<String, String>> = vec![
[].into(),
[("key".to_string(), "value".to_string())].into(),
[
("key".to_string(), "".to_string()),
("ascii123".to_string(), "你好".to_string()),
("".to_string(), "value".to_string()),
]
.into(),
];

let mut schema = FFI_ArrowSchema::try_new("b", vec![], None)
.unwrap()
.with_name("test")
.unwrap();

for metadata in metadata_cases {
schema = schema.with_metadata(&metadata).unwrap();
let field = Field::try_from(&schema).unwrap();
assert_eq!(field.metadata(), &metadata);
}
}
}
27 changes: 26 additions & 1 deletion arrow/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ mod tests {
use crate::datatypes::{Field, Int8Type};
use arrow_array::builder::UnionBuilder;
use arrow_array::types::{Float64Type, Int32Type};
use arrow_array::{Float64Array, UnionArray};
use arrow_array::{Float64Array, StructArray, UnionArray};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::mem::ManuallyDrop;
use std::ptr::addr_of_mut;
Expand Down Expand Up @@ -1092,6 +1093,30 @@ mod tests {
Ok(())
}

#[test]
fn test_struct_array() -> Result<()> {
let metadata: HashMap<String, String> =
[("Hello".to_string(), "World! 😊".to_string())].into();
let struct_array = StructArray::from(vec![(
Field::new("a", DataType::Int32, false).with_metadata(metadata),
Arc::new(Int32Array::from(vec![2, 4, 6])) as Arc<dyn Array>,
)]);

// export it
let array = ArrowArray::try_from(struct_array.data().clone())?;

// (simulate consumer) import it
let data = ArrayData::try_from(array)?;
let array = make_array(data);

// perform some operation
let array = array.as_any().downcast_ref::<StructArray>().unwrap();
assert_eq!(array.data_type(), struct_array.data_type());
assert_eq!(array, &struct_array);

Ok(())
}

#[test]
fn test_union_sparse_array() -> Result<()> {
let mut builder = UnionBuilder::new_sparse();
Expand Down

0 comments on commit e919e99

Please sign in to comment.