Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to arrow 22 #3363

Merged
merged 8 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ rust-version = "1.62"
readme = "README.md"

[dependencies]
arrow = "21.0.0"
arrow = "22.0.0"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "11.0.0" }
datafusion = { version = "11.0.0", path = "../datafusion/core" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = "21.0.0"
arrow-flight = "22.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ impl ExecutionPlan for CustomExec {
db.data.values().cloned().collect()
};

let mut id_array = UInt8Builder::new(users.len());
let mut account_array = UInt64Builder::new(users.len());
let mut id_array = UInt8Builder::new();
let mut account_array = UInt64Builder::new();

for user in users {
id_array.append_value(user.id);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ pyarrow = ["pyo3"]

[dependencies]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "21.0.0", features = ["prettyprint"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.87.0", optional = true }
object_store = { version = "0.4.0", optional = true }
ordered-float = "3.0"
parquet = { version = "21.0.0", optional = true, features = ["arrow"] }
pyo3 = { version = "0.16", optional = true }
parquet = { features = ["arrow"], optional = true, version = "22.0.0" }
pyo3 = { version = "0.17.1", optional = true }
serde_json = "1.0"
sqlparser = "0.22"
75 changes: 42 additions & 33 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ScalarValue {
/// 64bit float
Float64(Option<f64>),
/// 128bit decimal, using the i128 to represent the decimal
Decimal128(Option<i128>, usize, usize),
Decimal128(Option<i128>, u8, u8),
/// signed 8bit int
Int8(Option<i8>),
/// signed 16bit int
Expand Down Expand Up @@ -460,6 +460,7 @@ macro_rules! typed_cast {
}};
}

// keep until https://github.com/apache/arrow-rs/issues/2054 is finished
macro_rules! build_list {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
match $VALUES {
Expand Down Expand Up @@ -527,9 +528,22 @@ macro_rules! build_timestamp_list {
}};
}

macro_rules! new_builder {
(StringBuilder, $len:expr) => {
StringBuilder::new()
};
(LargeStringBuilder, $len:expr) => {
LargeStringBuilder::new()
};
($el:ident, $len:expr) => {{
<$el>::with_capacity($len)
}};
}

macro_rules! build_values_list {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
let builder = new_builder!($VALUE_BUILDER_TY, $VALUES.len());
let mut builder = ListBuilder::new(builder);

for _ in 0..$SIZE {
for scalar_value in $VALUES {
Expand All @@ -552,7 +566,8 @@ macro_rules! build_values_list {

macro_rules! build_values_list_tz {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
let mut builder =
ListBuilder::new($VALUE_BUILDER_TY::with_capacity($VALUES.len()));

for _ in 0..$SIZE {
for scalar_value in $VALUES {
Expand Down Expand Up @@ -612,11 +627,7 @@ macro_rules! eq_array_primitive {

impl ScalarValue {
/// Create a decimal Scalar from value/precision and scale.
pub fn try_new_decimal128(
value: i128,
precision: usize,
scale: usize,
) -> Result<Self> {
pub fn try_new_decimal128(value: i128, precision: u8, scale: u8) -> Result<Self> {
// make sure the precision and scale is valid
if precision <= DECIMAL128_MAX_PRECISION && scale <= precision {
return Ok(ScalarValue::Decimal128(Some(value), precision, scale));
Expand Down Expand Up @@ -909,7 +920,7 @@ impl ScalarValue {

macro_rules! build_array_list_string {
($BUILDER:ident, $SCALAR_TY:ident) => {{
let mut builder = ListBuilder::new($BUILDER::new(0));
let mut builder = ListBuilder::new($BUILDER::new());
for scalar in scalars.into_iter() {
match scalar {
ScalarValue::List(Some(xs), _) => {
Expand Down Expand Up @@ -951,7 +962,7 @@ impl ScalarValue {
let array: ArrayRef = match &data_type {
DataType::Decimal128(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal_array(scalars, precision, scale)?;
ScalarValue::iter_to_decimal_array(scalars, *precision, *scale)?;
Arc::new(decimal_array)
}
DataType::Decimal256(_, _) => {
Expand Down Expand Up @@ -1155,8 +1166,8 @@ impl ScalarValue {

fn iter_to_decimal_array(
scalars: impl IntoIterator<Item = ScalarValue>,
precision: &usize,
scale: &usize,
precision: u8,
scale: u8,
) -> Result<Decimal128Array> {
let array = scalars
.into_iter()
Expand All @@ -1165,7 +1176,7 @@ impl ScalarValue {
_ => unreachable!(),
})
.collect::<Decimal128Array>()
.with_precision_and_scale(*precision, *scale)?;
.with_precision_and_scale(precision, scale)?;
Ok(array)
}

Expand Down Expand Up @@ -1231,25 +1242,25 @@ impl ScalarValue {
}

fn build_decimal_array(
value: &Option<i128>,
precision: &usize,
scale: &usize,
value: Option<i128>,
precision: u8,
scale: u8,
size: usize,
) -> Decimal128Array {
std::iter::repeat(*value)
std::iter::repeat(value)
.take(size)
.into_iter()
.collect::<Decimal128Array>()
.with_precision_and_scale(*precision, *scale)
.with_precision_and_scale(precision, scale)
.unwrap()
}

/// Converts a scalar value into an array of `size` rows.
pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
match self {
ScalarValue::Decimal128(e, precision, scale) => {
Arc::new(ScalarValue::build_decimal_array(e, precision, scale, size))
}
ScalarValue::Decimal128(e, precision, scale) => Arc::new(
ScalarValue::build_decimal_array(*e, *precision, *scale, size),
),
ScalarValue::Boolean(e) => {
Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
}
Expand Down Expand Up @@ -1451,18 +1462,14 @@ impl ScalarValue {
fn get_decimal_value_from_array(
array: &ArrayRef,
index: usize,
precision: &usize,
scale: &usize,
precision: u8,
scale: u8,
) -> ScalarValue {
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
ScalarValue::Decimal128(None, precision, scale)
} else {
ScalarValue::Decimal128(
Some(array.value(index).as_i128()),
*precision,
*scale,
)
ScalarValue::Decimal128(Some(array.value(index).as_i128()), precision, scale)
}
}

Expand All @@ -1476,7 +1483,9 @@ impl ScalarValue {
Ok(match array.data_type() {
DataType::Null => ScalarValue::Null,
DataType::Decimal128(precision, scale) => {
ScalarValue::get_decimal_value_from_array(array, index, precision, scale)
ScalarValue::get_decimal_value_from_array(
array, index, *precision, *scale,
)
}
DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
Expand Down Expand Up @@ -1636,8 +1645,8 @@ impl ScalarValue {
array: &ArrayRef,
index: usize,
value: &Option<i128>,
precision: usize,
scale: usize,
precision: u8,
scale: u8,
) -> bool {
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
if array.precision() != precision || array.scale() != scale {
Expand Down Expand Up @@ -3134,7 +3143,7 @@ mod tests {
let array = array.as_any().downcast_ref::<ListArray>().unwrap();

// Construct expected array with array builders
let field_a_builder = StringBuilder::new(4);
let field_a_builder = StringBuilder::new();
let primitive_value_builder = Int32Array::builder(8);
let field_primitive_list_builder = ListBuilder::new(primitive_value_builder);

Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "21.0.0", features = ["prettyprint"] }
arrow = { features = ["prettyprint"], version = "22.0.0" }
async-trait = "0.1.41"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "11.0.0", features = ["parquet", "object_store"] }
datafusion-common = { path = "../common", features = ["parquet", "object_store"], version = "11.0.0" }
datafusion-expr = { path = "../expr", version = "11.0.0" }
datafusion-jit = { path = "../jit", version = "11.0.0", optional = true }
datafusion-optimizer = { path = "../optimizer", version = "11.0.0" }
datafusion-physical-expr = { path = "../physical-expr", version = "11.0.0" }
datafusion-row = { path = "../row", version = "11.0.0" }
datafusion-sql = { path = "../sql", version = "11.0.0" }
datafusion-physical-expr = { version = "11.0.0", path = "../physical-expr" }
datafusion-row = { version = "11.0.0", path = "../row" }
datafusion-sql = { version = "11.0.0", path = "../sql" }
futures = "0.3"
glob = "0.3.0"
hashbrown = { version = "0.12", features = ["raw"] }
Expand All @@ -78,10 +78,10 @@ num_cpus = "1.13.0"
object_store = "0.4.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "21.0.0", features = ["arrow", "async"] }
parquet = { features = ["arrow", "async"], version = "22.0.0" }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
pyo3 = { version = "0.17.1", optional = true }
rand = "0.8"
rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
Expand All @@ -93,6 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
arrow = { version = "22.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
criterion = "0.3"
csv = "1.1.6"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { features = ["prettyprint"], version = "21.0.0" }
arrow = { features = ["prettyprint"], version = "22.0.0" }
env_logger = "0.9.0"
rand = "0.8"
8 changes: 4 additions & 4 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}

fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
let mut builder = BooleanBuilder::new(rows.len());
let mut builder = BooleanBuilder::with_capacity(rows.len());
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
if let Some(boolean) = resolve_boolean(value) {
Expand Down Expand Up @@ -171,8 +171,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
where
T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
let key_builder = PrimitiveBuilder::<T>::new(row_len);
let values_builder = StringBuilder::new(row_len * 5);
let key_builder = PrimitiveBuilder::<T>::with_capacity(row_len);
let values_builder = StringBuilder::with_capacity(row_len, 5);
StringDictionaryBuilder::new(key_builder, values_builder)
}

Expand Down Expand Up @@ -258,7 +258,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
{
let mut builder: Box<dyn ArrayBuilder> = match data_type {
DataType::Utf8 => {
let values_builder = StringBuilder::new(rows.len() * 5);
let values_builder = StringBuilder::with_capacity(rows.len(), 5);
Box::new(ListBuilder::new(values_builder))
}
DataType::Dictionary(_, _) => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn schema_to_field_with_props(
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal128(*precision, *scale),
} => DataType::Decimal128(*precision as u8, *scale as u8),
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
Expand Down
Loading