Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ jobs:
export RUST_MIN_STACK=20971520
export TPCH_DATA=`realpath datafusion/sqllogictest/test_files/tpch/data`
cargo test plan_q --package datafusion-benchmarks --profile ci --features=ci -- --test-threads=1
INCLUDE_TPCH=true cargo test --profile ci --package datafusion-sqllogictest --test sqllogictests
INCLUDE_TPCH=true cargo test --features backtrace --profile ci --package datafusion-sqllogictest --test sqllogictests
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -331,7 +331,7 @@ jobs:
- name: Run sqllogictest
run: |
cd datafusion/sqllogictest
PG_COMPAT=true PG_URI="postgresql://postgres:postgres@$POSTGRES_HOST:$POSTGRES_PORT/db_test" cargo test --profile ci --features=postgres --test sqllogictests
PG_COMPAT=true PG_URI="postgresql://postgres:postgres@$POSTGRES_HOST:$POSTGRES_PORT/db_test" cargo test --features backtrace --profile ci --features=postgres --test sqllogictests
env:
# use postgres for the host here because we have specified a container for the job
POSTGRES_HOST: postgres
Expand Down
4 changes: 4 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionEr
// Ensure the column names and types match the target table
df = df.with_column_renamed("column1", "tablecol1").unwrap();

// Support datatype cast for insert api same as insert into sql
// TODO https://github.com/apache/datafusion/issues/15015
df = df.with_column("tablecol1", cast(col("tablecol1"), DataType::Utf8View))?;

ctx.sql(
"create external table
test(tablecol1 varchar)
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,9 @@ mod tests {
Ok(())
}

/// Note: We now default to use Utf8View, but we don't support for Utf8View in JSON reader
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This testing failed with error JSON reader don't support utf8view, we may need to file a ticket for arrow-rs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the risk for the PR @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// so we can't test JSON with Utf8View
#[ignore]
#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2411,6 +2411,12 @@ async fn write_table_with_order() -> Result<()> {
write_df = write_df
.with_column_renamed("column1", "tablecol1")
.unwrap();

// Support datatype cast for insert api same as insert into sql
// TODO https://github.com/apache/datafusion/issues/15015
write_df =
write_df.with_column("tablecol1", cast(col("tablecol1"), DataType::Utf8View))?;

let sql_str =
"create external table data(tablecol1 varchar) stored as parquet location '"
.to_owned()
Expand Down
48 changes: 24 additions & 24 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ async fn csv_explain_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
" TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -222,11 +222,11 @@ async fn csv_explain_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand All @@ -250,9 +250,9 @@ async fn csv_explain_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -296,11 +296,11 @@ async fn csv_explain_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down Expand Up @@ -398,9 +398,9 @@ async fn csv_explain_verbose_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
" TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
];
let formatted = dataframe.logical_plan().display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -444,11 +444,11 @@ async fn csv_explain_verbose_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand All @@ -472,9 +472,9 @@ async fn csv_explain_verbose_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -518,11 +518,11 @@ async fn csv_explain_verbose_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ use std::hash::Hash;
use std::task::{Context, Poll};
use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};

use arrow::array::StringViewArray;
use arrow::{
array::{Int64Array, StringArray},
datatypes::SchemaRef,
record_batch::RecordBatch,
array::Int64Array, datatypes::SchemaRef, record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::{
common::cast::{as_int64_array, as_string_array},
common::cast::as_int64_array,
common::{arrow_datafusion_err, internal_err, DFSchemaRef},
error::{DataFusionError, Result},
execution::{
Expand Down Expand Up @@ -100,6 +99,7 @@ use datafusion_optimizer::AnalyzerRule;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use async_trait::async_trait;
use datafusion_common::cast::as_string_view_array;
use futures::{Stream, StreamExt};

/// Execute the specified sql and return the resulting record batches
Expand Down Expand Up @@ -807,10 +807,10 @@ fn accumulate_batch(
) -> BTreeMap<i64, String> {
let num_rows = input_batch.num_rows();
// Assuming the input columns are
// column[0]: customer_id / UTF8
// column[0]: customer_id / UTF8View
// column[1]: revenue: Int64
let customer_id =
as_string_array(input_batch.column(0)).expect("Column 0 is not customer_id");
as_string_view_array(input_batch.column(0)).expect("Column 0 is not customer_id");

let revenue = as_int64_array(input_batch.column(1)).unwrap();

Expand Down Expand Up @@ -857,7 +857,8 @@ impl Stream for TopKReader {
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(customer)),
// Now the default for VARCHAR is StringViewArray
Arc::new(StringViewArray::from(customer)),
Arc::new(Int64Array::from(revenue)),
],
)
Expand Down
9 changes: 9 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,15 @@ impl SessionConfig {
self
}

/// Returns true if the force view metadata option is enabled
pub fn with_parquet_force_view_metadata(
mut self,
schema_force_view_types: bool,
) -> Self {
self.options.execution.parquet.schema_force_view_types = schema_force_view_types;
self
}

/// Returns true if page index should be used to skip parquet data pages
pub fn parquet_page_index_pruning(&self) -> bool {
self.options.execution.parquet.enable_page_index
Expand Down
13 changes: 10 additions & 3 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,12 @@ fn boolean_op(
macro_rules! binary_string_array_flag_op {
($LEFT:expr, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
match $LEFT.data_type() {
DataType::Utf8View | DataType::Utf8 => {
DataType::Utf8 => {
compute_utf8_flag_op!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
},
DataType::Utf8View => {
compute_utf8_flag_op!($LEFT, $RIGHT, $OP, StringViewArray, $NOT, $FLAG)
},
DataType::LargeUtf8 => {
compute_utf8_flag_op!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
},
Expand Down Expand Up @@ -212,17 +215,21 @@ macro_rules! binary_string_array_flag_op_scalar {
// This macro is slightly different from binary_string_array_flag_op because, when comparing with a scalar value,
// the query can be optimized in such a way that operands will be dicts, so we need to support it here
let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
DataType::Utf8View | DataType::Utf8 => {
DataType::Utf8 => {
compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
},
DataType::Utf8View => {
compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, StringViewArray, $NOT, $FLAG)
},
DataType::LargeUtf8 => {
compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
},
DataType::Dictionary(_, _) => {
let values = $LEFT.as_any_dictionary().values();

match values.data_type() {
DataType::Utf8View | DataType::Utf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, StringArray, $NOT, $FLAG),
DataType::Utf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, StringArray, $NOT, $FLAG),
DataType::Utf8View => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, StringViewArray, $NOT, $FLAG),
DataType::LargeUtf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG),
other => internal_err!(
"Data type {:?} not supported as a dictionary value type for binary_string_array_flag_op_scalar operation '{}' on string array",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLDataType::Varchar(length) => {
match (length, self.options.support_varchar_with_length) {
(Some(_), false) => plan_err!("does not support Varchar with length, please set `support_varchar_with_length` to be true"),
_ => Ok(DataType::Utf8),
_ => Ok(DataType::Utf8View),
}
}
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64),
Expand Down
1 change: 1 addition & 0 deletions datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokio-postgres = { version = "0.7.12", optional = true }

[features]
avro = ["datafusion/avro"]
backtrace = ["datafusion/backtrace"]
postgres = [
"bytes",
"chrono",
Expand Down
Loading