Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions datafusion/core/src/datasource/listing/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use datafusion_expr::Expr;
use object_store::ObjectMeta;

/// A metadata column that can be used to filter files
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MetadataColumn {
/// The location of the file in object store
Location,
/// The location of the file in object store, with an optional prefix.
Location(Option<Arc<str>>),
/// The last modified timestamp of the file
LastModified,
/// The size of the file in bytes
Expand All @@ -60,7 +60,7 @@ impl MetadataColumn {
/// The name of the metadata column (one of `location`, `last_modified`, or `size`)
pub fn name(&self) -> &str {
match self {
MetadataColumn::Location => "location",
MetadataColumn::Location(_) => "location",
MetadataColumn::LastModified => "last_modified",
MetadataColumn::Size => "size",
}
Expand All @@ -69,7 +69,7 @@ impl MetadataColumn {
/// Returns the arrow type of this metadata column
pub fn arrow_type(&self) -> DataType {
match self {
MetadataColumn::Location => DataType::Utf8,
MetadataColumn::Location(_) => DataType::Utf8,
MetadataColumn::LastModified => {
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
}
Expand All @@ -85,8 +85,10 @@ impl MetadataColumn {
/// Returns the scalar value for this metadata column given an object meta
pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue {
match self {
MetadataColumn::Location => {
ScalarValue::Utf8(Some(meta.location.to_string()))
MetadataColumn::Location(prefix) => {
let location = meta.location.to_string();
let prefix = prefix.as_ref().map(|p| p.as_ref()).unwrap_or("");
ScalarValue::Utf8(Some(format!("{}{}", prefix, location)))
}
MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond(
Some(meta.last_modified.timestamp_micros()),
Expand All @@ -98,7 +100,8 @@ impl MetadataColumn {

pub(crate) fn builder(&self, capacity: usize) -> MetadataBuilder {
match self {
MetadataColumn::Location => MetadataBuilder::Location(
MetadataColumn::Location(prefix) => MetadataBuilder::Location(
prefix.clone(),
StringBuilder::with_capacity(capacity, capacity * 10),
),
MetadataColumn::LastModified => MetadataBuilder::LastModified(
Expand All @@ -118,7 +121,7 @@ impl FromStr for MetadataColumn {

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"location" => Ok(MetadataColumn::Location),
"location" => Ok(MetadataColumn::Location(None)),
"last_modified" => Ok(MetadataColumn::LastModified),
"size" => Ok(MetadataColumn::Size),
_ => plan_err!(
Expand All @@ -130,15 +133,19 @@ impl FromStr for MetadataColumn {
}

pub(crate) enum MetadataBuilder {
Location(StringBuilder),
Location(Option<Arc<str>>, StringBuilder),
LastModified(TimestampMicrosecondBuilder),
Size(UInt64Builder),
}

impl MetadataBuilder {
pub fn append(&mut self, meta: &ObjectMeta) {
match self {
Self::Location(builder) => builder.append_value(&meta.location),
Self::Location(prefix, builder) => {
let location = meta.location.to_string();
let prefix = prefix.as_ref().map(|p| p.as_ref()).unwrap_or("");
builder.append_value(format!("{}{}", prefix, location))
}
Self::LastModified(builder) => {
builder.append_value(meta.last_modified.timestamp_micros())
}
Expand All @@ -148,7 +155,7 @@ impl MetadataBuilder {

pub fn finish(self) -> Arc<dyn Array> {
match self {
MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()),
MetadataBuilder::Location(_, mut builder) => Arc::new(builder.finish()),
MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()),
MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()),
}
Expand Down
14 changes: 6 additions & 8 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,10 @@ impl ListingTable {
fn metadata_column_names(&self) -> impl Iterator<Item = &str> {
self.options.metadata_cols.iter().map(|col| col.name())
}

/// Returns a borrow of the metadata columns stored in `self.options.metadata_cols`.
///
/// The borrow is handed out as `&Vec<_>` so that callers can `.clone()` it
/// into an owned `Vec<MetadataColumn>`.
fn metadata_columns(&self) -> &Vec<MetadataColumn> {
&self.options.metadata_cols
}
}

// Expressions can be used for extended columns (partition/metadata) pruning if they can be evaluated using
Expand Down Expand Up @@ -1021,10 +1025,7 @@ impl TableProvider for ListingTable {
.cloned()
.collect();

let metadata_cols = self
.metadata_column_names()
.filter_map(|c| MetadataColumn::from_str(c).ok())
.collect::<Vec<_>>();
let metadata_cols = self.metadata_columns().clone();

// create the execution plan
self.options
Expand Down Expand Up @@ -1215,10 +1216,7 @@ impl ListingTable {
.await?;
let file_list = stream::iter(file_list).flatten();

let metadata_cols = self
.metadata_column_names()
.map(MetadataColumn::from_str)
.collect::<Result<Vec<_>>>()?;
let metadata_cols = self.metadata_columns().clone();

// collect the statistics if required by the config + filter out files that don't match the metadata filters
let files = file_list
Expand Down
17 changes: 14 additions & 3 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use std::{
borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData, mem::size_of,
str::FromStr, sync::Arc, vec,
sync::Arc, vec,
};

use super::{get_projected_output_ordering, statistics::MinMaxStatistics};
Expand Down Expand Up @@ -449,6 +449,8 @@ pub struct ExtendedColumnProjector {
projected_metadata_indexes: Vec<usize>,
/// The schema of the table once the projection was applied.
projected_schema: SchemaRef,
/// Mapping between the column name and the metadata column.
metadata_map: HashMap<String, MetadataColumn>,
}

impl ExtendedColumnProjector {
Expand Down Expand Up @@ -478,11 +480,17 @@ impl ExtendedColumnProjector {
}
}

let mut metadata_map = HashMap::new();
for metadata_col in metadata_cols.iter() {
metadata_map.insert(metadata_col.name().to_string(), metadata_col.clone());
}

Self {
key_buffer_cache: Default::default(),
projected_partition_indexes,
projected_metadata_indexes,
projected_schema,
metadata_map,
}
}

Expand Down Expand Up @@ -554,8 +562,11 @@ impl ExtendedColumnProjector {
for &sidx in &self.projected_metadata_indexes {
// Get the metadata column type from the field name
let field_name = self.projected_schema.field(sidx).name();
let metadata_col = MetadataColumn::from_str(field_name).map_err(|e| {
DataFusionError::Execution(format!("Invalid metadata column: {}", e))
let metadata_col = self.metadata_map.get(field_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"Invalid metadata column: {}",
field_name
))
})?;

// Convert metadata to scalar value based on the column type
Expand Down