Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix_case_when_dat…
Browse files Browse the repository at this point in the history
…atype_#2818
  • Loading branch information
liukun4515 committed Jul 2, 2022
2 parents 00edadf + 88b88d4 commit d50c671
Show file tree
Hide file tree
Showing 25 changed files with 271 additions and 123 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "16.0.0" }
arrow = { version = "17.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "9.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub async fn main() -> Result<()> {
env::set_current_dir(&p).unwrap();
};

let mut session_config = SessionConfig::new().with_information_schema(true);
let mut session_config = SessionConfig::from_env().with_information_schema(true);

if let Some(batch_size) = args.batch_size {
session_config = session_config.with_batch_size(batch_size);
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "16.0.0" }
arrow-flight = { version = "17.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
ordered-float = "3.0"
parquet = { version = "16.0.0", features = ["arrow"], optional = true }
parquet = { version = "17.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.18"
23 changes: 20 additions & 3 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
use crate::error::{DataFusionError, Result};
use arrow::{
array::*,
compute::kernels::cast::cast,
compute::kernels::cast::{cast, cast_with_options, CastOptions},
datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
},
util::decimal::{BasicDecimal, Decimal128},
};
use ordered_float::OrderedFloat;
use std::cmp::Ordering;
Expand Down Expand Up @@ -1275,7 +1276,11 @@ impl ScalarValue {
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
} else {
ScalarValue::Decimal128(Some(array.value(index)), *precision, *scale)
ScalarValue::Decimal128(
Some(array.value(index).as_i128()),
*precision,
*scale,
)
}
}

Expand Down Expand Up @@ -1437,6 +1442,14 @@ impl ScalarValue {
})
}

/// Try to parse `value` into a ScalarValue of type `target_type`
pub fn try_from_string(value: String, target_type: &DataType) -> Result<Self> {
let value = ScalarValue::Utf8(Some(value));
let cast_options = CastOptions { safe: false };
let cast_arr = cast_with_options(&value.to_array(), target_type, &cast_options)?;
ScalarValue::try_from_array(&cast_arr, 0)
}

fn eq_array_decimal(
array: &ArrayRef,
index: usize,
Expand All @@ -1450,7 +1463,11 @@ impl ScalarValue {
}
match value {
None => array.is_null(index),
Some(v) => !array.is_null(index) && array.value(index) == *v,
Some(v) => {
!array.is_null(index)
&& array.value(index)
== Decimal128::new(precision, scale, &v.to_le_bytes())
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
Expand All @@ -77,7 +77,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "16.0.0", features = ["arrow"] }
parquet = { version = "17.0.0", features = ["arrow"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
36 changes: 34 additions & 2 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
use std::collections::HashMap;
use std::env;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
Expand Down Expand Up @@ -144,6 +146,7 @@ impl BuiltInConfigs {

/// Generate documentation that can be included int he user guide
pub fn generate_config_markdown() -> String {
use std::fmt::Write as _;
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
Expand All @@ -152,8 +155,9 @@ impl BuiltInConfigs {
.iter()
.sorted_by_key(|c| c.key.as_str())
{
docs += &format!(
"| {} | {} | {} | {} |\n",
let _ = writeln!(
&mut docs,
"| {} | {} | {} | {} |",
config.key, config.data_type, config.default_value, config.description
);
}
Expand Down Expand Up @@ -184,6 +188,34 @@ impl ConfigOptions {
Self { options }
}

/// Create new ConfigOptions struct, taking values from environment variables where possible.
/// For example, setting DATAFUSION_EXECUTION_BATCH_SIZE to control `datafusion.execution.batch_size`.
pub fn from_env() -> Self {
let mut options = HashMap::new();
let built_in = BuiltInConfigs::new();
for config_def in &built_in.config_definitions {
let config_value = {
let mut env_key = config_def.key.replace('.', "_");
env_key.make_ascii_uppercase();
match env::var(&env_key) {
Ok(value) => match ScalarValue::try_from_string(
value.clone(),
&config_def.data_type,
) {
Ok(parsed) => parsed,
Err(_) => {
warn!("Warning: could not parse environment variable {}={} to type {}.", env_key, value, config_def.data_type);
config_def.default_value.clone()
}
},
Err(_) => config_def.default_value.clone(),
}
};
options.insert(config_def.key.clone(), config_value);
}
Self { options }
}

/// set a configuration option
pub fn set(&mut self, key: &str, value: ScalarValue) {
self.options.insert(key.to_string(), value);
Expand Down
8 changes: 8 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,14 @@ impl SessionConfig {
Default::default()
}

/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: ConfigOptions::from_env(),
..Default::default()
}
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.config_options.set(key, value);
Expand Down
16 changes: 10 additions & 6 deletions datafusion/core/src/physical_plan/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,29 @@ fn hash_decimal128<'a>(
if array.null_count() == 0 {
if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
*hash = combine_hashes(
i128::get_hash(&array.value(i).as_i128(), random_state),
*hash,
);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash = i128::get_hash(&array.value(i), random_state);
*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
} else if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
*hash = combine_hashes(
i128::get_hash(&array.value(i).as_i128(), random_state),
*hash,
);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = i128::get_hash(&array.value(i), random_state);
*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ mod tests {
let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(123i128, array.value(0));
assert_eq!(123i128, array.value(0).as_i128());

// decimal scalar to array with size
let array = decimal_value.to_array_of_size(10);
let array_decimal = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(123i128, array_decimal.value(0));
assert_eq!(123i128, array_decimal.value(9));
assert_eq!(123i128, array_decimal.value(0).as_i128());
assert_eq!(123i128, array_decimal.value(9).as_i128());
// test eq array
assert!(decimal_value.eq_array(&array, 1));
assert!(decimal_value.eq_array(&array, 5));
Expand Down
49 changes: 49 additions & 0 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::config::ConfigOptions;
use std::env;

#[test]
fn get_config_bool_from_env() {
let config_key = "datafusion.optimizer.filter_null_join_keys";
let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS";
env::set_var(env_key, "true");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert!(config.get_bool(config_key));
}

#[test]
fn get_config_int_from_env() {
let config_key = "datafusion.execution.batch_size";
let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key), 4096);
}

#[test]
fn get_config_int_from_env_invalid() {
let config_key = "datafusion.execution.coalesce_target_batch_size";
let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE";
env::set_var(env_key, "abc");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key), 4096); // set to its default value
}
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "9.0.0" }
sqlparser = "0.18"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
arrow = { version = "16.0.0" }
arrow = { version = "17.0.0" }
cranelift = "0.85.0"
cranelift-jit = "0.85.0"
cranelift-module = "0.85.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default = ["unicode_expressions"]
unicode_expressions = []

[dependencies]
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "9.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,17 @@ macro_rules! typed_min_max_batch_decimal128 {
for i in 1..array.len() {
result = result.$OP(array.value(i));
}
ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
ScalarValue::Decimal128(Some(result.as_i128()), *$PRECISION, *$SCALE)
} else {
let mut result = 0_i128;
let mut has_value = false;
for i in 0..array.len() {
if !has_value && array.is_valid(i) {
has_value = true;
result = array.value(i);
result = array.value(i).as_i128();
}
if array.is_valid(i) {
result = result.$OP(array.value(i));
result = result.$OP(array.value(i).as_i128());
}
}
ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn sum_decimal_batch(
let mut result = 0_i128;
for i in 0..array.len() {
if array.is_valid(i) {
result += array.value(i);
result += array.value(i).as_i128();
}
}
Ok(ScalarValue::Decimal128(Some(result), *precision, *scale))
Expand Down
Loading

0 comments on commit d50c671

Please sign in to comment.