Skip to content

Commit 5a0a7b4

Browse files
phillipleblancsgrebnov
authored andcommitted
Add prefix to location metadata column (#82)
UPSTREAM NOTE: This will not be upstreamed as is.
1 parent 0bfa63a commit 5a0a7b4

File tree

6 files changed

+87
-62
lines changed

6 files changed

+87
-62
lines changed

datafusion/catalog-listing/src/metadata.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ mod tests {
118118
file.clone(),
119119
&[],
120120
&[
121-
MetadataColumn::Location,
121+
MetadataColumn::Location(None),
122122
MetadataColumn::Size,
123123
MetadataColumn::LastModified,
124124
],
@@ -165,9 +165,12 @@ mod tests {
165165

166166
// Test with location filter - matching
167167
let filter = col("location").eq(lit("test/file.parquet"));
168-
let result =
169-
apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location])
170-
.unwrap();
168+
let result = apply_metadata_filters(
169+
file.clone(),
170+
&[filter],
171+
&[MetadataColumn::Location(None)],
172+
)
173+
.unwrap();
171174

172175
// The file should match
173176
assert!(result.is_some());
@@ -178,18 +181,24 @@ mod tests {
178181

179182
// Test with location filter - not matching
180183
let filter = col("location").eq(lit("test/different.parquet"));
181-
let result =
182-
apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location])
183-
.unwrap();
184+
let result = apply_metadata_filters(
185+
file.clone(),
186+
&[filter],
187+
&[MetadataColumn::Location(None)],
188+
)
189+
.unwrap();
184190

185191
// The file should not match
186192
assert!(result.is_none());
187193

188194
// Test with location filter - partial match (contains)
189195
let filter = col("location").like(lit("%file.parquet"));
190-
let result =
191-
apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location])
192-
.unwrap();
196+
let result = apply_metadata_filters(
197+
file.clone(),
198+
&[filter],
199+
&[MetadataColumn::Location(None)],
200+
)
201+
.unwrap();
193202

194203
// The file should match
195204
assert!(result.is_some());
@@ -329,7 +338,7 @@ mod tests {
329338
let result = apply_metadata_filters(
330339
file.clone(),
331340
&[filter],
332-
&[MetadataColumn::Location, MetadataColumn::Size],
341+
&[MetadataColumn::Location(None), MetadataColumn::Size],
333342
)
334343
.unwrap();
335344

@@ -344,7 +353,7 @@ mod tests {
344353
let result = apply_metadata_filters(
345354
file.clone(),
346355
&[filter],
347-
&[MetadataColumn::Location, MetadataColumn::Size],
356+
&[MetadataColumn::Location(None), MetadataColumn::Size],
348357
)
349358
.unwrap();
350359

@@ -366,7 +375,7 @@ mod tests {
366375
file.clone(),
367376
&[filter],
368377
&[
369-
MetadataColumn::Location,
378+
MetadataColumn::Location(None),
370379
MetadataColumn::Size,
371380
MetadataColumn::LastModified,
372381
],
@@ -397,7 +406,7 @@ mod tests {
397406
let result = apply_metadata_filters(
398407
file.clone(),
399408
&[filter],
400-
&[MetadataColumn::Location, MetadataColumn::Size],
409+
&[MetadataColumn::Location(None), MetadataColumn::Size],
401410
)
402411
.unwrap();
403412

@@ -418,7 +427,7 @@ mod tests {
418427
file.clone(),
419428
&[filter],
420429
&[
421-
MetadataColumn::Location,
430+
MetadataColumn::Location(None),
422431
MetadataColumn::Size,
423432
MetadataColumn::LastModified,
424433
],
@@ -447,7 +456,7 @@ mod tests {
447456
let result = apply_metadata_filters(
448457
file.clone(),
449458
&[filter1, filter2],
450-
&[MetadataColumn::Location, MetadataColumn::Size],
459+
&[MetadataColumn::Location(None), MetadataColumn::Size],
451460
)
452461
.unwrap();
453462

@@ -461,7 +470,7 @@ mod tests {
461470
let result = apply_metadata_filters(
462471
file.clone(),
463472
&[filter1, filter2],
464-
&[MetadataColumn::Location, MetadataColumn::Size],
473+
&[MetadataColumn::Location(None), MetadataColumn::Size],
465474
)
466475
.unwrap();
467476

datafusion/core/src/datasource/listing/table.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ impl ListingOptions {
567567
}
568568

569569
// Check for duplicate metadata columns
570-
if !seen.insert(*col) {
570+
if !seen.insert(col.clone()) {
571571
return plan_err!("Duplicate metadata column: {}", col);
572572
}
573573
}
@@ -919,6 +919,10 @@ impl ListingTable {
919919
fn metadata_column_names(&self) -> impl Iterator<Item = &str> {
920920
self.options.metadata_cols.iter().map(|col| col.name())
921921
}
922+
923+
fn metadata_columns(&self) -> &Vec<MetadataColumn> {
924+
&self.options.metadata_cols
925+
}
922926
}
923927

924928
// Expressions can be used for extended columns (partition/metadata) pruning if they can be evaluated using
@@ -1044,10 +1048,7 @@ impl TableProvider for ListingTable {
10441048
.cloned()
10451049
.collect();
10461050

1047-
let metadata_cols = self
1048-
.metadata_column_names()
1049-
.filter_map(|c| MetadataColumn::from_str(c).ok())
1050-
.collect::<Vec<_>>();
1051+
let metadata_cols = self.metadata_columns().clone();
10511052

10521053
// create the execution plan
10531054
self.options
@@ -1225,10 +1226,7 @@ impl ListingTable {
12251226
ctx.config_options().execution.meta_fetch_concurrency;
12261227
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
12271228

1228-
let metadata_cols = self
1229-
.metadata_column_names()
1230-
.map(MetadataColumn::from_str)
1231-
.collect::<Result<Vec<_>>>()?;
1229+
let metadata_cols = self.metadata_columns().clone();
12321230

12331231
// collect the statistics if required by the config + filter out files that don't match the metadata filters
12341232
let files = file_list
@@ -2573,7 +2571,7 @@ mod tests {
25732571
// Test valid case - all different metadata columns
25742572
let options = ListingOptions::new(Arc::new(CsvFormat::default()))
25752573
.with_metadata_cols(vec![
2576-
MetadataColumn::Location,
2574+
MetadataColumn::Location(None),
25772575
MetadataColumn::Size,
25782576
MetadataColumn::LastModified,
25792577
]);
@@ -2582,7 +2580,7 @@ mod tests {
25822580
// Test invalid case - duplicate metadata column
25832581
let options = ListingOptions::new(Arc::new(CsvFormat::default()))
25842582
.with_metadata_cols(vec![
2585-
MetadataColumn::Location,
2583+
MetadataColumn::Location(None),
25862584
MetadataColumn::Size,
25872585
MetadataColumn::Size, // Duplicate
25882586
]);
@@ -2600,7 +2598,7 @@ mod tests {
26002598
]));
26012599

26022600
let options = ListingOptions::new(Arc::new(CsvFormat::default()))
2603-
.with_metadata_cols(vec![MetadataColumn::Location]);
2601+
.with_metadata_cols(vec![MetadataColumn::Location(None)]);
26042602

26052603
let result = options.validate_metadata_cols(&schema_with_location);
26062604
assert!(result.is_err());

datafusion/core/tests/sql/path_partition.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ async fn test_metadata_columns() -> Result<()> {
584584
("day", DataType::Int32),
585585
],
586586
&[
587-
MetadataColumn::Location,
587+
MetadataColumn::Location(None),
588588
MetadataColumn::Size,
589589
MetadataColumn::LastModified,
590590
],
@@ -639,7 +639,7 @@ async fn test_metadata_columns_pushdown() -> Result<()> {
639639
("day", DataType::Int32),
640640
],
641641
&[
642-
MetadataColumn::Location,
642+
MetadataColumn::Location(None),
643643
MetadataColumn::Size,
644644
MetadataColumn::LastModified,
645645
],

datafusion/datasource/src/file_scan_config.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use std::{
2222
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23-
fmt::Result as FmtResult, marker::PhantomData, str::FromStr, sync::Arc,
23+
fmt::Result as FmtResult, marker::PhantomData, mem::size_of, sync::Arc, vec,
2424
};
2525

2626
use arrow::{
@@ -1129,6 +1129,8 @@ pub struct ExtendedColumnProjector {
11291129
projected_metadata_indexes: Vec<usize>,
11301130
/// The schema of the table once the projection was applied.
11311131
projected_schema: SchemaRef,
1132+
/// Mapping between the column name and the metadata column.
1133+
metadata_map: HashMap<String, MetadataColumn>,
11321134
}
11331135

11341136
impl ExtendedColumnProjector {
@@ -1162,11 +1164,17 @@ impl ExtendedColumnProjector {
11621164
projected_metadata_indexes.sort();
11631165
}
11641166

1167+
let mut metadata_map = HashMap::new();
1168+
for metadata_col in metadata_cols.iter() {
1169+
metadata_map.insert(metadata_col.name().to_string(), metadata_col.clone());
1170+
}
1171+
11651172
Self {
11661173
key_buffer_cache: Default::default(),
11671174
projected_partition_indexes,
11681175
projected_metadata_indexes,
11691176
projected_schema,
1177+
metadata_map,
11701178
}
11711179
}
11721180

@@ -1238,8 +1246,11 @@ impl ExtendedColumnProjector {
12381246
for &sidx in &self.projected_metadata_indexes {
12391247
// Get the metadata column type from the field name
12401248
let field_name = self.projected_schema.field(sidx).name();
1241-
let metadata_col = MetadataColumn::from_str(field_name).map_err(|e| {
1242-
DataFusionError::Execution(format!("Invalid metadata column: {}", e))
1249+
let metadata_col = self.metadata_map.get(field_name).ok_or_else(|| {
1250+
DataFusionError::Execution(format!(
1251+
"Invalid metadata column: {}",
1252+
field_name
1253+
))
12431254
})?;
12441255

12451256
// Convert metadata to scalar value based on the column type
@@ -2416,7 +2427,7 @@ mod tests {
24162427
fn test_projected_schema_with_metadata_col() {
24172428
let file_schema = aggr_test_schema();
24182429
let metadata_cols = vec![
2419-
MetadataColumn::Location,
2430+
MetadataColumn::Location(None),
24202431
MetadataColumn::Size,
24212432
MetadataColumn::LastModified,
24222433
];
@@ -2450,7 +2461,7 @@ mod tests {
24502461
#[test]
24512462
fn test_projected_schema_with_projection_and_metadata_cols() {
24522463
let file_schema = aggr_test_schema();
2453-
let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size];
2464+
let metadata_cols = vec![MetadataColumn::Location(None), MetadataColumn::Size];
24542465

24552466
// Create projection that includes only the first two columns from file schema plus metadata
24562467
let file_schema_len = file_schema.fields().len();
@@ -2492,7 +2503,7 @@ mod tests {
24922503
wrap_partition_type_in_dict(DataType::Int32),
24932504
),
24942505
]);
2495-
let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size];
2506+
let metadata_cols = vec![MetadataColumn::Location(None), MetadataColumn::Size];
24962507

24972508
// Create config with partition and metadata columns
24982509
let conf = FileScanConfigBuilder::new(
@@ -2528,7 +2539,7 @@ mod tests {
25282539

25292540
// Create metadata columns
25302541
let metadata_cols = vec![
2531-
MetadataColumn::Location,
2542+
MetadataColumn::Location(None),
25322543
MetadataColumn::Size,
25332544
MetadataColumn::LastModified,
25342545
];
@@ -2619,7 +2630,7 @@ mod tests {
26192630
];
26202631

26212632
// Create metadata columns
2622-
let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size];
2633+
let metadata_cols = vec![MetadataColumn::Location(None), MetadataColumn::Size];
26232634

26242635
// Create test object metadata
26252636
let object_meta = create_test_object_meta("bucket/file.parquet", 1024);

datafusion/datasource/src/file_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,7 @@ mod tests {
10941094
let mut projector = ExtendedColumnProjector::new(
10951095
schema_with_metadata.clone(),
10961096
&[],
1097-
&[MetadataColumn::Location, MetadataColumn::Size],
1097+
&[MetadataColumn::Location(None), MetadataColumn::Size],
10981098
);
10991099

11001100
// Project the batch
@@ -1132,7 +1132,7 @@ mod tests {
11321132
let mut projector = ExtendedColumnProjector::new(
11331133
schema_combined.clone(),
11341134
&["year".to_string()],
1135-
&[MetadataColumn::Location, MetadataColumn::Size],
1135+
&[MetadataColumn::Location(None), MetadataColumn::Size],
11361136
);
11371137

11381138
// Project the batch
@@ -1172,7 +1172,7 @@ mod tests {
11721172
let mut projector = ExtendedColumnProjector::new(
11731173
schema_mixed.clone(),
11741174
&["year".to_string()],
1175-
&[MetadataColumn::Location, MetadataColumn::Size],
1175+
&[MetadataColumn::Location(None), MetadataColumn::Size],
11761176
);
11771177

11781178
// We need to reorder the file batch to match the expected file columns in the mixed schema

0 commit comments

Comments
 (0)