Skip to content

Commit

Permalink
Move JoinType to datafusion_common (#6572)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jun 7, 2023
1 parent 8f7f76d commit cbfb7c8
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 79 deletions.
98 changes: 98 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`JoinType`] and [`JoinConstraint`]

use std::{
fmt::{self, Display, Formatter},
str::FromStr,
};

use crate::{DataFusionError, Result};

/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinType {
/// Inner Join
Inner,
/// Left Join
Left,
/// Right Join
Right,
/// Full Join
Full,
/// Left Semi Join
LeftSemi,
/// Right Semi Join
RightSemi,
/// Left Anti Join
LeftAnti,
/// Right Anti Join
RightAnti,
}

impl JoinType {
pub fn is_outer(self) -> bool {
self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
}
}

impl Display for JoinType {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let join_type = match self {
JoinType::Inner => "Inner",
JoinType::Left => "Left",
JoinType::Right => "Right",
JoinType::Full => "Full",
JoinType::LeftSemi => "LeftSemi",
JoinType::RightSemi => "RightSemi",
JoinType::LeftAnti => "LeftAnti",
JoinType::RightAnti => "RightAnti",
};
write!(f, "{join_type}")
}
}

impl FromStr for JoinType {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"INNER" => Ok(JoinType::Inner),
"LEFT" => Ok(JoinType::Left),
"RIGHT" => Ok(JoinType::Right),
"FULL" => Ok(JoinType::Full),
"LEFTSEMI" => Ok(JoinType::LeftSemi),
"RIGHTSEMI" => Ok(JoinType::RightSemi),
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
_ => Err(DataFusionError::NotImplemented(format!(
"The join type {s} does not exist or is not implemented"
))),
}
}
}

/// Join constraint
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinConstraint {
/// Join ON
On,
/// Join USING
Using,
}
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod delta;
mod dfschema;
mod error;
pub mod from_slice;
mod join_type;
pub mod parsers;
#[cfg(feature = "pyarrow")]
mod pyarrow;
Expand All @@ -39,6 +40,7 @@ pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
SharedResult,
};
pub use join_type::{JoinConstraint, JoinType};
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::error::{DataFusionError, Result};
use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
use crate::logical_expr::JoinType;
use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
Expand All @@ -78,6 +77,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_common::JoinType;

use super::{
utils::{OnceAsync, OnceFut},
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use futures::{Stream, StreamExt};

use crate::error::DataFusionError;
use crate::error::Result;
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::joins::utils::{
Expand All @@ -52,6 +51,7 @@ use crate::physical_plan::{
metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_common::JoinType;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -1396,7 +1396,6 @@ mod tests {

use crate::common::assert_contains;
use crate::error::Result;
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
Expand All @@ -1405,6 +1404,7 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
use crate::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::JoinType;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn build_table(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};

use crate::error::{DataFusionError, Result};
use crate::logical_expr::JoinType;
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
Expand All @@ -71,6 +70,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_common::JoinType;
use datafusion_execution::TaskContext;

const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};

use crate::error::{DataFusionError, Result};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use datafusion_common::JoinType;

use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::physical_plan::SchemaRef;
Expand Down
77 changes: 3 additions & 74 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ use datafusion_common::{
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;

// backwards compatible
pub use datafusion_common::{JoinConstraint, JoinType};

use super::DdlStatement;

/// A LogicalPlan represents the different types of relational
Expand Down Expand Up @@ -1128,79 +1130,6 @@ impl ToStringifiedPlan for LogicalPlan {
}
}

/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinType {
/// Inner Join
Inner,
/// Left Join
Left,
/// Right Join
Right,
/// Full Join
Full,
/// Left Semi Join
LeftSemi,
/// Right Semi Join
RightSemi,
/// Left Anti Join
LeftAnti,
/// Right Anti Join
RightAnti,
}

impl JoinType {
pub fn is_outer(self) -> bool {
self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
}
}

impl Display for JoinType {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let join_type = match self {
JoinType::Inner => "Inner",
JoinType::Left => "Left",
JoinType::Right => "Right",
JoinType::Full => "Full",
JoinType::LeftSemi => "LeftSemi",
JoinType::RightSemi => "RightSemi",
JoinType::LeftAnti => "LeftAnti",
JoinType::RightAnti => "RightAnti",
};
write!(f, "{join_type}")
}
}

impl FromStr for JoinType {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"INNER" => Ok(JoinType::Inner),
"LEFT" => Ok(JoinType::Left),
"RIGHT" => Ok(JoinType::Right),
"FULL" => Ok(JoinType::Full),
"LEFTSEMI" => Ok(JoinType::LeftSemi),
"RIGHTSEMI" => Ok(JoinType::RightSemi),
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
_ => Err(DataFusionError::NotImplemented(format!(
"The join type {s} does not exist or is not implemented"
))),
}
}
}

/// Join constraint
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinConstraint {
/// Join ON
On,
/// Join USING
Using,
}

/// Produces no rows: An empty relation with an empty schema
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
Expand Down

0 comments on commit cbfb7c8

Please sign in to comment.