
Commit

Merge branch 'main' into chore/11598-rename-array-to-nested
goldmedal committed Jul 24, 2024
2 parents 93bbcee + 5c37d00 commit a05fe1f
Showing 87 changed files with 2,969 additions and 1,644 deletions.
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_file_format.rs
@@ -131,7 +131,7 @@ impl FileFormat for TSVFileFormat {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory for creating TSV file formats
///
/// This factory is a wrapper around the CSV file format factory
@@ -166,6 +166,10 @@ impl FileFormatFactory for TSVFileFactory {
fn default(&self) -> std::sync::Arc<dyn FileFormat> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for TSVFileFactory {
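
The new `as_any` method lets callers recover the concrete factory behind a `dyn FileFormatFactory` trait object. A minimal sketch of what this enables, assuming it sits alongside the `TSVFileFactory` defined in this example file (the exact import path for `FileFormatFactory` may differ between DataFusion versions):

```rust
use std::sync::Arc;
use datafusion::datasource::file_format::FileFormatFactory; // import path assumed

/// Hypothetical helper: recover the concrete TSV factory from a type-erased one.
fn as_tsv_factory(factory: &Arc<dyn FileFormatFactory>) -> Option<&TSVFileFactory> {
    // `as_any` exposes the value as `&dyn Any`, so callers can downcast to the
    // concrete type when they need format-specific behaviour.
    factory.as_any().downcast_ref::<TSVFileFactory>()
}
```
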
15 changes: 9 additions & 6 deletions datafusion/common/src/config.rs
@@ -374,18 +374,21 @@ config_namespace! {

/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".into()
pub writer_version: String, default = "1.0".to_string()

/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
///
/// Note that this default setting is not the same as
/// the default parquet writer setting.
pub compression: Option<String>, default = Some("zstd(3)".into())

/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
pub dictionary_enabled: Option<bool>, default = None
pub dictionary_enabled: Option<bool>, default = Some(true)

/// (writing) Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024
@@ -398,21 +401,21 @@ config_namespace! {

/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
pub max_statistics_size: Option<usize>, default = None
pub max_statistics_size: Option<usize>, default = Some(4096)

/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
pub max_row_group_size: usize, default = 1024 * 1024

/// (writing) Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = None
pub column_index_truncate_length: Option<usize>, default = Some(64)

/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = usize::MAX
pub data_page_row_count_limit: usize, default = 20_000

/// (writing) Sets default encoding for any column.
/// Valid values are: plain, plain_dictionary, rle,
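
Each of these options maps to a `datafusion.execution.parquet.*` key that can be overridden per session. A hedged sketch, assuming the option keys shown above and the `SessionConfig` setter methods; defaults themselves may differ between DataFusion versions:

```rust
use datafusion::prelude::*;

// Sketch only: override a couple of the parquet writer defaults listed above
// at session level; key names follow the `datafusion.execution.parquet.*` scheme.
fn configured_context() -> SessionContext {
    let config = SessionConfig::new()
        .set_str("datafusion.execution.parquet.compression", "zstd(3)")
        .set_bool("datafusion.execution.parquet.dictionary_enabled", false);
    SessionContext::new_with_config(config)
}
```
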
30 changes: 2 additions & 28 deletions datafusion/common/src/dfschema.rs
@@ -521,34 +521,8 @@ impl DFSchema {

/// Find the field with the given name
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
1 => Ok(matches[0].1),
_ => {
// When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
// Because name may generate from Alias/... . It means that it don't own qualifier.
// For example:
// Join on id = b.id
// Project a.id as id TableScan b id
// In this case, there isn't `ambiguous name` problem. When `matches` just contains
// one field without qualifier, we should return it.
let fields_without_qualifier = matches
.iter()
.filter(|(q, _)| q.is_none())
.collect::<Vec<_>>();
if fields_without_qualifier.len() == 1 {
Ok(fields_without_qualifier[0].1)
} else {
_schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
})
}
}
}
self.qualified_field_with_unqualified_name(name)
.map(|(_, field)| field)
}

/// Find the field with the given qualified name
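
The rewritten `field_with_unqualified_name` now delegates to `qualified_field_with_unqualified_name` and drops the qualifier. A minimal usage sketch, assuming the `arrow` and `datafusion_common` crates:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn main() -> Result<()> {
    // Build a schema whose fields are all qualified by the relation "t".
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    let df_schema = DFSchema::try_from_qualified_schema("t", &schema)?;

    // An unqualified lookup still resolves as long as exactly one field matches.
    let field = df_schema.field_with_unqualified_name("id")?;
    assert_eq!(field.data_type(), &DataType::Int32);
    Ok(())
}
```
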
72 changes: 0 additions & 72 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -644,56 +644,6 @@ mod tests {
"datafusion's default is zstd"
);

// TODO: data_page_row_count_limit defaults do not match
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.data_page_row_count_limit(),
20_000,
"extern parquet's default data_page_row_count_limit is 20_000"
);
assert_eq!(
from_datafusion_defaults.data_page_row_count_limit(),
usize::MAX,
"datafusion's default is usize::MAX"
);

// TODO: column_index_truncate_length do not match
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.column_index_truncate_length(),
Some(64),
"extern parquet's default is 64"
);
assert_eq!(
from_datafusion_defaults.column_index_truncate_length(),
None,
"datafusion's default is None"
);

// The next few examples are where datafusion's default is None.
// But once datafusion's TableParquetOptions are converted to a WriterProperties,
// then we get the extern parquet's defaults.
//
// In other words, we do not get indeterminate behavior in the output writer props.
// But this is only because we use the extern parquet's defaults when we leave
// the datafusion setting as None.

// datafusion's `None` for Option<bool> => becomes parquet's true
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
assert!(
default_writer_props.dictionary_enabled(&"default".into()),
"extern parquet's default is true"
);
assert_eq!(
default_table_writer_opts.global.dictionary_enabled, None,
"datafusion's has no default"
);
assert!(
from_datafusion_defaults.dictionary_enabled(&"default".into()),
"should see the extern parquet's default over-riding datafusion's None",
);

// datafusion's `None` for Option<String> => becomes parquet's EnabledStatistics::Page
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
@@ -712,35 +662,13 @@
"should see the extern parquet's default over-riding datafusion's None",
);

// datafusion's `None` for Option<usize> => becomes parquet's 4096
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.max_statistics_size(&"default".into()),
4096,
"extern parquet's default is 4096"
);
assert_eq!(
default_table_writer_opts.global.max_statistics_size, None,
"datafusion's has no default"
);
assert_eq!(
default_writer_props.max_statistics_size(&"default".into()),
4096,
"should see the extern parquet's default over-riding datafusion's None",
);

// Confirm all other settings are equal.
// First resolve the known discrepancies, (set as the same).
// TODO: once we fix the above mis-matches, we should be able to remove this.
let mut from_extern_parquet =
session_config_from_writer_props(&default_writer_props);
from_extern_parquet.global.compression = Some("zstd(3)".into());
from_extern_parquet.global.data_page_row_count_limit = usize::MAX;
from_extern_parquet.global.column_index_truncate_length = None;
from_extern_parquet.global.dictionary_enabled = None;
from_extern_parquet.global.statistics_enabled = None;
from_extern_parquet.global.max_statistics_size = None;

// Expected: the remaining should match
let same_created_by = default_table_writer_opts.global.created_by.clone(); // we expect these to be different
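
The deleted assertions tracked known mismatches; with the new defaults in `config.rs` above (`dictionary_enabled`, `max_statistics_size`, `column_index_truncate_length`, `data_page_row_count_limit`), those particular settings now match the parquet crate's own writer defaults. A small sketch of those upstream defaults, using only values quoted in the removed test:

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // The parquet crate's writer defaults, which the DataFusion options above now mirror.
    let props = WriterProperties::builder().build();
    assert_eq!(props.data_page_row_count_limit(), 20_000);
    assert_eq!(props.column_index_truncate_length(), Some(64));
    assert_eq!(props.max_row_group_size(), 1024 * 1024);
}
```
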
102 changes: 100 additions & 2 deletions datafusion/common/src/hash_utils.rs
@@ -29,8 +29,8 @@ use arrow_buffer::IntervalMonthDayNano;

use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_primitive_array, as_string_array,
as_struct_array,
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
as_string_array, as_struct_array,
};
use crate::error::{Result, _internal_err};

@@ -236,6 +236,40 @@ fn hash_struct_array(
Ok(())
}

fn hash_map_array(
array: &MapArray,
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let nulls = array.nulls();
let offsets = array.offsets();

// Create hashes for each entry in each row
let mut values_hashes = vec![0u64; array.entries().len()];
create_hashes(array.entries().columns(), random_state, &mut values_hashes)?;

// Combine the hashes for entries on each row with each other and previous hash for that row
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
*hash = combine_hashes(*hash, *values_hash);
}
}
}
} else {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
*hash = combine_hashes(*hash, *values_hash);
}
}
}

Ok(())
}

fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
@@ -400,6 +434,10 @@ pub fn create_hashes<'a>(
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
@@ -572,6 +610,7 @@ mod tests {
Some(vec![Some(3), None, Some(5)]),
None,
Some(vec![Some(0), Some(1), Some(2)]),
Some(vec![]),
];
let list_array =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
@@ -581,6 +620,7 @@
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
assert_eq!(hashes[2], hashes[3]);
assert_eq!(hashes[1], hashes[6]); // null vs empty list
}

#[test]
@@ -692,6 +732,64 @@ mod tests {
assert_eq!(hashes[0], hashes[1]);
}

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_map_arrays() {
let mut builder =
MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
// Row 0
builder.keys().append_value("key1");
builder.keys().append_value("key2");
builder.values().append_value(1);
builder.values().append_value(2);
builder.append(true).unwrap();
// Row 1
builder.keys().append_value("key1");
builder.keys().append_value("key2");
builder.values().append_value(1);
builder.values().append_value(2);
builder.append(true).unwrap();
// Row 2
builder.keys().append_value("key1");
builder.keys().append_value("key2");
builder.values().append_value(1);
builder.values().append_value(3);
builder.append(true).unwrap();
// Row 3
builder.keys().append_value("key1");
builder.keys().append_value("key3");
builder.values().append_value(1);
builder.values().append_value(2);
builder.append(true).unwrap();
// Row 4
builder.keys().append_value("key1");
builder.values().append_value(1);
builder.append(true).unwrap();
// Row 5
builder.keys().append_value("key1");
builder.values().append_null();
builder.append(true).unwrap();
// Row 6
builder.append(true).unwrap();
// Row 7
builder.keys().append_value("key1");
builder.values().append_value(1);
builder.append(false).unwrap();

let array = Arc::new(builder.finish()) as ArrayRef;

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let mut hashes = vec![0; array.len()];
create_hashes(&[array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[1]); // same value
assert_ne!(hashes[0], hashes[2]); // different value
assert_ne!(hashes[0], hashes[3]); // different key
assert_ne!(hashes[0], hashes[4]); // missing an entry
assert_ne!(hashes[4], hashes[5]); // filled vs null value
assert_eq!(hashes[6], hashes[7]); // empty vs null map
}

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
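
`hash_map_array` folds the hash of each map entry into whatever hash has already been accumulated for that row, so map columns compose with other columns in a multi-column `create_hashes` call. A hedged sketch, assuming the public `datafusion_common::hash_utils::create_hashes` entry point and an `ahash` dependency:

```rust
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::{ArrayRef, Int32Array, Int32Builder, MapBuilder, StringBuilder};
use datafusion_common::hash_utils::create_hashes;

fn main() -> datafusion_common::Result<()> {
    // One map column and one Int32 column, two rows each.
    let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    builder.keys().append_value("a");
    builder.values().append_value(1);
    builder.append(true).unwrap(); // row 0: {"a": 1}
    builder.keys().append_value("a");
    builder.values().append_value(2);
    builder.append(true).unwrap(); // row 1: {"a": 2}
    let map_col = Arc::new(builder.finish()) as ArrayRef;
    let int_col = Arc::new(Int32Array::from(vec![10, 10])) as ArrayRef;

    // The same buffer is passed for every column, so the map entries are combined
    // with the hash already computed from the Int32 column for each row.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0u64; 2];
    create_hashes(&[int_col, map_col], &random_state, &mut hashes)?;

    // The Int32 values are equal, so only the differing map values separate the rows.
    assert_ne!(hashes[0], hashes[1]);
    Ok(())
}
```
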
2 changes: 1 addition & 1 deletion datafusion/common/src/scalar/mod.rs
@@ -1770,6 +1770,7 @@ impl ScalarValue {
}
DataType::List(_)
| DataType::LargeList(_)
| DataType::Map(_, _)
| DataType::Struct(_)
| DataType::Union(_, _) => {
let arrays = scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
@@ -1838,7 +1839,6 @@
| DataType::Time32(TimeUnit::Nanosecond)
| DataType::Time64(TimeUnit::Second)
| DataType::Time64(TimeUnit::Millisecond)
| DataType::Map(_, _)
| DataType::RunEndEncoded(_, _)
| DataType::ListView(_)
| DataType::LargeListView(_) => {
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
@@ -66,7 +66,7 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
/// If the buffered Arrow data exceeds this size, it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory struct used to create [ArrowFormat]
pub struct ArrowFormatFactory;

@@ -89,6 +89,10 @@ impl FileFormatFactory for ArrowFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ArrowFormat)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for ArrowFormatFactory {
(Diffs for the remaining 80 changed files are not shown here.)
