Skip to content

Commit

Permalink
Upgrade to arrow 22 (#3363)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brent Gardner authored Sep 6, 2022
1 parent c89b10f commit d16457a
Show file tree
Hide file tree
Showing 48 changed files with 218 additions and 194 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.62"
readme = "README.md"

[dependencies]
arrow = "21.0.0"
arrow = "22.0.0"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "11.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = "21.0.0"
arrow-flight = "22.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ impl ExecutionPlan for CustomExec {
db.data.values().cloned().collect()
};

let mut id_array = UInt8Builder::new(users.len());
let mut account_array = UInt64Builder::new(users.len());
let mut id_array = UInt8Builder::with_capacity(users.len());
let mut account_array = UInt64Builder::with_capacity(users.len());

for user in users {
id_array.append_value(user.id);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ pyarrow = ["pyo3"]

[dependencies]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "21.0.0", features = ["prettyprint"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.87.0", optional = true }
object_store = { version = "0.4.0", optional = true }
ordered-float = "3.0"
parquet = { version = "21.0.0", optional = true, features = ["arrow"] }
pyo3 = { version = "0.16", optional = true }
parquet = { version = "22.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.17.1", optional = true }
serde_json = "1.0"
sqlparser = "0.22"
75 changes: 42 additions & 33 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ScalarValue {
/// 64bit float
Float64(Option<f64>),
/// 128bit decimal, using the i128 to represent the decimal
Decimal128(Option<i128>, usize, usize),
Decimal128(Option<i128>, u8, u8),
/// signed 8bit int
Int8(Option<i8>),
/// signed 16bit int
Expand Down Expand Up @@ -460,6 +460,7 @@ macro_rules! typed_cast {
}};
}

// keep until https://github.com/apache/arrow-rs/issues/2054 is finished
macro_rules! build_list {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
match $VALUES {
Expand Down Expand Up @@ -527,9 +528,22 @@ macro_rules! build_timestamp_list {
}};
}

macro_rules! new_builder {
(StringBuilder, $len:expr) => {
StringBuilder::new()
};
(LargeStringBuilder, $len:expr) => {
LargeStringBuilder::new()
};
($el:ident, $len:expr) => {{
<$el>::with_capacity($len)
}};
}

macro_rules! build_values_list {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
let builder = new_builder!($VALUE_BUILDER_TY, $VALUES.len());
let mut builder = ListBuilder::new(builder);

for _ in 0..$SIZE {
for scalar_value in $VALUES {
Expand All @@ -552,7 +566,8 @@ macro_rules! build_values_list {

macro_rules! build_values_list_tz {
($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
let mut builder =
ListBuilder::new($VALUE_BUILDER_TY::with_capacity($VALUES.len()));

for _ in 0..$SIZE {
for scalar_value in $VALUES {
Expand Down Expand Up @@ -612,11 +627,7 @@ macro_rules! eq_array_primitive {

impl ScalarValue {
/// Create a decimal Scalar from value/precision and scale.
pub fn try_new_decimal128(
value: i128,
precision: usize,
scale: usize,
) -> Result<Self> {
pub fn try_new_decimal128(value: i128, precision: u8, scale: u8) -> Result<Self> {
// make sure the precision and scale is valid
if precision <= DECIMAL128_MAX_PRECISION && scale <= precision {
return Ok(ScalarValue::Decimal128(Some(value), precision, scale));
Expand Down Expand Up @@ -911,7 +922,7 @@ impl ScalarValue {

macro_rules! build_array_list_string {
($BUILDER:ident, $SCALAR_TY:ident) => {{
let mut builder = ListBuilder::new($BUILDER::new(0));
let mut builder = ListBuilder::new($BUILDER::new());
for scalar in scalars.into_iter() {
match scalar {
ScalarValue::List(Some(xs), _) => {
Expand Down Expand Up @@ -953,7 +964,7 @@ impl ScalarValue {
let array: ArrayRef = match &data_type {
DataType::Decimal128(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal_array(scalars, precision, scale)?;
ScalarValue::iter_to_decimal_array(scalars, *precision, *scale)?;
Arc::new(decimal_array)
}
DataType::Decimal256(_, _) => {
Expand Down Expand Up @@ -1157,8 +1168,8 @@ impl ScalarValue {

fn iter_to_decimal_array(
scalars: impl IntoIterator<Item = ScalarValue>,
precision: &usize,
scale: &usize,
precision: u8,
scale: u8,
) -> Result<Decimal128Array> {
let array = scalars
.into_iter()
Expand All @@ -1167,7 +1178,7 @@ impl ScalarValue {
_ => unreachable!(),
})
.collect::<Decimal128Array>()
.with_precision_and_scale(*precision, *scale)?;
.with_precision_and_scale(precision, scale)?;
Ok(array)
}

Expand Down Expand Up @@ -1233,25 +1244,25 @@ impl ScalarValue {
}

fn build_decimal_array(
value: &Option<i128>,
precision: &usize,
scale: &usize,
value: Option<i128>,
precision: u8,
scale: u8,
size: usize,
) -> Decimal128Array {
std::iter::repeat(*value)
std::iter::repeat(value)
.take(size)
.into_iter()
.collect::<Decimal128Array>()
.with_precision_and_scale(*precision, *scale)
.with_precision_and_scale(precision, scale)
.unwrap()
}

/// Converts a scalar value into an array of `size` rows.
pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
match self {
ScalarValue::Decimal128(e, precision, scale) => {
Arc::new(ScalarValue::build_decimal_array(e, precision, scale, size))
}
ScalarValue::Decimal128(e, precision, scale) => Arc::new(
ScalarValue::build_decimal_array(*e, *precision, *scale, size),
),
ScalarValue::Boolean(e) => {
Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
}
Expand Down Expand Up @@ -1453,18 +1464,14 @@ impl ScalarValue {
fn get_decimal_value_from_array(
array: &ArrayRef,
index: usize,
precision: &usize,
scale: &usize,
precision: u8,
scale: u8,
) -> ScalarValue {
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
ScalarValue::Decimal128(None, precision, scale)
} else {
ScalarValue::Decimal128(
Some(array.value(index).as_i128()),
*precision,
*scale,
)
ScalarValue::Decimal128(Some(array.value(index).as_i128()), precision, scale)
}
}

Expand All @@ -1478,7 +1485,9 @@ impl ScalarValue {
Ok(match array.data_type() {
DataType::Null => ScalarValue::Null,
DataType::Decimal128(precision, scale) => {
ScalarValue::get_decimal_value_from_array(array, index, precision, scale)
ScalarValue::get_decimal_value_from_array(
array, index, *precision, *scale,
)
}
DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
Expand Down Expand Up @@ -1638,8 +1647,8 @@ impl ScalarValue {
array: &ArrayRef,
index: usize,
value: &Option<i128>,
precision: usize,
scale: usize,
precision: u8,
scale: u8,
) -> bool {
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
if array.precision() != precision || array.scale() != scale {
Expand Down Expand Up @@ -3138,7 +3147,7 @@ mod tests {
let array = array.as_any().downcast_ref::<ListArray>().unwrap();

// Construct expected array with array builders
let field_a_builder = StringBuilder::new(4);
let field_a_builder = StringBuilder::with_capacity(4, 1024);
let primitive_value_builder = Int32Array::builder(8);
let field_primitive_list_builder = ListBuilder::new(primitive_value_builder);

Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "21.0.0", features = ["prettyprint"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
Expand All @@ -78,10 +78,10 @@ num_cpus = "1.13.0"
object_store = "0.4.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "21.0.0", features = ["arrow", "async"] }
parquet = { version = "22.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
pyo3 = { version = "0.17.1", optional = true }
rand = "0.8"
rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
Expand All @@ -93,6 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
arrow = { version = "22.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
criterion = "0.3"
csv = "1.1.6"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { features = ["prettyprint"], version = "21.0.0" }
arrow = { version = "22.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
8 changes: 4 additions & 4 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}

fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
let mut builder = BooleanBuilder::new(rows.len());
let mut builder = BooleanBuilder::with_capacity(rows.len());
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
if let Some(boolean) = resolve_boolean(value) {
Expand Down Expand Up @@ -171,8 +171,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
where
T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
let key_builder = PrimitiveBuilder::<T>::new(row_len);
let values_builder = StringBuilder::new(row_len * 5);
let key_builder = PrimitiveBuilder::<T>::with_capacity(row_len);
let values_builder = StringBuilder::with_capacity(row_len, 5);
StringDictionaryBuilder::new(key_builder, values_builder)
}

Expand Down Expand Up @@ -258,7 +258,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
{
let mut builder: Box<dyn ArrayBuilder> = match data_type {
DataType::Utf8 => {
let values_builder = StringBuilder::new(rows.len() * 5);
let values_builder = StringBuilder::with_capacity(rows.len(), 5);
Box::new(ListBuilder::new(values_builder))
}
DataType::Dictionary(_, _) => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn schema_to_field_with_props(
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal128(*precision, *scale),
} => DataType::Decimal128(*precision as u8, *scale as u8),
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
Expand Down
54 changes: 23 additions & 31 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,11 @@ struct InformationSchemaTablesBuilder {

impl InformationSchemaTablesBuilder {
fn new() -> Self {
// StringBuilder requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
table_types: StringBuilder::new(default_capacity),
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
table_types: StringBuilder::new(),
}
}

Expand Down Expand Up @@ -321,15 +317,11 @@ struct InformationSchemaViewBuilder {

impl InformationSchemaViewBuilder {
fn new() -> Self {
// StringBuilder requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
definitions: StringBuilder::new(default_capacity),
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
definitions: StringBuilder::new(),
}
}

Expand Down Expand Up @@ -408,21 +400,21 @@ impl InformationSchemaColumnsBuilder {
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
column_names: StringBuilder::new(default_capacity),
ordinal_positions: UInt64Builder::new(default_capacity),
column_defaults: StringBuilder::new(default_capacity),
is_nullables: StringBuilder::new(default_capacity),
data_types: StringBuilder::new(default_capacity),
character_maximum_lengths: UInt64Builder::new(default_capacity),
character_octet_lengths: UInt64Builder::new(default_capacity),
numeric_precisions: UInt64Builder::new(default_capacity),
numeric_precision_radixes: UInt64Builder::new(default_capacity),
numeric_scales: UInt64Builder::new(default_capacity),
datetime_precisions: UInt64Builder::new(default_capacity),
interval_types: StringBuilder::new(default_capacity),
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
column_names: StringBuilder::new(),
ordinal_positions: UInt64Builder::with_capacity(default_capacity),
column_defaults: StringBuilder::new(),
is_nullables: StringBuilder::new(),
data_types: StringBuilder::new(),
character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
numeric_precisions: UInt64Builder::with_capacity(default_capacity),
numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
numeric_scales: UInt64Builder::with_capacity(default_capacity),
datetime_precisions: UInt64Builder::with_capacity(default_capacity),
interval_types: StringBuilder::new(),
}
}

Expand Down
Loading

0 comments on commit d16457a

Please sign in to comment.