Skip to content

Commit

Permalink
Migrate to arrow-* v53 (apache#626)
Browse files Browse the repository at this point in the history
* chore: migrate to arrow-* v53

* chore: update datafusion to 42

* test: fix incorrect test assertion

* chore: update python bindings to arrow 53
  • Loading branch information
sdd authored Sep 23, 2024
1 parent 88e5e4a commit d03c4f8
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 97 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "52" }
arrow-array = { version = "52" }
arrow-ord = { version = "52" }
arrow-schema = { version = "52" }
arrow-select = { version = "52" }
arrow-string = { version = "52" }
arrow-arith = { version = "53" }
arrow-array = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
arrow-string = { version = "53" }
async-stream = "0.3.5"
async-trait = "0.1"
async-std = "1.12"
Expand Down Expand Up @@ -72,7 +72,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.50"
ordered-float = "4"
parquet = "52"
parquet = "53"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ crate-type = ["cdylib"]

[dependencies]
iceberg = { path = "../../crates/iceberg" }
pyo3 = { version = "0.21", features = ["extension-module"] }
arrow = { version = "52", features = ["pyarrow"] }
pyo3 = { version = "0.22.3", features = ["extension-module"] }
arrow = { version = "53", features = ["pyarrow"] }
92 changes: 57 additions & 35 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,54 +665,70 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
}

macro_rules! get_parquet_stat_as_datum {
($limit_type:ident) => {
($limit_type:tt) => {
paste::paste! {
/// Gets the $limit_type value from a parquet Statistics struct, as a Datum
pub(crate) fn [<get_parquet_stat_ $limit_type _as_datum>](
primitive_type: &PrimitiveType, stats: &Statistics
) -> Result<Option<Datum>> {
Ok(Some(match (primitive_type, stats) {
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.$limit_type()),
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.$limit_type()),
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.$limit_type()),
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.$limit_type()),
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.$limit_type())?,
Ok(match (primitive_type, stats) {
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::bool(*val)),
(PrimitiveType::Int, Statistics::Int32(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::int(*val)),
(PrimitiveType::Date, Statistics::Int32(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::date(*val)),
(PrimitiveType::Long, Statistics::Int64(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::long(*val)),
(PrimitiveType::Time, Statistics::Int64(stats)) => {
let Some(val) = stats.[<$limit_type _opt>]() else {
return Ok(None);
};

Some(Datum::time_micros(*val)?)
}
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
Datum::timestamp_micros(*stats.$limit_type())
stats.[<$limit_type _opt>]().map(|val|Datum::timestamp_micros(*val))
}
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
Datum::timestamptz_micros(*stats.$limit_type())
stats.[<$limit_type _opt>]().map(|val|Datum::timestamptz_micros(*val))
}
(PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
Datum::timestamp_nanos(*stats.$limit_type())
stats.[<$limit_type _opt>]().map(|val|Datum::timestamp_nanos(*val))
}
(PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
Datum::timestamptz_nanos(*stats.$limit_type())
stats.[<$limit_type _opt>]().map(|val|Datum::timestamptz_nanos(*val))
}
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.$limit_type()),
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.$limit_type()),
(PrimitiveType::Float, Statistics::Float(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::float(*val)),
(PrimitiveType::Double, Statistics::Double(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::double(*val)),
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
Datum::string(stats.$limit_type().as_utf8()?)
let Some(val) = stats.[<$limit_type _opt>]() else {
return Ok(None);
};

Some(Datum::string(val.as_utf8()?))
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::ByteArray(stats)) => {
Datum::new(
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};

Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type _bytes>]().try_into()?)),
)
PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
))
}
(
PrimitiveType::Decimal {
precision: _,
scale: _,
},
Statistics::Int32(stats)) => {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
)
stats.[<$limit_type _opt>]().map(|val| {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*val)),
)
})
}

(
Expand All @@ -722,40 +738,46 @@ macro_rules! get_parquet_stat_as_datum {
},
Statistics::Int64(stats),
) => {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
)
stats.[<$limit_type _opt>]().map(|val| {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*val)),
)
})
}
(PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
let raw = stats.[<$limit_type _bytes>]();
if raw.len() != 16 {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};
if bytes.len() != 16 {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of uuid bytes.",
));
}
Datum::uuid(Uuid::from_bytes(
raw[..16].try_into().unwrap(),
))
Some(Datum::uuid(Uuid::from_bytes(
bytes[..16].try_into().unwrap(),
)))
}
(PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
let raw = stat.[<$limit_type _bytes>]();
if raw.len() != *len as usize {
let Some(bytes) = stat.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};
if bytes.len() != *len as usize {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of fixed bytes.",
));
}
Datum::fixed(raw.to_vec())
Some(Datum::fixed(bytes.to_vec()))
}
(PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
Datum::binary(stat.[<$limit_type _bytes>]().to_vec())
return Ok(stat.[<$limit_type _bytes_opt>]().map(|bytes|Datum::binary(bytes.to_vec())))
}
_ => {
return Ok(None);
}
}))
})
}
}
}
Expand Down
Loading

0 comments on commit d03c4f8

Please sign in to comment.