Skip to content

Commit

Permalink
Minor: Move project_schema to datafusion_common (#7237)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Aug 9, 2023
1 parent a76c3d3 commit 61329a0
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 52 deletions.
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
pub use utils::project_schema;

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
Expand Down
42 changes: 41 additions & 1 deletion datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::compute;
use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
use arrow::datatypes::UInt32Type;
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
Expand All @@ -31,6 +31,46 @@ use std::cmp::Ordering;
use std::ops::Range;
use std::sync::Arc;

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
///
/// Example:
/// ```
/// use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
/// use datafusion_common::project_schema;
///
/// // Schema with columns 'a', 'b', and 'c'
/// let schema = SchemaRef::new(Schema::new(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Int64, true),
/// Field::new("c", DataType::Utf8, true),
/// ]));
///
/// // Pick columns 'c' and 'b'
/// let projection = Some(vec![2,1]);
/// let projected_schema = project_schema(
/// &schema,
/// projection.as_ref()
/// ).unwrap();
///
/// let expected_schema = SchemaRef::new(Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
/// Field::new("b", DataType::Int64, true),
/// ]));
///
/// assert_eq!(projected_schema, expected_schema);
/// ```
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns)?),
None => Arc::clone(schema),
};
Ok(schema)
}

/// Given column vectors, returns row at `idx`.
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
columns
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use std::sync::Arc;

use arrow::datatypes::*;
use async_trait::async_trait;
use datafusion_common::project_schema;

use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::project_schema;
use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// An empty plan that is useful for testing and generating plans
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{plan_err, SchemaExt, ToDFSchema};
use datafusion_common::{plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
Expand All @@ -50,7 +50,7 @@ use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan, Statistics},
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};

use super::PartitionedFile;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
use super::expressions::PhysicalSortExpr;
use super::{
common, project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use core::fmt;
use datafusion_common::Result;
use datafusion_common::{project_schema, Result};
use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down
41 changes: 1 addition & 40 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,46 +660,6 @@ use datafusion_physical_expr::{
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
///
/// Example:
/// ```
/// use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
/// use datafusion::physical_plan::project_schema;
///
/// // Schema with columns 'a', 'b', and 'c'
/// let schema = SchemaRef::new(Schema::new(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Int64, true),
/// Field::new("c", DataType::Utf8, true),
/// ]));
///
/// // Pick columns 'c' and 'b'
/// let projection = Some(vec![2,1]);
/// let projected_schema = project_schema(
/// &schema,
/// projection.as_ref()
/// ).unwrap();
///
/// let expected_schema = SchemaRef::new(Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
/// Field::new("b", DataType::Int64, true),
/// ]));
///
/// assert_eq!(projected_schema, expected_schema);
/// ```
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns)?),
None => Arc::clone(schema),
};
Ok(schema)
}

pub mod aggregates;
pub mod analyze;
pub mod coalesce_batches;
Expand Down Expand Up @@ -728,6 +688,7 @@ pub mod windows;

use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_common::utils::project_schema;
use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{expressions, functions, hash_utils, udf};

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use datafusion::logical_expr::{
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
project_schema, ColumnStatistics, DisplayAs, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
ColumnStatistics, DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion::scalar::ScalarValue;
use datafusion::{
Expand All @@ -37,6 +37,7 @@ use datafusion::{
use datafusion::{error::Result, physical_plan::DisplayFormatType};

use datafusion_common::cast::as_primitive_array;
use datafusion_common::project_schema;
use futures::stream::Stream;
use std::any::Any;
use std::pin::Pin;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ use datafusion::{
error::Result,
logical_expr::Expr,
physical_plan::{
expressions::PhysicalSortExpr, project_schema, ColumnStatistics, DisplayAs,
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
expressions::PhysicalSortExpr, ColumnStatistics, DisplayAs, DisplayFormatType,
ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
},
prelude::SessionContext,
scalar::ScalarValue,
};

use async_trait::async_trait;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion_common::project_schema;

/// This is a testing structure for statistics
/// It will act both as a table provider and execution plan
Expand Down

0 comments on commit 61329a0

Please sign in to comment.