Commit 725179c

adapt to new version datafusion.

Rachelint committed Jul 19, 2023
1 parent b65a125
Showing 16 changed files with 284 additions and 176 deletions.
241 changes: 163 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -8,6 +8,6 @@ edition = "2021"
 license = "MIT OR Apache-2.0"
 
 [workspace.dependencies]
-arrow = { version = "38.0.0" }
-datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a", default-features = false }
+arrow = { version = "43.0.0" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44e3876758bd234547ed07afd6e48c7fc62ea525", default-features = false }
 hashbrown = { version = "0.13.2" }
15 changes: 3 additions & 12 deletions datafusion_util/src/lib.rs
@@ -24,13 +24,10 @@ use datafusion::arrow::datatypes::Fields;
 use datafusion::common::{DataFusionError, ToDFSchema};
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::TaskContext;
-use datafusion::execution::memory_pool::UnboundedMemoryPool;
 use datafusion::logical_expr::expr::Sort;
 use datafusion::physical_expr::execution_props::ExecutionProps;
 use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::common::SizedRecordBatchStream;
-use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
 use datafusion::physical_plan::{collect, EmptyRecordBatchStream, ExecutionPlan};
 use datafusion::prelude::{lit, Column, Expr, SessionContext};
 use datafusion::{
@@ -248,24 +245,18 @@
 
 /// Create a SendableRecordBatchStream from a RecordBatch
 pub fn stream_from_batch(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
-    stream_from_batches(schema, vec![Arc::new(batch)])
+    stream_from_batches(schema, vec![batch])
 }
 
 /// Create a SendableRecordBatchStream from a Vec of RecordBatches with the same schema
 pub fn stream_from_batches(
     schema: SchemaRef,
-    batches: Vec<Arc<RecordBatch>>,
+    batches: Vec<RecordBatch>,
 ) -> SendableRecordBatchStream {
     if batches.is_empty() {
         return Box::pin(EmptyRecordBatchStream::new(schema));
     }
 
-    // TODO should track this memory properly
-    let dummy_pool = Arc::new(UnboundedMemoryPool::default()) as _;
-    let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, &dummy_pool, 0);
-    let stream = SizedRecordBatchStream::new(batches[0].schema(), batches, mem_metrics);
-    Box::pin(stream)
+    Box::pin(MemoryStream::new_with_schema(batches, schema))
 }
 
 /// Create a SendableRecordBatchStream that sends back no RecordBatches with a specific schema
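
Aside: the new MemoryStream is what replaces the removed SizedRecordBatchStream plumbing here. A minimal sketch of the same pattern against upstream DataFusion of this vintage, using MemoryStream::try_new from datafusion::physical_plan::memory (the new_with_schema constructor used above may be specific to the pinned revision; the single-column schema is made up for illustration):

use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::SendableRecordBatchStream;

fn example_stream() -> Result<SendableRecordBatchStream> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    // MemoryStream owns plain RecordBatches (no Arc wrapper), which is why the
    // caller above drops its Arc::new(batch) and the memory-pool bookkeeping.
    Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}
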
3 changes: 1 addition & 2 deletions iox_query/src/exec/context.rs
@@ -2,8 +2,7 @@ use async_trait::async_trait;
 use datafusion::error::Result;
 use datafusion::execution::context::{QueryPlanner, SessionState};
 use datafusion::logical_expr::UserDefinedLogicalNode;
-use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
-use datafusion::physical_plan::PhysicalPlanner;
+use datafusion::physical_planner::{PhysicalPlanner, DefaultPhysicalPlanner, ExtensionPlanner};
 use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, prelude::*};
 use std::sync::Arc;
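
The takeaway from this file: the planner traits moved out of datafusion::physical_plan into a dedicated datafusion::physical_planner module, so a custom QueryPlanner only needs its imports collapsed, roughly:

// Before the upgrade:
// use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
// use datafusion::physical_plan::PhysicalPlanner;

// After the upgrade, one import suffices:
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
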
20 changes: 11 additions & 9 deletions iox_query/src/exec/gapfill/mod.rs
@@ -27,7 +27,7 @@ use datafusion::{
         expressions::Column,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
         DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
-        SendableRecordBatchStream, Statistics,
+        SendableRecordBatchStream, Statistics, DisplayAs,
     },
     prelude::Expr,
 };
@@ -528,20 +528,26 @@ impl ExecutionPlan for GapFillExec {
         )?))
     }
 
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl DisplayAs for GapFillExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
-            DisplayFormatType::Default => {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 let group_expr: Vec<_> = self.group_expr.iter().map(|e| e.to_string()).collect();
                 let aggr_expr: Vec<_> = self
                     .params
                     .fill_strategy
                     .iter()
                     .map(|(e, fs)| match fs {
                         FillStrategy::PrevNullAsIntentional => {
-                            format!("LOCF(null-as-intentional, {e})")
+                            format!("LOCF(null-as-intentional, {})", e)
                         }
-                        FillStrategy::PrevNullAsMissing => format!("LOCF({e})"),
-                        FillStrategy::LinearInterpolate => format!("INTERPOLATE({e})"),
+                        FillStrategy::PrevNullAsMissing => format!("LOCF({})", e),
+                        FillStrategy::LinearInterpolate => format!("INTERPOLATE({})", e),
                         FillStrategy::Null => e.to_string(),
                     })
                     .collect();
@@ -560,8 +566,4 @@ impl ExecutionPlan for GapFillExec {
         }
     }
 }
-
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
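
This hunk shows the recurring pattern in this commit: display formatting is split out of ExecutionPlan into a separate DisplayAs trait, and the match must now also cover the DisplayFormatType::Verbose variant. A minimal sketch of the new shape for any custom operator (MyExec is a placeholder name):

use std::fmt;

use datafusion::physical_plan::{DisplayAs, DisplayFormatType};

#[derive(Debug)]
struct MyExec;

impl DisplayAs for MyExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Most operators render Default and Verbose identically, as the
        // operators touched by this commit do.
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MyExec")
            }
        }
    }
}
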
20 changes: 11 additions & 9 deletions iox_query/src/exec/non_null_checker.rs
@@ -55,7 +55,7 @@ use datafusion::{
         expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        Statistics, DisplayAs,
     },
 };
 
@@ -272,14 +272,6 @@ impl ExecutionPlan for NonNullCheckerExec {
         Ok(AdapterStream::adapt(self.schema(), rx, handle))
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "NonNullCheckerExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -290,6 +282,16 @@
     }
 }
 
+impl DisplayAs for NonNullCheckerExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "NonNullCheckerExec")
+            }
+        }
+    }
+}
+
 async fn check_for_nulls(
     mut input_stream: SendableRecordBatchStream,
     schema: SchemaRef,
21 changes: 12 additions & 9 deletions iox_query/src/exec/schema_pivot.rs
@@ -30,7 +30,7 @@ use arrow::{
     error::ArrowError,
     record_batch::RecordBatch,
 };
-use datafusion::error::DataFusionError;
+use datafusion::{error::DataFusionError, physical_plan::DisplayAs};
 use datafusion::{
     common::{DFSchemaRef, ToDFSchema},
     error::{DataFusionError as Error, Result},
@@ -247,14 +247,6 @@ impl ExecutionPlan for SchemaPivotExec {
         Ok(AdapterStream::adapt(self.schema(), rx, handle))
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "SchemaPivotExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -265,6 +257,17 @@
     }
 }
 
+impl DisplayAs for SchemaPivotExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "SchemaPivotExec")
+            }
+        }
+    }
+}
+
+
 // Algorithm: for each column we haven't seen a value for yet,
 // check each input row;
 //
20 changes: 11 additions & 9 deletions iox_query/src/exec/split.rs
@@ -68,7 +68,7 @@ use datafusion::{
         expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
         ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
-        SendableRecordBatchStream, Statistics,
+        SendableRecordBatchStream, Statistics, DisplayAs,
     },
     scalar::ScalarValue,
 };
@@ -270,14 +270,6 @@ impl ExecutionPlan for StreamSplitExec {
         }
     }
 
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "StreamSplitExec")
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -289,6 +281,16 @@
     }
 }
 
+impl DisplayAs for StreamSplitExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "StreamSplitExec")
+            }
+        }
+    }
+}
+
 impl StreamSplitExec {
     /// if in State::New, sets up the output running and sets self.state --> `Running`
     fn start_if_needed(&self, context: Arc<TaskContext>) -> Result<()> {
25 changes: 13 additions & 12 deletions iox_query/src/logical_optimizer/handle_gapfill.rs
@@ -9,7 +9,7 @@ use datafusion::{
     error::{DataFusionError, Result},
     logical_expr::{
         utils::expr_to_columns, Aggregate, BuiltinScalarFunction, Extension, LogicalPlan,
-        Projection,
+        Projection, expr::{ScalarUDF, ScalarFunction},
    },
     optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
     prelude::{col, Expr},
@@ -328,7 +328,7 @@ impl TreeNodeRewriter for DateBinGapfillRewriter {
     type N = Expr;
     fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
         match expr {
-            Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
                 Ok(RewriteRecursion::Mutate)
             }
             _ => Ok(RewriteRecursion::Continue),
@@ -337,12 +337,12 @@
 
     fn mutate(&mut self, expr: Expr) -> Result<Expr> {
         match expr {
-            Expr::ScalarUDF { fun, args } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
+            Expr::ScalarUDF(ScalarUDF { fun, args }) if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
                 self.args = Some(args.clone());
-                Ok(Expr::ScalarFunction {
+                Ok(Expr::ScalarFunction(ScalarFunction {
                     fun: BuiltinScalarFunction::DateBin,
                     args,
-                })
+                }))
             }
             _ => Ok(expr),
         }
@@ -368,7 +368,7 @@ fn handle_projection(proj: &Projection) -> Result<Option<LogicalPlan>> {
     let fill_cols: Vec<(&Expr, FillStrategy)> = proj_exprs
         .iter()
         .filter_map(|e| match e {
-            Expr::ScalarUDF { fun, args } if fun.name == LOCF_UDF_NAME => {
+            Expr::ScalarUDF(ScalarUDF { fun, args }) if fun.name == LOCF_UDF_NAME => {
                 let col = &args[0];
                 Some((col, FillStrategy::PrevNullAsMissing))
             }
@@ -397,7 +397,7 @@
         .iter()
         .cloned()
         .map(|e| match e {
-            Expr::ScalarUDF { fun, mut args } if fun.name == LOCF_UDF_NAME => args.remove(0),
+            Expr::ScalarUDF(ScalarUDF { fun, mut args }) if fun.name == LOCF_UDF_NAME => args.remove(0),
             _ => e,
         })
         .collect();
@@ -419,7 +419,7 @@ fn count_udf(e: &Expr, name: &str) -> Result<usize> {
     let mut count = 0;
     e.apply(&mut |expr| {
         match expr {
-            Expr::ScalarUDF { fun, .. } if fun.name == name => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) if fun.name == name => {
                 count += 1;
             }
             _ => (),
@@ -454,6 +454,7 @@ mod test {
 
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::error::Result;
+    use datafusion::logical_expr::expr::ScalarUDF;
     use datafusion::logical_expr::{logical_plan, LogicalPlan, LogicalPlanBuilder};
     use datafusion::optimizer::optimizer::Optimizer;
     use datafusion::optimizer::OptimizerContext;
@@ -484,17 +485,17 @@
     }
 
     fn date_bin_gapfill_with_origin(interval: Expr, time: Expr, origin: Expr) -> Result<Expr> {
-        Ok(Expr::ScalarUDF {
+        Ok(Expr::ScalarUDF(ScalarUDF {
             fun: query_functions::registry().udf(DATE_BIN_GAPFILL_UDF_NAME)?,
             args: vec![interval, time, origin],
-        })
+        }))
     }
 
     fn locf(arg: Expr) -> Result<Expr> {
-        Ok(Expr::ScalarUDF {
+        Ok(Expr::ScalarUDF(ScalarUDF {
             fun: query_functions::registry().udf(LOCF_UDF_NAME)?,
             args: vec![arg],
-        })
+        }))
    }
 
     fn optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
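
All of these hunks are the same mechanical change: Expr::ScalarUDF and Expr::ScalarFunction became struct variants wrapping datafusion::logical_expr::expr::ScalarUDF and expr::ScalarFunction, so every construction and pattern match gains a wrapper. A self-contained sketch of the new shapes, mirroring DateBinGapfillRewriter above (the "my_date_bin" name is made up for illustration):

use datafusion::logical_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::prelude::Expr;

/// Rewrite calls to a hypothetical "my_date_bin" UDF into the built-in
/// date_bin, keeping the argument list unchanged.
fn rewrite_to_date_bin(expr: Expr) -> Expr {
    match expr {
        // New: the variant wraps a ScalarUDF struct instead of inline fields.
        Expr::ScalarUDF(ScalarUDF { fun, args }) if fun.name == "my_date_bin" => {
            Expr::ScalarFunction(ScalarFunction {
                fun: BuiltinScalarFunction::DateBin,
                args,
            })
        }
        other => other,
    }
}
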
@@ -7,7 +7,7 @@ use datafusion::{
         DFSchema,
     },
     error::Result,
-    logical_expr::{Between, BinaryExpr, LogicalPlan, Operator},
+    logical_expr::{Between, BinaryExpr, LogicalPlan, Operator, expr::Alias},
     optimizer::utils::split_conjunction,
     prelude::{Column, Expr},
 };
@@ -79,7 +79,7 @@ impl TreeNodeVisitor for TimeRangeVisitor {
 fn unwrap_alias(mut e: &Expr) -> &Expr {
     loop {
         match e {
-            Expr::Alias(inner, _) => e = inner.as_ref(),
+            Expr::Alias(Alias { expr, .. }) => e = expr.as_ref(),
             e => break e,
         }
     }
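
Likewise, Expr::Alias changed from a (Box<Expr>, String) tuple variant to a struct variant with a named expr field. A self-contained equivalent of the unwrap_alias helper above, as a sketch:

use datafusion::logical_expr::expr::Alias;
use datafusion::prelude::Expr;

/// Peel off any number of nested aliases and return the underlying expression.
fn strip_aliases(mut e: &Expr) -> &Expr {
    while let Expr::Alias(Alias { expr, .. }) = e {
        e = expr.as_ref();
    }
    e
}
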
@@ -1,8 +1,8 @@
 use datafusion::{
     common::{tree_node::TreeNodeRewriter, DFSchema},
     error::DataFusionError,
-    logical_expr::{utils::from_plan, LogicalPlan, Operator},
-    optimizer::{utils::rewrite_preserving_name, OptimizerConfig, OptimizerRule},
+    logical_expr::{utils::from_plan, LogicalPlan, Operator, expr_rewriter::rewrite_preserving_name, expr::ScalarUDF},
+    optimizer::{OptimizerConfig, OptimizerRule},
     prelude::{binary_expr, lit, Expr},
     scalar::ScalarValue,
 };
@@ -72,7 +72,7 @@ impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
 
     fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> {
         match expr {
-            Expr::ScalarUDF { fun, mut args } => {
+            Expr::ScalarUDF(ScalarUDF { fun, mut args }) => {
                 if (args.len() == 2)
                     && ((fun.name == REGEX_MATCH_UDF_NAME)
                         || (fun.name == REGEX_NOT_MATCH_UDF_NAME))
@@ -88,7 +88,7 @@
                     }
                 }
 
-                Ok(Expr::ScalarUDF { fun, args })
+                Ok(Expr::ScalarUDF(ScalarUDF{ fun, args }))
             }
             _ => Ok(expr),
         }
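
Two moves happen in this file: rewrite_preserving_name relocated from datafusion::optimizer::utils to datafusion::logical_expr::expr_rewriter, and the ScalarUDF match gains the struct wrapper seen elsewhere. A minimal sketch of calling the relocated helper with a no-op rewriter (NoopRewriter is made up; signature assumed from the pre-move helper, which keeps the expression's output name stable across the rewrite):

use datafusion::common::tree_node::TreeNodeRewriter;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::expr_rewriter::rewrite_preserving_name;
use datafusion::prelude::{lit, Expr};

struct NoopRewriter;

impl TreeNodeRewriter for NoopRewriter {
    type N = Expr;
    // pre_visit has a default implementation; only mutate is required.
    fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> {
        Ok(expr)
    }
}

fn demo() -> Result<Expr, DataFusionError> {
    // Rewrites `1 AS one` while preserving the output name "one".
    rewrite_preserving_name(lit(1).alias("one"), &mut NoopRewriter)
}
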
12 changes: 11 additions & 1 deletion iox_query_influxql/src/frontend/planner.rs
@@ -12,7 +12,7 @@ use datafusion::datasource::provider_as_source;
 use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::logical_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
 use datafusion::physical_expr::PhysicalSortExpr;
-use datafusion::physical_plan::{Partitioning, SendableRecordBatchStream};
+use datafusion::physical_plan::{Partitioning, SendableRecordBatchStream, DisplayAs, DisplayFormatType};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::ExecutionPlan,
@@ -114,6 +114,16 @@ impl ExecutionPlan for SchemaExec {
     }
 }
 
+impl DisplayAs for SchemaExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "SchemaExec")
+            }
+        }
+    }
+}
+
 /// Create plans for running InfluxQL queries against databases
 #[derive(Debug, Default)]
 pub struct InfluxQLQueryPlanner {}