Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into support_inlist_#3031
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Aug 27, 2022
2 parents 7254c52 + b1db5ff commit aa4fe6c
Show file tree
Hide file tree
Showing 30 changed files with 755 additions and 228 deletions.
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
[package]
name = "datafusion-benchmarks"
description = "DataFusion Benchmarks"
version = "5.0.0"
version = "11.0.0"
edition = "2021"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
license = "Apache-2.0"
publish = false
rust-version = "1.59"
rust-version = "1.62"

[features]
simd = ["datafusion/simd"]
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ The benchmark program also supports CSV and Parquet input file formats and a uti
cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
```

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

## Expected output

Query 1 should produce the following output when executed against the SF=1 dataset.
Expand Down
25 changes: 2 additions & 23 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {

// create the physical plan
let csv = csv.to_logical_plan()?;
let csv = ctx.optimize(&csv)?;
let csv = ctx.create_physical_plan(&csv).await?;

let output_path = output_root_path.join(table);
Expand Down Expand Up @@ -702,9 +701,8 @@ mod tests {
run_query(1).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/159
#[tokio::test]
async fn run_2() -> Result<()> {
async fn run_q2() -> Result<()> {
run_query(2).await
}

Expand All @@ -713,7 +711,6 @@ mod tests {
run_query(3).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/160
#[tokio::test]
async fn run_q4() -> Result<()> {
run_query(4).await
Expand Down Expand Up @@ -749,7 +746,6 @@ mod tests {
run_query(10).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/163
#[tokio::test]
async fn run_q11() -> Result<()> {
run_query(11).await
Expand Down Expand Up @@ -781,7 +777,6 @@ mod tests {
run_query(16).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/168
#[tokio::test]
async fn run_q17() -> Result<()> {
run_query(17).await
Expand All @@ -792,12 +787,12 @@ mod tests {
run_query(18).await
}

#[ignore] // maybe it works, but if it never finishes, how can you tell?
#[tokio::test]
async fn run_q19() -> Result<()> {
run_query(19).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/171
#[tokio::test]
async fn run_q20() -> Result<()> {
run_query(20).await
Expand All @@ -809,7 +804,6 @@ mod tests {
run_query(21).await
}

#[ignore] // https://github.com/apache/arrow-datafusion/issues/175
#[tokio::test]
async fn run_q22() -> Result<()> {
run_query(22).await
Expand All @@ -821,21 +815,6 @@ mod tests {
return "NULL".to_string();
}

// Special case ListArray as there is no pretty print support for it yet
if let DataType::FixedSizeList(_, n) = column.data_type() {
let array = column
.as_any()
.downcast_ref::<FixedSizeListArray>()
.unwrap()
.value(row_index);

let mut r = Vec::with_capacity(*n as usize);
for i in 0..*n {
r.push(col_str(&array, i as usize));
}
return format!("[{}]", r.join(","));
}

array_value_to_string(column, row_index).unwrap()
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
[package]
name = "datafusion-examples"
description = "DataFusion usage examples"
version = "5.0.0"
version = "11.0.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
edition = "2021"
publish = false
rust-version = "1.59"
rust-version = "1.62"

[[example]]
name = "avro_sql"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pyarrow = ["pyo3"]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "20.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.86.1", optional = true }
cranelift-module = { version = "0.87.0", optional = true }
object_store = { version = "0.4", optional = true }
ordered-float = "3.0"
parquet = { version = "20.0.0", features = ["arrow"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ impl FileFormat for CsvFormat {
Some(records_to_read),
self.has_header,
)?;
schemas.push(schema.clone());
if records_read == 0 {
continue;
}
schemas.push(schema.clone());
records_to_read -= records_read;
if records_to_read == 0 {
break;
Expand Down
68 changes: 66 additions & 2 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_expr::LogicalPlanBuilder;

use crate::{
error::Result,
Expand Down Expand Up @@ -81,14 +82,36 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
state: &SessionState,
_projection: &Option<Vec<usize>>,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
state_cloned.create_physical_plan(&self.logical_plan).await
if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
state_cloned.create_physical_plan(&self.logical_plan).await
} else {
let fields: Vec<Expr> = projection
.iter()
.map(|i| {
Expr::Column(
self.logical_plan.schema().field(*i).qualified_column(),
)
})
.collect();
let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?;
state_cloned.create_physical_plan(&plan).await
}
} else {
state_cloned.create_physical_plan(&self.logical_plan).await
}
}
}

Expand All @@ -99,6 +122,47 @@ mod tests {

use super::*;

#[tokio::test]
async fn issue_3242() -> Result<()> {
// regression test for https://github.com/apache/arrow-datafusion/pull/3242
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("create view v as select 1 as a, 2 as b, 3 as c")
.await?
.collect()
.await?;

let results = session_ctx
.sql("select * from (select b from v)")
.await?
.collect()
.await?;

let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn create_view_return_empty_dataframe() -> Result<()> {
let session_ctx = SessionContext::new();

let df = session_ctx
.sql("CREATE VIEW xyz AS SELECT 1")
.await?
.collect()
.await?;

assert!(df.is_empty());

Ok(())
}

#[tokio::test]
async fn query_view() -> Result<()> {
let session_ctx = SessionContext::with_config(
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
},
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
use std::sync::Arc;
use std::{
Expand Down Expand Up @@ -370,14 +371,16 @@ impl SessionContext {
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(_, Err(_)) => {
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
Expand Down Expand Up @@ -1561,8 +1564,7 @@ impl ContextProvider for SessionState {
return None;
}

let first_variable = &variable_names[0];
let provider_type = if first_variable.len() > 1 && &first_variable[0..2] == "@@" {
let provider_type = if is_system_variables(variable_names) {
VarType::System
} else {
VarType::UserDefined
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/empty.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
c1,c2,c3
26 changes: 26 additions & 0 deletions datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,29 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn create_csv_table_empty_file() -> Result<()> {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let sql = "CREATE EXTERNAL TABLE empty STORED AS CSV WITH HEADER ROW LOCATION 'tests/empty.csv'";
ctx.sql(sql).await.unwrap();
let sql =
"select column_name, data_type, ordinal_position from information_schema.columns";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+-------------+-----------+------------------+",
"| column_name | data_type | ordinal_position |",
"+-------------+-----------+------------------+",
"| c1 | Utf8 | 0 |",
"| c2 | Utf8 | 1 |",
"| c3 | Utf8 | 2 |",
"+-------------+-----------+------------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}
Loading

0 comments on commit aa4fe6c

Please sign in to comment.