
Commit 77b27cc
Merge remote-tracking branch 'apache/main' into alamb/expression_api
alamb committed May 7, 2024
2 parents 95d6739 + 9fd697c commit 77b27cc
Showing 89 changed files with 1,924 additions and 1,890 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
@@ -81,7 +81,7 @@ pub async fn exec_from_lines(
Ok(_) => {}
Err(err) => eprintln!("{err}"),
}
query = "".to_owned();
query = "".to_string();
} else {
query.push('\n');
}
2 changes: 1 addition & 1 deletion datafusion-examples/examples/rewrite_expr.rs
@@ -51,7 +51,7 @@ pub fn main() -> Result<()> {
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
let analyzed_plan =
analyzer.execute_and_check(&logical_plan, config.options(), |_, _| {})?;
analyzer.execute_and_check(logical_plan, config.options(), |_, _| {})?;
println!(
"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
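
The change above reflects `Analyzer::execute_and_check` now taking the plan by value instead of by reference. A minimal sketch of the new calling convention; the `analyze_owned` helper is hypothetical, and the `Analyzer`, `LogicalPlan`, and `ConfigOptions` paths assume the usual `datafusion-optimizer`, `datafusion-expr`, and `datafusion-common` crates:

use datafusion_common::{config::ConfigOptions, Result};
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;

// The analyzer now consumes the plan, so a caller that still needs its copy
// clones it first; a caller that is done with the plan can simply move it in.
fn analyze_owned(
    analyzer: &Analyzer,
    plan: &LogicalPlan,
    options: &ConfigOptions,
) -> Result<LogicalPlan> {
    analyzer.execute_and_check(plan.clone(), options, |_, _| {})
}
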
12 changes: 9 additions & 3 deletions datafusion/common/src/tree_node.rs
@@ -625,17 +625,23 @@ impl<T> Transformed<T> {
Self::new(data, false, TreeNodeRecursion::Continue)
}

- /// Applies the given `f` to the data of this [`Transformed`] object.
+ /// Applies an infallible `f` to the data of this [`Transformed`] object,
+ /// without modifying the `transformed` flag.
pub fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
Transformed::new(f(self.data), self.transformed, self.tnr)
}

- /// Maps the data of [`Transformed`] object to the result of the given `f`.
+ /// Applies a fallible `f` (returns `Result`) to the data of this
+ /// [`Transformed`] object, without modifying the `transformed` flag.
pub fn map_data<U, F: FnOnce(T) -> Result<U>>(self, f: F) -> Result<Transformed<U>> {
f(self.data).map(|data| Transformed::new(data, self.transformed, self.tnr))
}

- /// Maps the [`Transformed`] object to the result of the given `f`.
+ /// Applies a fallible transforming `f` to the data of this [`Transformed`]
+ /// object.
+ ///
+ /// The returned `Transformed` object has the `transformed` flag set if either
+ /// `self` or the return value of `f` have the `transformed` flag set.
pub fn transform_data<U, F: FnOnce(T) -> Result<Transformed<U>>>(
self,
f: F,
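
A minimal sketch of how the three combinators documented above differ, using only the `Transformed` API shown in this diff (`no`, `yes`, and the public `transformed` flag):

use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;

fn demo() -> Result<()> {
    let t = Transformed::no(1_i32);

    // update_data: infallible map; the `transformed` flag is left untouched.
    let t = t.update_data(|v| v + 1);
    assert!(!t.transformed);

    // map_data: fallible map; the flag is still untouched.
    let t = t.map_data(|v| Ok(v * 10))?;
    assert!(!t.transformed);

    // transform_data: the closure returns its own `Transformed`, and the
    // resulting flag is set if either side reported a transformation.
    let t = t.transform_data(|v| Ok(Transformed::yes(v - 5)))?;
    assert!(t.transformed);
    Ok(())
}
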
48 changes: 0 additions & 48 deletions datafusion/common/src/utils.rs
@@ -525,34 +525,6 @@ pub fn list_ndims(data_type: &DataType) -> u64 {
}
}

- /// An extension trait for smart pointers. Provides an interface to get a
- /// raw pointer to the data (with metadata stripped away).
- ///
- /// This is useful to see if two smart pointers point to the same allocation.
- pub trait DataPtr {
- /// Returns a raw pointer to the data, stripping away all metadata.
- fn data_ptr(this: &Self) -> *const ();
-
- /// Check if two pointers point to the same data.
- fn data_ptr_eq(this: &Self, other: &Self) -> bool {
- // Discard pointer metadata (including the v-table).
- let this = Self::data_ptr(this);
- let other = Self::data_ptr(other);
-
- std::ptr::eq(this, other)
- }
- }
-
- // Currently, it's brittle to compare `Arc`s of dyn traits with `Arc::ptr_eq`
- // due to this check including v-table equality. It may be possible to use
- // `Arc::ptr_eq` directly if a fix to https://github.com/rust-lang/rust/issues/103763
- // is stabilized.
- impl<T: ?Sized> DataPtr for Arc<T> {
- fn data_ptr(this: &Self) -> *const () {
- Arc::as_ptr(this) as *const ()
- }
- }

/// Adopted from strsim-rs for string similarity metrics
pub mod datafusion_strsim {
// Source: https://github.com/dguo/strsim-rs/blob/master/src/lib.rs
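
For reference, a minimal standalone sketch of the comparison the removed `DataPtr` trait performed (`Rule` here is a stand-in trait, not a DataFusion type): casting `Arc::as_ptr` to `*const ()` strips the pointer metadata, including the v-table, before comparing addresses.

use std::sync::Arc;

trait Rule {}
struct MyRule;
impl Rule for MyRule {}

// Compare two `Arc<dyn Trait>`s by data pointer alone, ignoring v-tables.
fn data_ptr_eq(a: &Arc<dyn Rule>, b: &Arc<dyn Rule>) -> bool {
    std::ptr::eq(Arc::as_ptr(a) as *const (), Arc::as_ptr(b) as *const ())
}

fn main() {
    let x: Arc<dyn Rule> = Arc::new(MyRule);
    let y = Arc::clone(&x);
    assert!(data_ptr_eq(&x, &y)); // clones share one allocation
}
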
@@ -974,26 +946,6 @@ mod tests {
assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
}

- #[test]
- fn arc_data_ptr_eq() {
- let x = Arc::new(());
- let y = Arc::new(());
- let y_clone = Arc::clone(&y);
-
- assert!(
- Arc::data_ptr_eq(&x, &x),
- "same `Arc`s should point to same data"
- );
- assert!(
- !Arc::data_ptr_eq(&x, &y),
- "different `Arc`s should point to different data"
- );
- assert!(
- Arc::data_ptr_eq(&y, &y_clone),
- "cloned `Arc` should point to same data as the original"
- );
- }

#[test]
fn test_merge_and_order_indices() {
assert_eq!(
datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs

@@ -203,13 +203,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
Arc::new(builder.finish())
}

- fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
- &self,
- rows: RecordSlice,
- col_name: &str,
- ) -> ArrayRef
+ fn build_primitive_array<T>(&self, rows: RecordSlice, col_name: &str) -> ArrayRef
where
- T: ArrowNumericType,
+ T: ArrowNumericType + Resolver,
T::Native: num_traits::cast::NumCast,
{
Arc::new(
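
The rewrite above is mostly a style change: the bounds move from the type-parameter list into the `where` clause, and the separate `ArrowPrimitiveType` bound is dropped (which `ArrowNumericType` already implies). A self-contained sketch of the same refactor, with hypothetical `sum` helpers:

use std::ops::Add;

// Bounds inline in the type-parameter list, as in the old signature.
fn sum_inline<T: Copy + Default + Add<Output = T>>(xs: &[T]) -> T {
    xs.iter().copied().fold(T::default(), |acc, x| acc + x)
}

// The same bounds consolidated into a `where` clause, as in the new one.
fn sum_where<T>(xs: &[T]) -> T
where
    T: Copy + Default + Add<Output = T>,
{
    xs.iter().copied().fold(T::default(), |acc, x| acc + x)
}
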
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -219,7 +219,7 @@ impl BatchSerializer for JsonSerializer {
pub struct JsonSink {
/// Config options for writing data
config: FileSinkConfig,
- ///
+ /// Writer options for underlying Json writer
writer_options: JsonWriterOptions,
}

2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -536,7 +536,7 @@ async fn fetch_statistics(
pub struct ParquetSink {
/// Config options for writing data
config: FileSinkConfig,
- ///
+ /// Underlying parquet options
parquet_options: TableParquetOptions,
/// File metadata from successfully produced parquet files. The Mutex is only used
/// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
13 changes: 0 additions & 13 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -519,16 +519,13 @@ mod tests {
use std::sync::Arc;

use super::*;
- use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::SessionContext;
use crate::test::{make_partition, object_store::register_test_store};

use arrow_schema::Schema;
use datafusion_common::{internal_err, Statistics};

- use bytes::Bytes;
-
/// Test `FileOpener` which will simulate errors during file opening or scanning
#[derive(Default)]
struct TestOpener {
@@ -974,14 +971,4 @@

Ok(())
}

- struct TestSerializer {
- bytes: Bytes,
- }
-
- impl BatchSerializer for TestSerializer {
- fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
- Ok(self.bytes.clone())
- }
- }
}
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/stream.rs
@@ -242,7 +242,7 @@ impl TableProvider for StreamTable {
_state: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
- _limit: Option<usize>,
+ limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(p) => {
@@ -258,6 +258,7 @@
projection,
projected_schema,
true,
+ limit,
)?))
}

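
With `limit` forwarded into the execution plan, the source itself can stop after producing `limit` rows instead of leaving truncation to a downstream operator. A hypothetical, DataFusion-free sketch of the idea:

// With pushdown the scan truncates at the source (`take`); without it,
// every row is produced and a later operator discards the excess.
fn scan(rows: impl Iterator<Item = u64>, limit: Option<usize>) -> Vec<u64> {
    match limit {
        Some(n) => rows.take(n).collect(),
        None => rows.collect(),
    }
}

fn main() {
    assert_eq!(scan(0u64.., Some(3)), vec![0, 1, 2]);
}
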
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/streaming.rs
@@ -88,15 +88,15 @@ impl TableProvider for StreamingTable {
_state: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
- _limit: Option<usize>,
+ limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- // TODO: push limit down
Ok(Arc::new(StreamingTableExec::try_new(
self.schema.clone(),
self.partitions.clone(),
projection,
None,
self.infinite,
+ limit,
)?))
}
}
26 changes: 0 additions & 26 deletions datafusion/core/src/execution/context/avro.rs
@@ -57,29 +57,3 @@ impl SessionContext {
Ok(())
}
}

- #[cfg(test)]
- mod tests {
- use super::*;
-
- use async_trait::async_trait;
-
- // Test for compilation error when calling read_* functions from an #[async_trait] function.
- // See https://github.com/apache/datafusion/issues/1154
- #[async_trait]
- trait CallReadTrait {
- async fn call_read_avro(&self) -> DataFrame;
- }
-
- struct CallRead {}
-
- #[async_trait]
- impl CallReadTrait for CallRead {
- async fn call_read_avro(&self) -> DataFrame {
- let ctx = SessionContext::new();
- ctx.read_avro("dummy", AvroReadOptions::default())
- .await
- .unwrap()
- }
- }
- }
18 changes: 0 additions & 18 deletions datafusion/core/src/execution/context/csv.rs
@@ -90,7 +90,6 @@ mod tests {
use crate::assert_batches_eq;
use crate::test_util::{plan_and_collect, populate_csv_partitions};

- use async_trait::async_trait;
use tempfile::TempDir;

#[tokio::test]
@@ -125,21 +124,4 @@

Ok(())
}

- // Test for compilation error when calling read_* functions from an #[async_trait] function.
- // See https://github.com/apache/datafusion/issues/1154
- #[async_trait]
- trait CallReadTrait {
- async fn call_read_csv(&self) -> DataFrame;
- }
-
- struct CallRead {}
-
- #[async_trait]
- impl CallReadTrait for CallRead {
- async fn call_read_csv(&self) -> DataFrame {
- let ctx = SessionContext::new();
- ctx.read_csv("dummy", CsvReadOptions::new()).await.unwrap()
- }
- }
}
45 changes: 30 additions & 15 deletions datafusion/core/src/execution/context/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

- //! [`SessionContext`] contains methods for registering data sources and executing queries
+ //! [`SessionContext`] API for registering data sources and executing queries
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::fmt::Debug;
@@ -44,6 +44,7 @@ use crate::{
error::{DataFusionError, Result},
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
logical_expr::AggregateUDF,
+ logical_expr::ScalarUDF,
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
@@ -53,7 +54,7 @@
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
- physical_plan::{udf::ScalarUDF, ExecutionPlan},
+ physical_plan::ExecutionPlan,
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
@@ -221,17 +222,29 @@ where
///
/// # `SessionContext`, `SessionState`, and `TaskContext`
///
- /// A [`SessionContext`] can be created from a [`SessionConfig`] and
- /// stores the state for a particular query session. A single
- /// [`SessionContext`] can run multiple queries.
+ /// The state required to optimize, and evaluate queries is
+ /// broken into three levels to allow tailoring
///
- /// [`SessionState`] contains information available during query
- /// planning (creating [`LogicalPlan`]s and [`ExecutionPlan`]s).
+ /// The objects are:
///
- /// [`TaskContext`] contains the state available during query
- /// execution [`ExecutionPlan::execute`]. It contains a subset of the
- /// information in[`SessionState`] and is created from a
- /// [`SessionContext`] or a [`SessionState`].
+ /// 1. [`SessionContext`]: Most users should use a `SessionContext`. It contains
+ /// all information required to execute queries including high level APIs such
+ /// as [`SessionContext::sql`]. All queries run with the same `SessionContext`
+ /// share the same configuration and resources (e.g. memory limits).
+ ///
+ /// 2. [`SessionState`]: contains information required to plan and execute an
+ /// individual query (e.g. creating a [`LogicalPlan`] or [`ExecutionPlan`]).
+ /// Each query is planned and executed using its own `SessionState`, which can
+ /// be created with [`SessionContext::state`]. `SessionState` allows finer
+ /// grained control over query execution, for example disallowing DDL operations
+ /// such as `CREATE TABLE`.
+ ///
+ /// 3. [`TaskContext`] contains the state required for query execution (e.g.
+ /// [`ExecutionPlan::execute`]). It contains a subset of information in
+ /// [`SessionState`]. `TaskContext` allows executing [`ExecutionPlan`]s
+ /// [`PhysicalExpr`]s without requiring a full [`SessionState`].
+ ///
+ /// [`PhysicalExpr`]: crate::physical_expr::PhysicalExpr
#[derive(Clone)]
pub struct SessionContext {
/// UUID for the session
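
A minimal sketch of the three levels described in the new docs, using DataFusion's public API (`SessionContext::sql`, `SessionContext::state`, `SessionContext::task_ctx`); a `tokio` runtime is assumed and error handling is abbreviated:

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Level 1: SessionContext, the high level API; all queries run against
    // the same context share configuration and resources.
    let ctx = SessionContext::new();
    ctx.sql("SELECT 1 AS one").await?.show().await?;

    // Level 2: SessionState, per-query planning state with finer control.
    let state = ctx.state();
    let logical_plan = state.create_logical_plan("SELECT 1 AS one").await?;
    let physical_plan = state.create_physical_plan(&logical_plan).await?;

    // Level 3: TaskContext, the subset of state needed to execute a plan.
    let task_ctx = ctx.task_ctx();
    let _stream = physical_plan.execute(0, task_ctx)?;
    Ok(())
}
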
@@ -1905,7 +1918,7 @@ impl SessionState {

// analyze & capture output of each rule
let analyzer_result = self.analyzer.execute_and_check(
- e.plan.as_ref(),
+ e.plan.as_ref().clone(),
self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
@@ -1964,9 +1977,11 @@
logical_optimization_succeeded,
}))
} else {
- let analyzed_plan =
- self.analyzer
- .execute_and_check(plan, self.options(), |_, _| {})?;
+ let analyzed_plan = self.analyzer.execute_and_check(
+ plan.clone(),
+ self.options(),
+ |_, _| {},
+ )?;
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
}
}
20 changes: 0 additions & 20 deletions datafusion/core/src/execution/context/parquet.rs
@@ -84,7 +84,6 @@ mod tests {
use datafusion_common::config::TableParquetOptions;
use datafusion_execution::config::SessionConfig;

- use async_trait::async_trait;
use tempfile::tempdir;

#[tokio::test]
@@ -331,23 +330,4 @@
assert_eq!(total_rows, 5);
Ok(())
}

- // Test for compilation error when calling read_* functions from an #[async_trait] function.
- // See https://github.com/apache/datafusion/issues/1154
- #[async_trait]
- trait CallReadTrait {
- async fn call_read_parquet(&self) -> DataFrame;
- }
-
- struct CallRead {}
-
- #[async_trait]
- impl CallReadTrait for CallRead {
- async fn call_read_parquet(&self) -> DataFrame {
- let ctx = SessionContext::new();
- ctx.read_parquet("dummy", ParquetReadOptions::default())
- .await
- .unwrap()
- }
- }
}
datafusion/core/src/physical_optimizer/enforce_distribution.rs

@@ -411,7 +411,7 @@ fn adjust_input_keys_ordering(
} else {
// By default, push down the parent requirements to children
for child in requirements.children.iter_mut() {
- child.data = requirements.data.clone();
+ child.data.clone_from(&requirements.data);
}
}
Ok(Transformed::yes(requirements))
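
A minimal sketch of why `clone_from` is preferred here: types such as `String` and `Vec` override it to reuse the destination's existing allocation, while `dst = src.clone()` always constructs a fresh value and drops the old buffer.

fn main() {
    let src = String::from("parent sort requirements");
    let mut dst = String::with_capacity(128);

    // `clone_from` copies the bytes into dst's existing buffer when it can;
    // `dst = src.clone()` would allocate a new String and discard the buffer.
    dst.clone_from(&src);
    assert_eq!(dst, src);
}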