Create SchemaAdapter trait to map table schema to file schemas #1709

Merged 3 commits on Jan 31, 2022
Changes from all commits
64 changes: 63 additions & 1 deletion datafusion/src/physical_plan/file_format/avro.rs
@@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {

use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::object_store::local::{
local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;

use super::*;

@@ -228,6 +229,67 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn avro_exec_missing_column() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let actual_schema = AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await?;

let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));

let file_schema = Arc::new(Schema::new(fields));

let avro_exec = AvroExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
file_schema: file_schema.clone(),
statistics: Statistics::default(),
// Include the missing column (the last field in the file schema) in the projection
projection: Some(vec![0, 1, 2, file_schema.fields().len() - 1]),
limit: None,
table_partition_cols: vec![],
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let mut results = avro_exec.execute(0).await.expect("plan execution failed");
let batch = results
.next()
.await
.expect("plan iterator empty")
.expect("plan iterator returned an error");

let expected = vec![
"+----+----------+-------------+-------------+",
"| id | bool_col | tinyint_col | missing_col |",
"+----+----------+-------------+-------------+",
"| 4 | true | 0 | |",
"| 5 | false | 1 | |",
"| 6 | true | 0 | |",
"| 7 | false | 1 | |",
"| 2 | true | 0 | |",
"| 3 | false | 1 | |",
"| 0 | true | 0 | |",
"| 1 | false | 1 | |",
"+----+----------+-------------+-------------+",
];

crate::assert_batches_eq!(expected, &[batch]);

let batch = results.next().await;
assert!(batch.is_none());

let batch = results.next().await;
assert!(batch.is_none());

let batch = results.next().await;
assert!(batch.is_none());

Ok(())
}

#[tokio::test]
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
47 changes: 47 additions & 0 deletions datafusion/src/physical_plan/file_format/csv.rs
@@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
@@ -269,6 +270,52 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn csv_exec_with_missing_column() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let file_schema = aggr_test_schema_with_missing_col();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema,
file_groups: vec![vec![local_unpartitioned_file(path)]],
statistics: Statistics::default(),
projection: None,
limit: Some(5),
table_partition_cols: vec![],
},
true,
b',',
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
assert_eq!(14, csv.projected_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());

let mut it = csv.execute(0, runtime).await?;
let batch = it.next().await.unwrap()?;
assert_eq!(14, batch.num_columns());
assert_eq!(5, batch.num_rows());

let expected = vec![
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
"| c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 | c10 | c11 | c12 | c13 | missing_col |",
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
"| c | 2 | 1 | 18109 | 2033001162 | -6513304855495910254 | 25 | 43062 | 1491205016 | 5863949479783605708 | 0.110830784 | 0.9294097332465232 | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | |",
"| d | 5 | -40 | 22614 | 706441268 | -7542719935673075327 | 155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107 | 0.3114712539863804 | C2GT5KVyOPZpgKVl110TyZO0NcJ434 | |",
"| b | 1 | 29 | -18218 | 994303988 | 5983957848665088916 | 204 | 9489 | 3275293996 | 14857091259186476033 | 0.53840446 | 0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | |",
"| a | 1 | -85 | -15154 | 1171968280 | 1919439543497968449 | 77 | 52286 | 774637006 | 12101411955859039553 | 0.12285209 | 0.6864391962767343 | 0keZ5G8BffGwgF2RwQD59TFzMStxCB | |",
"| b | 5 | -82 | 22080 | 1824882165 | 7373730676428214987 | 208 | 34331 | 3342719438 | 3330177516592499461 | 0.82634634 | 0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd | |",
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
];

crate::assert_batches_eq!(expected, &[batch]);

Ok(())
}

#[tokio::test]
async fn csv_exec_with_partition() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
43 changes: 43 additions & 0 deletions datafusion/src/physical_plan/file_format/json.rs
@@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {

#[cfg(test)]
mod tests {
use arrow::array::Array;
use arrow::datatypes::{Field, Schema};
use futures::StreamExt;

use crate::datasource::{
@@ -211,6 +213,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn nd_json_exec_file_with_missing_column() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
use arrow::datatypes::DataType;
let path = format!("{}/1.json", TEST_DATA_BASE);

let actual_schema = infer_schema(path.clone()).await?;

let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
let missing_field_idx = fields.len() - 1;

let file_schema = Arc::new(Schema::new(fields));

let exec = NdJsonExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
file_schema,
statistics: Statistics::default(),
projection: None,
limit: Some(3),
table_partition_cols: vec![],
});

let mut it = exec.execute(0, runtime).await?;
let batch = it.next().await.unwrap()?;

assert_eq!(batch.num_rows(), 3);
let values = batch
.column(missing_field_idx)
.as_any()
.downcast_ref::<arrow::array::Int32Array>()
.unwrap();
assert_eq!(values.len(), 3);
assert!(values.is_null(0));
assert!(values.is_null(1));
assert!(values.is_null(2));

Ok(())
}

#[tokio::test]
async fn nd_json_exec_file_projection() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
140 changes: 140 additions & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -35,11 +35,15 @@ pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;

use crate::error::DataFusionError;
use crate::{
datasource::{object_store::ObjectStore, PartitionedFile},
error::Result,
scalar::ScalarValue,
};
use arrow::array::new_null_array;
use lazy_static::lazy_static;
use log::info;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
}
}

/// A utility that can adapt file-level record batches to a table schema that may have been
/// obtained by merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// The adaptation happens in two stages:
///
/// 1. Before reading the file, map the projected column indexes from the table schema to
///    the file schema.
///
/// 2. After reading a record batch, map the read columns back to the expected column
///    indexes and insert null-valued columns wherever the file schema is missing a column
///    that is present in the table schema.
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

/// Map projected column indexes to the file schema. This will fail if the table schema
/// and the file schema contain a field with the same name and different types.
pub fn map_projections(
&self,
file_schema: &Schema,
projections: &[usize],
) -> Result<Vec<usize>> {
let mut mapped: Vec<usize> = vec![];
for idx in projections {
let field = self.table_schema.field(*idx);
if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
if file_schema.field(mapped_idx).data_type() == field.data_type() {
mapped.push(mapped_idx)
} else {
let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
info!("{}", msg);
return Err(DataFusionError::Execution(msg));
}
}
}
Ok(mapped)
}

/// Re-orders the projected columns of a record batch to match the table schema's column
/// ordering. If the record batch does not contain a column for an expected field, a
/// null-valued column is inserted at the required column index.
pub fn adapt_batch(
&self,
batch: RecordBatch,
projections: &[usize],
) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();

let batch_schema = batch.schema();

let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
let batch_cols = batch.columns().to_vec();

for field_idx in projections {
let table_field = &self.table_schema.fields()[*field_idx];
if let Some((batch_idx, _name)) =
batch_schema.column_with_name(table_field.name().as_str())
{
cols.push(batch_cols[batch_idx].clone());
} else {
cols.push(new_null_array(table_field.data_type(), batch_rows))
}
}

let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

let merged_batch = RecordBatch::try_new(projected_schema, cols)?;

Ok(merged_batch)
}
}
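The two-stage flow described above can be exercised on its own. Below is a minimal usage sketch (an editorial illustration, not part of the PR diff), assuming the arrow and DataFusion types already imported by this module (Arc, Schema, SchemaRef, Field, DataType, RecordBatch, Result); the wrapper function name schema_adapter_usage_sketch is purely illustrative.

fn schema_adapter_usage_sketch() -> Result<()> {
    use arrow::array::{Array, ArrayRef, Int64Array, StringArray};

    // Table schema: three columns, one of which ("c3") is absent from the file.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Int8, true),
    ]));
    let adapter = SchemaAdapter::new(table_schema);

    // The file only knows about c1 and c2.
    let file_schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
    ]);

    // Stage 1: the table-level projection [0, 1, 2] maps to file columns [0, 1];
    // c3 is dropped because the file cannot supply it.
    let table_projection = vec![0, 1, 2];
    let file_projection = adapter.map_projections(&file_schema, &table_projection)?;
    assert_eq!(file_projection, vec![0, 1]);

    // Pretend this batch came back from a format-specific reader.
    let file_batch = RecordBatch::try_new(
        Arc::new(file_schema),
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1_i64, 2])) as ArrayRef,
        ],
    )?;

    // Stage 2: reorder to the table ordering and back-fill c3 with nulls.
    let adapted = adapter.adapt_batch(file_batch, &table_projection)?;
    assert_eq!(adapted.num_columns(), 3);
    assert!(adapted.column(2).is_null(0));
    Ok(())
}

This mirrors what the format readers do with the adapter: map_projections decides which columns to request from the file, and adapt_batch restores the table's column order and fills in anything the file could not supply with nulls.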

/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
@@ -467,6 +552,61 @@ mod tests {
crate::assert_batches_eq!(expected, &[projected_batch]);
}

#[test]
fn schema_adapter_adapt_projections() {
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int8, true),
]));

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
]);

let file_schema_2 = Arc::new(Schema::new(vec![
Field::new("c3", DataType::Int8, true),
Field::new("c2", DataType::Int64, true),
]));

let file_schema_3 =
Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));

let adapter = SchemaAdapter::new(table_schema);

let projections1: Vec<usize> = vec![0, 1, 2];
let projections2: Vec<usize> = vec![2];

let mapped = adapter
.map_projections(&file_schema, projections1.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![0, 1]);

let mapped = adapter
.map_projections(&file_schema, projections2.as_slice())
.expect("mapping projections");

assert!(mapped.is_empty());

let mapped = adapter
.map_projections(&file_schema_2, projections1.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![1, 0]);

let mapped = adapter
.map_projections(&file_schema_2, projections2.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![0]);

let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());

assert!(mapped.is_err());
}

// sets defaults for configs that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,