chore: bump datafusion version (#1086)
## Rationale
Bump the DataFusion version to the latest release.
Closes #1072.

## Detailed Changes
Bump the DataFusion version (and the matching arrow-rs version), and adapt the code to the new APIs.

- [DataFusion release notes](https://github.com/apache/arrow-datafusion/tags): the relevant changes start from [24.0.0-rc1](https://github.com/apache/arrow-datafusion/releases/tag/24.0.0-rc1).
- [arrow-rs release notes](https://github.com/apache/arrow-rs/tags): the relevant changes start from [43.0.0](https://github.com/apache/arrow-rs/releases/tag/43.0.0).

## Test Plan
Verified by unit tests.
Rachelint authored Jul 20, 2023
1 parent db68fc8 commit cb54dab
Showing 27 changed files with 716 additions and 323 deletions.
786 changes: 572 additions & 214 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
@@ -71,8 +71,8 @@ path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
alloc_tracker = { path = "components/alloc_tracker" }
arrow = { version = "38.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "38.0.0" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -91,8 +91,8 @@ clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion-proto = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
df_operator = { path = "df_operator" }
future_cancel = { path = "components/future_cancel" }
etcd-client = "0.10.3"
@@ -106,10 +106,10 @@ log = "0.4"
logger = { path = "components/logger" }
lru = "0.7.6"
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
macros = { path = "components/macros" }
@@ -121,7 +121,7 @@ panic_ext = { path = "components/panic_ext" }
partitioned_lock = { path = "components/partitioned_lock" }
partition_table_engine = { path = "partition_table_engine" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "38.0.0" }
parquet = { version = "43.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
@@ -145,7 +145,7 @@ size_ext = { path = "components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.33", features = ["serde"] }
sqlparser = { version = "0.35", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
@@ -160,7 +160,7 @@ trace_metric = { path = "components/trace_metric" }
trace_metric_derive = { path = "components/trace_metric_derive" }
trace_metric_derive_tests = { path = "components/trace_metric_derive_tests" }
tonic = "0.8.1"
tokio = { version = "1.25", features = ["full"] }
tokio = { version = "1.29", features = ["full"] }
wal = { path = "wal" }
xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" }
zstd = { version = "0.12", default-features = false }
4 changes: 4 additions & 0 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -31,6 +31,8 @@ impl MetaData {
///
/// After the building, a new parquet meta data will be generated which
/// contains no extended custom information.
// TODO: remove it and use the suggested api.
#[allow(deprecated)]
pub fn try_new(
parquet_meta_data: &parquet_ext::ParquetMetaData,
ignore_sst_filter: bool,
@@ -147,6 +149,8 @@ mod tests {
use super::MetaData;
use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData};

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) {
assert_eq!(original.page_indexes(), processed.page_indexes());
assert_eq!(original.offset_indexes(), processed.offset_indexes());
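Context for the `#[allow(deprecated)]` attributes above: in parquet 43 the `page_indexes()` / `offset_indexes()` accessors are deprecated, which is presumably what the TODOs refer to. A minimal sketch of what the follow-up might look like, assuming the suggested replacements are the newer `column_index()` / `offset_index()` accessors (an assumption, not part of this change):

```rust
use parquet::file::metadata::ParquetMetaData;

// Hypothetical follow-up for the TODO above: compare the non-deprecated
// page-index accessors instead of `page_indexes()` / `offset_indexes()`.
// Assumes parquet 43, where these return the optional column/offset indexes.
fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) {
    assert_eq!(original.column_index(), processed.column_index());
    assert_eq!(original.offset_index(), processed.offset_index());
}
```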
8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -19,11 +19,9 @@ use common_types::{
};
use datafusion::{
common::ToDFSchema,
datasource::physical_plan::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
physical_expr::{create_physical_expr, execution_props::ExecutionProps},
physical_plan::{
file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::{Stream, StreamExt};
use generic_error::{BoxError, GenericResult};
@@ -227,6 +225,8 @@ impl<'a> Reader<'a> {
.context(DataFusionError)
}

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
async fn fetch_record_batch_streams(
&mut self,
suggested_parallelism: usize,
8 changes: 3 additions & 5 deletions common_types/src/column.rs
@@ -19,10 +19,7 @@ use arrow::{
error::ArrowError,
};
use bytes_ext::Bytes;
use datafusion::physical_plan::{
expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS},
ColumnarValue,
};
use datafusion::physical_plan::{expressions::cast_column, ColumnarValue};
use paste::paste;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

@@ -858,7 +855,8 @@ pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result<Arc<dyn Array>> {
let mills_column = cast_column(
&column,
&DataType::Timestamp(TimeUnit::Millisecond, None),
&DEFAULT_DATAFUSION_CAST_OPTIONS,
// It will use the default option internally when found None.
None,
)
.with_context(|| CastTimestamp {
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
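The call above reflects a signature change in the newer DataFusion: `cast_column` now takes an `Option` for the cast options and falls back to the defaults when given `None`, so the old `DEFAULT_DATAFUSION_CAST_OPTIONS` constant is no longer imported. A self-contained sketch of the new call shape (the surrounding setup is illustrative only, not code from this repository):

```rust
use std::sync::Arc;

use arrow::{
    array::{ArrayRef, TimestampNanosecondArray},
    datatypes::{DataType, TimeUnit},
};
use datafusion::{
    error::Result,
    physical_plan::{expressions::cast_column, ColumnarValue},
};

// Cast a nanosecond timestamp column to milliseconds; passing `None` as the
// third argument selects DataFusion's default cast options.
fn nanos_to_millis(array: ArrayRef) -> Result<ColumnarValue> {
    let column = ColumnarValue::Array(array);
    cast_column(
        &column,
        &DataType::Timestamp(TimeUnit::Millisecond, None),
        None,
    )
}

fn main() -> Result<()> {
    let input: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1_000_000_000i64]));
    let _millis = nanos_to_millis(input)?;
    Ok(())
}
```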
14 changes: 12 additions & 2 deletions common_types/src/datum.rs
@@ -1274,7 +1274,12 @@ impl Datum {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
@@ -1318,7 +1323,12 @@ impl<'a> DatumView<'a> {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
14 changes: 7 additions & 7 deletions components/parquet_ext/src/prune/equal.rs
@@ -1,9 +1,9 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{
common::Column,
logical_expr::{Expr, Operator},
logical_expr::{expr::InList, Expr, Operator},
scalar::ScalarValue,
};

@@ -173,11 +173,11 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
Operator::NotEq => normalize_equal_expr(left, right, false),
_ => unhandled,
},
Expr::InList {
Expr::InList(InList {
expr,
list,
negated,
} if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
}) if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
if list.is_empty() {
if *negated {
// "not in empty list" is always true
@@ -344,7 +344,7 @@ mod tests {
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);

Expand Down Expand Up @@ -373,7 +373,7 @@ mod tests {
fn test_normalize_unhandled() {
let lt_expr = Expr::gt(make_column_expr("c0"), make_literal_expr(0));
let empty_list_expr = Expr::in_list(make_column_expr("c0"), vec![], true);
let not_expr = Expr::not(make_column_expr("c0"));
let not_expr = !make_column_expr("c0");

let unhandled_exprs = vec![lt_expr, empty_list_expr, not_expr];
let expect_expr = NormalizedExpr::True;
@@ -415,7 +415,7 @@ mod tests {
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);
assert!(EqPruner::new(&true_expr).prune(&f));
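For reference, the pattern changes above follow the newer DataFusion, where `Expr::InList` is a tuple variant wrapping an `InList` struct and logical negation can be written with the `!` operator. A small sketch of constructing and destructuring such an expression under the new shape, inferred from the diff rather than from the full API docs:

```rust
use datafusion::{
    logical_expr::{expr::InList, Expr},
    prelude::{col, lit},
};

fn main() {
    // Build `c0 IN (1, 2)` and negate another column with `!`.
    let in_list = col("c0").in_list(vec![lit(1i64), lit(2i64)], false);
    let negated_c3 = !col("c3");

    // `Expr::InList` now wraps the `InList` struct, so matching goes
    // through that struct instead of an inline struct variant.
    if let Expr::InList(InList { expr, list, negated }) = &in_list {
        println!("expr={expr}, list_len={}, negated={negated}", list.len());
    }
    println!("{negated_c3}");
}
```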
4 changes: 2 additions & 2 deletions components/parquet_ext/src/reverse_reader.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{collections::VecDeque, sync::Arc};

@@ -169,7 +169,7 @@ mod tests {
const TEST_BATCH_SIZE: usize = 1000;

fn check_reversed_row_iter(original: RowIter, reversed: ReversedFileReader) {
let mut original_reversed_rows: Vec<_> = original.into_iter().collect();
let mut original_reversed_rows: Vec<_> = original.into_iter().map(|v| v.unwrap()).collect();
original_reversed_rows.reverse();

let reversed_record_batches: Vec<_> = reversed
6 changes: 3 additions & 3 deletions df_operator/src/functions.rs
@@ -12,7 +12,7 @@ use common_types::{column::ColumnBlock, datum::DatumKind};
use datafusion::{
error::DataFusionError,
logical_expr::{
AccumulatorFunctionImplementation, ReturnTypeFunction, ScalarFunctionImplementation,
AccumulatorFactoryFunction, ReturnTypeFunction, ScalarFunctionImplementation,
Signature as DfSignature, StateTypeFunction, TypeSignature as DfTypeSignature, Volatility,
},
physical_plan::ColumnarValue as DfColumnarValue,
@@ -253,7 +253,7 @@ impl ScalarFunction {
pub struct AggregateFunction {
type_signature: TypeSignature,
return_type: ReturnType,
df_accumulator: AccumulatorFunctionImplementation,
df_accumulator: AccumulatorFactoryFunction,
state_type: Vec<DatumKind>,
}

@@ -301,7 +301,7 @@ impl AggregateFunction {
}

#[inline]
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFunctionImplementation {
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFactoryFunction {
self.df_accumulator.clone()
}

2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-59.result
@@ -25,7 +25,7 @@ GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;
6 changes: 0 additions & 6 deletions integration_tests/cases/common/dummy/select_1.result
@@ -20,12 +20,6 @@ NOT Int64(1) = Int64(1),
Boolean(false),


SELECT NOT(1);

NOT Int64(1),
Int64(-2),


SELECT TRUE;

Boolean(true),
4 changes: 0 additions & 4 deletions integration_tests/cases/common/dummy/select_1.sql
@@ -2,14 +2,10 @@ SELECT 1;

SELECT x;

-- FIXME
SELECT 'a';

SELECT NOT(1=1);

-- FIXME
SELECT NOT(1);

SELECT TRUE;

SELECT FALSE;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),


DROP TABLE `07_optimizer_t`;
19 changes: 14 additions & 5 deletions query_engine/src/context.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Query context

@@ -8,10 +8,17 @@ use common_types::request_id::RequestId;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
optimizer::{
analyzer::AnalyzerRule, common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit, optimizer::OptimizerRule,
push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection, simplify_expressions::SimplifyExpressions,
analyzer::{
count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
AnalyzerRule,
},
common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit,
optimizer::OptimizerRule,
push_down_filter::PushDownFilter,
push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection,
simplify_expressions::SimplifyExpressions,
single_distinct_to_groupby::SingleDistinctToGroupBy,
},
physical_optimizer::optimizer::PhysicalOptimizerRule,
@@ -108,8 +115,10 @@ impl Context {

fn analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeConversion),
Arc::new(datafusion::optimizer::analyzer::type_coercion::TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
]
}
}
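For readers unfamiliar with DataFusion's analyzer framework, the rule list above (now including `InlineTableScan` and `CountWildcardRule`) is ultimately installed on the session state. A hedged sketch of that wiring using only stock rules: the `SessionState::with_config_rt` and `with_analyzer_rules` calls are our assumption for this DataFusion version, and CeresDB's custom `TypeConversion` rule is omitted because it lives in this repository.

```rust
use std::sync::Arc;

use datafusion::{
    execution::{context::SessionState, runtime_env::RuntimeEnv},
    optimizer::analyzer::{
        count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
        type_coercion::TypeCoercion, AnalyzerRule,
    },
    prelude::SessionConfig,
};

// Build a session state that runs the same stock analyzer rules as above,
// in the same order: InlineTableScan -> TypeCoercion -> CountWildcardRule.
fn session_state_with_rules() -> SessionState {
    let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
        Arc::new(InlineTableScan::new()),
        Arc::new(TypeCoercion::new()),
        Arc::new(CountWildcardRule::new()),
    ];

    SessionState::with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
        .with_analyzer_rules(rules)
}
```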