1 change: 1 addition & 0 deletions arrow-avro/Cargo.toml
@@ -42,6 +42,7 @@ snappy = ["snap", "crc"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
md5 = ["dep:md5"]
sha256 = ["dep:sha2"]
small_decimals = []

[dependencies]
arrow-schema = { workspace = true }
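The new `small_decimals` flag is an additive feature with no extra dependencies; it only widens the set of Arrow decimal types the reader may produce. As a quick illustration of the `cfg` gating idiom the rest of this diff relies on (the function below is made up for the example, not part of the PR):

```rust
// Illustrative sketch only: compile-time gating on a Cargo feature such as
// `small_decimals`. The PR's real selection logic lives in codec.rs below.
#[cfg(feature = "small_decimals")]
fn narrow_decimals_enabled() -> bool {
    true
}

#[cfg(not(feature = "small_decimals"))]
fn narrow_decimals_enabled() -> bool {
    false
}

fn main() {
    // Prints `true` only when the crate is built with `--features small_decimals`.
    println!("small_decimals enabled: {}", narrow_decimals_enabled());
}
```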
91 changes: 75 additions & 16 deletions arrow-avro/src/codec.rs
@@ -21,10 +21,10 @@ use crate::schema::{
};
use arrow_schema::{
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
-DECIMAL128_MAX_SCALE,
DECIMAL256_MAX_PRECISION,
};
use serde_json::Value;
use std::borrow::Cow;
#[cfg(feature = "small_decimals")]
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
use std::collections::HashMap;
use std::sync::Arc;

@@ -401,7 +401,7 @@ pub enum Codec {
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
-/// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 data types
/// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
///
/// The fields are `(precision, scale, fixed_size)`.
/// - `precision` (`usize`): Total number of digits.
@@ -447,20 +447,28 @@ impl Codec {
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
-Self::Decimal(precision, scale, size) => {
Self::Decimal(precision, scale, _size) => {
let p = *precision as u8;
let s = scale.unwrap_or(0) as i8;
-let too_large_for_128 = match *size {
-Some(sz) => sz > 16,
-None => {
-(p as usize) > DECIMAL128_MAX_PRECISION as usize
-|| (s as usize) > DECIMAL128_MAX_SCALE as usize
-}
-};
-if too_large_for_128 {
-DataType::Decimal256(p, s)
-} else {
-DataType::Decimal128(p, s)
-}
#[cfg(feature = "small_decimals")]
{
if *precision <= DECIMAL32_MAX_PRECISION as usize {
DataType::Decimal32(p, s)
} else if *precision <= DECIMAL64_MAX_PRECISION as usize {
DataType::Decimal64(p, s)
} else if *precision <= DECIMAL128_MAX_PRECISION as usize {
DataType::Decimal128(p, s)
} else {
DataType::Decimal256(p, s)
}
}
#[cfg(not(feature = "small_decimals"))]
{
if *precision <= DECIMAL128_MAX_PRECISION as usize {
DataType::Decimal128(p, s)
} else {
DataType::Decimal256(p, s)
}
}
}
Self::Uuid => DataType::FixedSizeBinary(16),
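With `small_decimals` enabled, the match above reduces to fixed precision thresholds taken from arrow-schema's constants (9, 18, 38, and 76 digits). A standalone sketch of that selection, using a hypothetical helper name and assuming an arrow-schema version that provides `Decimal32`/`Decimal64`:

```rust
use arrow_schema::DataType;

// Hypothetical helper mirroring the `small_decimals` branch above: pick the
// narrowest Arrow decimal type that can hold `precision` digits.
fn narrowest_decimal(precision: u8, scale: i8) -> DataType {
    match precision {
        0..=9 => DataType::Decimal32(precision, scale),    // DECIMAL32_MAX_PRECISION = 9
        10..=18 => DataType::Decimal64(precision, scale),  // DECIMAL64_MAX_PRECISION = 18
        19..=38 => DataType::Decimal128(precision, scale), // DECIMAL128_MAX_PRECISION = 38
        _ => DataType::Decimal256(precision, scale),       // DECIMAL256_MAX_PRECISION = 76
    }
}

fn main() {
    assert_eq!(narrowest_decimal(4, 2), DataType::Decimal32(4, 2));
    assert_eq!(narrowest_decimal(25, 2), DataType::Decimal128(25, 2));
    assert_eq!(narrowest_decimal(76, 10), DataType::Decimal256(76, 10));
}
```

Without the feature, the first two arms collapse into the `Decimal128` case, which matches the `cfg(not(...))` branch.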
@@ -506,6 +514,29 @@ impl From<PrimitiveType> for Codec {
}
}

/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
///
/// Per Avro spec (Decimal logical type), for a fixed length `n`:
/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
///
/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
/// Decimal256, which is 32 bytes and has max precision 76).
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
// Precomputed exact table for n = 1..=32
// 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
// 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
// 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
const MAX_P: [usize; 32] = [
2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
59, 62, 64, 67, 69, 71, 74, 76,
];
match n {
1..=32 => Some(MAX_P[n - 1]),
_ => None,
}
}
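As a sanity check, the table can be reproduced from the formula: for every n in 1..=32, ⌊log₁₀(2^(8n − 1) − 1)⌋ equals ⌊(8n − 1)·log₁₀2⌋. A quick spot-check (illustrative, not part of the PR):

```rust
use std::f64::consts::LOG10_2;

// For an n-byte two's-complement value the largest positive magnitude is
// 2^(8n - 1) - 1, so the exact max precision is floor(log10(2^(8n - 1) - 1)),
// which for n = 1..=32 coincides with floor((8n - 1) * log10(2)).
fn approx_max_precision(n: u32) -> usize {
    ((8 * n - 1) as f64 * LOG10_2).floor() as usize
}

fn main() {
    assert_eq!(approx_max_precision(1), 2);   // 1 byte:  values up to 127, so 2 full digits
    assert_eq!(approx_max_precision(8), 18);  // 8 bytes: every 18-digit value fits in an i64
    assert_eq!(approx_max_precision(16), 38); // 16 bytes: Decimal128's maximum precision
    assert_eq!(approx_max_precision(32), 76); // 32 bytes: Decimal256's maximum precision
    println!("spot-checks against the table pass");
}
```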

fn parse_decimal_attributes(
attributes: &Attributes,
fallback_size: Option<usize>,
@@ -529,6 +560,34 @@ fn parse_decimal_attributes(
.and_then(|v| v.as_u64())
.map(|s| s as usize)
.or(fallback_size);
if precision == 0 {
return Err(ArrowError::ParseError(
"Decimal requires precision > 0".to_string(),
));
}
if scale > precision {
return Err(ArrowError::ParseError(format!(
"Decimal has invalid scale > precision: scale={scale}, precision={precision}"
)));
}
if precision > DECIMAL256_MAX_PRECISION as usize {
return Err(ArrowError::ParseError(format!(
"Decimal precision {precision} exceeds maximum supported by Arrow ({})",
DECIMAL256_MAX_PRECISION
)));
}
if let Some(sz) = size {
let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
ArrowError::ParseError(format!(
"Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
))
})?;
if precision > max_p {
return Err(ArrowError::ParseError(format!(
"Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
)));
}
}
Ok((precision, scale, size))
}
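Taken together, the new checks reject any decimal whose declared precision cannot survive a round trip. A compact sketch of which `(precision, scale, fixed_size)` combinations pass — the `accepted` helper is hypothetical, not the crate's private `parse_decimal_attributes`:

```rust
// Hypothetical stand-in for the validation above: true when the combination
// would be accepted, false when one of the new error paths would fire.
fn accepted(precision: usize, scale: usize, fixed_size: Option<usize>) -> bool {
    // Same table as `max_precision_for_fixed_bytes` in codec.rs.
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55,
        57, 59, 62, 64, 67, 69, 71, 74, 76,
    ];
    precision > 0
        && scale <= precision
        && precision <= 76 // DECIMAL256_MAX_PRECISION
        && fixed_size.map_or(true, |n| (1..=32).contains(&n) && precision <= MAX_P[n - 1])
}

fn main() {
    assert!(accepted(25, 2, Some(11)));  // an 11-byte fixed holds up to 26 digits
    assert!(!accepted(25, 2, Some(10))); // a 10-byte fixed holds only 23 digits
    assert!(!accepted(4, 5, None));      // scale may not exceed precision
    assert!(!accepted(77, 0, None));     // beyond Decimal256's 76-digit maximum
}
```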

@@ -747,7 +806,7 @@ impl<'a> Maker<'a> {
Ok(field)
}
ComplexType::Array(a) => {
-let mut field = self.parse_type(a.items.as_ref(), namespace)?;
let field = self.parse_type(a.items.as_ref(), namespace)?;
Ok(AvroDataType {
nullability: None,
metadata: a.attributes.field_metadata(),
142 changes: 121 additions & 21 deletions arrow-avro/src/reader/mod.rs
@@ -697,7 +697,7 @@ mod test {
};
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
use arrow_array::*;
-use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
use bytes::{Buf, BufMut, Bytes};
use futures::executor::block_on;
@@ -2176,37 +2176,137 @@ mod test {

#[test]
fn test_decimal() {
-let files = [
-("avro/fixed_length_decimal.avro", 25, 2),
-("avro/fixed_length_decimal_legacy.avro", 13, 2),
-("avro/int32_decimal.avro", 4, 2),
-("avro/int64_decimal.avro", 10, 2),
// Choose expected Arrow types depending on the `small_decimals` feature flag.
// With `small_decimals` enabled, Decimal32/Decimal64 are used where their
// precision allows; otherwise, those cases resolve to Decimal128.
#[cfg(feature = "small_decimals")]
let files: [(&str, DataType); 8] = [
(
"avro/fixed_length_decimal.avro",
DataType::Decimal128(25, 2),
),
(
"avro/fixed_length_decimal_legacy.avro",
DataType::Decimal64(13, 2),
),
("avro/int32_decimal.avro", DataType::Decimal32(4, 2)),
("avro/int64_decimal.avro", DataType::Decimal64(10, 2)),
(
"test/data/int256_decimal.avro",
DataType::Decimal256(76, 10),
),
(
"test/data/fixed256_decimal.avro",
DataType::Decimal256(76, 10),
),
(
"test/data/fixed_length_decimal_legacy_32.avro",
DataType::Decimal32(9, 2),
),
("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)),
];
#[cfg(not(feature = "small_decimals"))]
let files: [(&str, DataType); 8] = [
(
"avro/fixed_length_decimal.avro",
DataType::Decimal128(25, 2),
),
(
"avro/fixed_length_decimal_legacy.avro",
DataType::Decimal128(13, 2),
),
("avro/int32_decimal.avro", DataType::Decimal128(4, 2)),
("avro/int64_decimal.avro", DataType::Decimal128(10, 2)),
(
"test/data/int256_decimal.avro",
DataType::Decimal256(76, 10),
),
(
"test/data/fixed256_decimal.avro",
DataType::Decimal256(76, 10),
),
(
"test/data/fixed_length_decimal_legacy_32.avro",
DataType::Decimal128(9, 2),
),
("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)),
];
-let decimal_values: Vec<i128> = (1..=24).map(|n| n as i128 * 100).collect();
-for (file, precision, scale) in files {
-let file_path = arrow_test_data(file);
for (file, expected_dt) in files {
let (precision, scale) = match expected_dt {
DataType::Decimal32(p, s)
| DataType::Decimal64(p, s)
| DataType::Decimal128(p, s)
| DataType::Decimal256(p, s) => (p, s),
_ => unreachable!("Unexpected decimal type in test inputs"),
};
assert!(scale >= 0, "test data uses non-negative scales only");
let scale_u32 = scale as u32;
let file_path: String = if file.starts_with("avro/") {
arrow_test_data(file)
} else {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join(file)
.to_string_lossy()
.into_owned()
};
let pow10: i128 = 10i128.pow(scale_u32);
let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) * pow10).collect();
let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef {
match *dt {
DataType::Decimal32(p, s) => {
let it = values.iter().map(|&v| v as i32);
Arc::new(
Decimal32Array::from_iter_values(it)
.with_precision_and_scale(p, s)
.unwrap(),
)
}
DataType::Decimal64(p, s) => {
let it = values.iter().map(|&v| v as i64);
Arc::new(
Decimal64Array::from_iter_values(it)
.with_precision_and_scale(p, s)
.unwrap(),
)
}
DataType::Decimal128(p, s) => {
let it = values.iter().copied();
Arc::new(
Decimal128Array::from_iter_values(it)
.with_precision_and_scale(p, s)
.unwrap(),
)
}
DataType::Decimal256(p, s) => {
let it = values.iter().map(|&v| i256::from_i128(v));
Arc::new(
Decimal256Array::from_iter_values(it)
.with_precision_and_scale(p, s)
.unwrap(),
)
}
_ => unreachable!("Unexpected decimal type in test"),
}
};
let actual_batch = read_file(&file_path, 8, false);
-let expected_array = Decimal128Array::from_iter_values(decimal_values.clone())
-.with_precision_and_scale(precision, scale)
-.unwrap();
let actual_nullable = actual_batch.schema().field(0).is_nullable();
let expected_array = build_expected(&expected_dt, &values_i128);
let mut meta = HashMap::new();
meta.insert("precision".to_string(), precision.to_string());
meta.insert("scale".to_string(), scale.to_string());
-let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true)
-.with_metadata(meta);
-let expected_schema = Arc::new(Schema::new(vec![field_with_meta]));
let field =
Field::new("value", expected_dt.clone(), actual_nullable).with_metadata(meta);
let expected_schema = Arc::new(Schema::new(vec![field]));
let expected_batch =
-RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)])
-.expect("Failed to build expected RecordBatch");
RecordBatch::try_new(expected_schema.clone(), vec![expected_array]).unwrap();
assert_eq!(
actual_batch, expected_batch,
"Decoded RecordBatch does not match the expected Decimal128 data for file {file}"
"Decoded RecordBatch does not match for {file}"
);
let actual_batch_small = read_file(&file_path, 3, false);
assert_eq!(
-actual_batch_small,
-expected_batch,
-"Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3"
actual_batch_small, expected_batch,
"Decoded RecordBatch does not match for {file} with batch size 3"
);
}
}
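For context on what these fixtures hold: the Avro decimal logical type stores the unscaled value as a big-endian two's-complement integer (in `bytes` or `fixed`), so the reader essentially sign-extends the raw bytes into the target width and attaches precision and scale. A rough sketch of that idea — not the reader's actual implementation:

```rust
// Sign-extend a big-endian two's-complement unscaled value into the i128 that
// backs a Decimal128; Decimal256 is analogous with a 32-byte buffer.
fn unscaled_from_be_bytes(raw: &[u8]) -> i128 {
    assert!(!raw.is_empty() && raw.len() <= 16, "at most 16 bytes for i128");
    let fill = if raw[0] & 0x80 != 0 { 0xFF } else { 0x00 };
    let mut buf = [fill; 16];
    buf[16 - raw.len()..].copy_from_slice(raw);
    i128::from_be_bytes(buf)
}

fn main() {
    // With scale 2 these decode as 3.00 and -2.00; the test above expects the
    // integers 1 through 24 scaled by 10^scale in each file.
    assert_eq!(unscaled_from_be_bytes(&[0x01, 0x2C]), 300);
    assert_eq!(unscaled_from_be_bytes(&[0xFF, 0x38]), -200);
}
```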