6 changes: 0 additions & 6 deletions datafusion/common/src/config.rs
@@ -559,12 +559,6 @@ config_namespace! {
        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

-       /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
-       /// (footer and page metadata). Enabling it can offer substantial performance improvements
-       /// for repeated queries over large files. By default, the cache is automatically
-       /// invalidated when the underlying file is modified.
-       pub cache_metadata: bool, default = false
-
        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

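Since this deletes the user-facing knob outright, a session that still sets it should now fail at configuration time. A minimal sketch of checking that, assuming the datafusion and tokio crates; the exact error surface is not taken from this PR:

use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // Pre-PR this toggled metadata caching; post-PR the key no longer
    // exists, so the SET statement should surface a configuration error
    // either at planning or at execution.
    let result = ctx
        .sql("SET datafusion.execution.parquet.cache_metadata = true")
        .await;
    let outcome = match result {
        Ok(df) => df.collect().await.map(|_| ()),
        Err(e) => Err(e),
    };
    println!("setting still recognized: {}", outcome.is_ok());
}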
3 changes: 0 additions & 3 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -234,7 +234,6 @@ impl ParquetOptions {
            binary_as_string: _, // not used for writer props
            coerce_int96: _, // not used for writer props
            skip_arrow_metadata: _,
-           cache_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
@@ -502,7 +501,6 @@ mod tests {
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
-           cache_metadata: defaults.cache_metadata,
        }
    }

@@ -613,7 +611,6 @@ mod tests {
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
-               cache_metadata: global_options_defaults.cache_metadata,
            },
            column_specific_options,
            key_value_metadata,
15 changes: 0 additions & 15 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -254,11 +254,6 @@ pub struct ParquetReadOptions<'a> {
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Properties for decryption of Parquet files that use modular encryption
    pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
-   /// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
-   /// page metadata). Enabling it can offer substantial performance improvements for repeated
-   /// queries over large files. By default, the cache is automatically invalidated when the
-   /// underlying file is modified.
-   pub cache_metadata: Option<bool>,
}

impl Default for ParquetReadOptions<'_> {
@@ -271,7 +266,6 @@ impl Default for ParquetReadOptions<'_> {
            schema: None,
            file_sort_order: vec![],
            file_decryption_properties: None,
-           cache_metadata: None,
        }
    }
}
@@ -331,12 +325,6 @@ impl<'a> ParquetReadOptions<'a> {
        self.file_decryption_properties = Some(file_decryption_properties);
        self
    }
-
-   /// Specify whether to enable or not metadata caching
-   pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
-       self.cache_metadata = Some(cache_metadata);
-       self
-   }
}

/// Options that control the reading of ARROW files.
@@ -602,9 +590,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
        if let Some(file_decryption_properties) = &self.file_decryption_properties {
            options.crypto.file_decryption = Some(file_decryption_properties.clone());
        }
-       if let Some(cache_metadata) = self.cache_metadata {
-           options.global.cache_metadata = cache_metadata;
-       }
        let mut file_format = ParquetFormat::new().with_options(options);

        if let Some(parquet_pruning) = self.parquet_pruning {
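With the builder method gone, callers simply drop the opt-in call. A minimal usage sketch, assuming a local file named data.parquet (the file name is illustrative, not part of this PR):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Previously: ParquetReadOptions::default().cache_metadata(true).
    // Now metadata caching is on by default, so no opt-in is needed.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}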
4 changes: 2 additions & 2 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -903,8 +903,8 @@ async fn without_pushdown_filter() {
    )
    .unwrap();

-   // Without filter will not read pageIndex.
-   assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
+   // Same amount of bytes are scanned when defaulting to cache parquet metadata
+   assert!(bytes_scanned_with_filter == bytes_scanned_without_filter);
}

#[tokio::test]

Review thread on this hunk:

Contributor Author: @nuno-faria this change seems to be caused by the CachedParquetFileReaderFactory you added. I just changed the test to reflect the correct behaviour now that the factory is the default. Just making sure it looks correct?

Contributor: That's right, when caching by default the page index is always read, even if the query does not require it. FYI @alamb

Contributor: I think that is best practice anyway and probably what people want.
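For readers wondering why the byte counts converge: loading metadata through the cache also pulls in the page index up front, which is roughly what opting into the page index looks like in the underlying parquet crate. A hedged illustration of that behaviour, not code from this PR:

use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};

fn main() {
    let file = File::open("data.parquet").expect("open parquet file");
    // with_page_index(true) fetches the column/offset indexes together with
    // the footer, so later scans never issue a separate page-index read.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let metadata = ArrowReaderMetadata::load(&file, options).expect("load metadata");
    println!("row groups: {}", metadata.metadata().num_row_groups());
}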
19 changes: 8 additions & 11 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -447,17 +447,14 @@ impl FileFormat for ParquetFormat {

        let mut source = ParquetSource::new(self.options.clone());

-       // Use the CachedParquetFileReaderFactory when metadata caching is enabled
-       if self.options.global.cache_metadata {
-           let metadata_cache =
-               state.runtime_env().cache_manager.get_file_metadata_cache();
-           let store = state
-               .runtime_env()
-               .object_store(conf.object_store_url.clone())?;
-           let cached_parquet_read_factory =
-               Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
-           source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
-       }
+       // Use the CachedParquetFileReaderFactory
+       let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
+       let store = state
+           .runtime_env()
+           .object_store(conf.object_store_url.clone())?;
+       let cached_parquet_read_factory =
+           Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+       source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
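Because the cached factory is now installed unconditionally, a caller that wants uncached reads has to override the factory afterwards. A hedged sketch of that opt-out, assuming DefaultParquetFileReaderFactory is still exported from this crate and that ParquetSource keeps the builder shown in the hunk above:

use std::sync::Arc;

use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_datasource_parquet::DefaultParquetFileReaderFactory;
use object_store::ObjectStore;

// Replaces the CachedParquetFileReaderFactory that file_format.rs now
// installs by default with the plain, uncached reader factory.
fn uncached(source: ParquetSource, store: Arc<dyn ObjectStore>) -> ParquetSource {
    source.with_parquet_file_reader_factory(Arc::new(
        DefaultParquetFileReaderFactory::new(store),
    ))
}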
1 change: 0 additions & 1 deletion datafusion/proto-common/proto/datafusion_common.proto
@@ -530,7 +530,6 @@ message ParquetOptions {
    bool schema_force_view_types = 28; // default = false
    bool binary_as_string = 29; // default = false
    bool skip_arrow_metadata = 30; // default = false
-   bool cache_metadata = 33; // default = false

    oneof metadata_size_hint_opt {
        uint64 metadata_size_hint = 4;
1 change: 0 additions & 1 deletion datafusion/proto-common/src/from_proto/mod.rs
@@ -999,7 +999,6 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
            }).unwrap_or(None),
            skip_arrow_metadata: value.skip_arrow_metadata,
-           cache_metadata: value.cache_metadata,
        })
    }
}
18 changes: 0 additions & 18 deletions datafusion/proto-common/src/generated/pbjson.rs
@@ -5568,9 +5568,6 @@ impl serde::Serialize for ParquetOptions {
        if self.skip_arrow_metadata {
            len += 1;
        }
-       if self.cache_metadata {
-           len += 1;
-       }
        if self.dictionary_page_size_limit != 0 {
            len += 1;
        }
@@ -5670,9 +5667,6 @@ impl serde::Serialize for ParquetOptions {
        if self.skip_arrow_metadata {
            struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?;
        }
-       if self.cache_metadata {
-           struct_ser.serialize_field("cacheMetadata", &self.cache_metadata)?;
-       }
        if self.dictionary_page_size_limit != 0 {
            #[allow(clippy::needless_borrow)]
            #[allow(clippy::needless_borrows_for_generic_args)]
@@ -5810,8 +5804,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
            "binaryAsString",
            "skip_arrow_metadata",
            "skipArrowMetadata",
-           "cache_metadata",
-           "cacheMetadata",
            "dictionary_page_size_limit",
            "dictionaryPageSizeLimit",
            "data_page_row_count_limit",
@@ -5858,7 +5850,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
            SchemaForceViewTypes,
            BinaryAsString,
            SkipArrowMetadata,
-           CacheMetadata,
            DictionaryPageSizeLimit,
            DataPageRowCountLimit,
            MaxRowGroupSize,
@@ -5910,7 +5901,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
            "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes),
            "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString),
            "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata),
-           "cacheMetadata" | "cache_metadata" => Ok(GeneratedField::CacheMetadata),
            "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
            "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
            "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5960,7 +5950,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                let mut schema_force_view_types__ = None;
                let mut binary_as_string__ = None;
                let mut skip_arrow_metadata__ = None;
-               let mut cache_metadata__ = None;
                let mut dictionary_page_size_limit__ = None;
                let mut data_page_row_count_limit__ = None;
                let mut max_row_group_size__ = None;
@@ -6081,12 +6070,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                    }
                    skip_arrow_metadata__ = Some(map_.next_value()?);
                }
-               GeneratedField::CacheMetadata => {
-                   if cache_metadata__.is_some() {
-                       return Err(serde::de::Error::duplicate_field("cacheMetadata"));
-                   }
-                   cache_metadata__ = Some(map_.next_value()?);
-               }
                GeneratedField::DictionaryPageSizeLimit => {
                    if dictionary_page_size_limit__.is_some() {
                        return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
@@ -6196,7 +6179,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                    schema_force_view_types: schema_force_view_types__.unwrap_or_default(),
                    binary_as_string: binary_as_string__.unwrap_or_default(),
                    skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(),
-                   cache_metadata: cache_metadata__.unwrap_or_default(),
                    dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
                    data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
                    max_row_group_size: max_row_group_size__.unwrap_or_default(),
3 changes: 0 additions & 3 deletions datafusion/proto-common/src/generated/prost.rs
@@ -795,9 +795,6 @@ pub struct ParquetOptions {
    /// default = false
    #[prost(bool, tag = "30")]
    pub skip_arrow_metadata: bool,
-   /// default = false
-   #[prost(bool, tag = "33")]
-   pub cache_metadata: bool,
    #[prost(uint64, tag = "12")]
    pub dictionary_page_size_limit: u64,
    #[prost(uint64, tag = "18")]
1 change: 0 additions & 1 deletion datafusion/proto-common/src/to_proto/mod.rs
@@ -842,7 +842,6 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
            binary_as_string: value.binary_as_string,
            skip_arrow_metadata: value.skip_arrow_metadata,
            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
-           cache_metadata: value.cache_metadata,
        })
    }
}
3 changes: 0 additions & 3 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -795,9 +795,6 @@ pub struct ParquetOptions {
    /// default = false
    #[prost(bool, tag = "30")]
    pub skip_arrow_metadata: bool,
-   /// default = false
-   #[prost(bool, tag = "33")]
-   pub cache_metadata: bool,
    #[prost(uint64, tag = "12")]
    pub dictionary_page_size_limit: u64,
    #[prost(uint64, tag = "18")]
2 changes: 0 additions & 2 deletions datafusion/proto/src/logical_plan/file_formats.rs
@@ -411,7 +411,6 @@ impl TableParquetOptionsProto {
                coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                    parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                }),
-               cache_metadata: global_options.global.cache_metadata,
            }),
            column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                ParquetColumnSpecificOptions {
@@ -505,7 +504,6 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
            }),
-           cache_metadata: proto.cache_metadata,
        }
    }
}
2 changes: 0 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -231,7 +231,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL
datafusion.execution.parquet.bloom_filter_ndv NULL
datafusion.execution.parquet.bloom_filter_on_read true
datafusion.execution.parquet.bloom_filter_on_write false
-datafusion.execution.parquet.cache_metadata false
datafusion.execution.parquet.coerce_int96 NULL
datafusion.execution.parquet.column_index_truncate_length 64
datafusion.execution.parquet.compression zstd(3)
@@ -345,7 +344,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files
datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
-datafusion.execution.parquet.cache_metadata false (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified.
datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution.
datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.
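A quick end-to-end way to confirm the option is gone is to query the settings catalog directly. A sketch assuming the datafusion and tokio crates:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema must be enabled to expose df_settings.
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);
    // After this PR the query should return zero rows.
    ctx.sql(
        "SELECT name FROM information_schema.df_settings \
         WHERE name = 'datafusion.execution.parquet.cache_metadata'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}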
6 changes: 0 additions & 6 deletions datafusion/sqllogictest/test_files/parquet.slt
@@ -766,10 +766,6 @@ OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
----
20000

-# Enable the cache
-statement ok
-set datafusion.execution.parquet.cache_metadata = true;
-
statement ok
CREATE EXTERNAL TABLE t
STORED AS PARQUET
@@ -867,5 +863,3 @@ select part, k, v from t order by k
statement ok
DROP TABLE t;

-statement ok
-set datafusion.execution.parquet.cache_metadata = false;