Skip to content

Commit

Permalink
Merge pull request #5426 from zhyass/feature_cluster_table
Browse files Browse the repository at this point in the history
feat(fuse): add system$clustering_information function
  • Loading branch information
BohuTANG authored May 20, 2022
2 parents e0b72be + 407e2ff commit d98a162
Show file tree
Hide file tree
Showing 25 changed files with 1,141 additions and 57 deletions.
72 changes: 70 additions & 2 deletions common/datavalues/src/data_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
// Borrow from apache/arrow/rust/datafusion/src/functions.rs
// See notice.md

use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_macros::MallocSizeOf;
use ordered_float::OrderedFloat;
use serde_json::json;

use crate::prelude::*;

/// A specific value of a data type.
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, PartialOrd)]
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq)]
pub enum DataValue {
/// Base type.
Null,
Expand All @@ -44,6 +46,8 @@ pub enum DataValue {
Variant(VariantValue),
}

impl Eq for DataValue {}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, MallocSizeOf)]
pub enum ValueType {
Null,
Expand Down Expand Up @@ -271,7 +275,71 @@ impl DataValue {
}
}

impl Eq for DataValue {}
impl PartialOrd for DataValue {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for DataValue {
fn cmp(&self, other: &Self) -> Ordering {
if self.value_type() == other.value_type() {
return match (self, other) {
(DataValue::Null, DataValue::Null) => Ordering::Equal,
(DataValue::Boolean(v1), DataValue::Boolean(v2)) => v1.cmp(v2),
(DataValue::UInt64(v1), DataValue::UInt64(v2)) => v1.cmp(v2),
(DataValue::Int64(v1), DataValue::Int64(v2)) => v1.cmp(v2),
(DataValue::Float64(v1), DataValue::Float64(v2)) => {
OrderedFloat::from(*v1).cmp(&OrderedFloat::from(*v2))
}
(DataValue::String(v1), DataValue::String(v2)) => v1.cmp(v2),
(DataValue::Array(v1), DataValue::Array(v2)) => {
for (l, r) in v1.iter().zip(v2) {
let cmp = l.cmp(r);
if cmp != Ordering::Equal {
return cmp;
}
}
v1.len().cmp(&v2.len())
}
(DataValue::Struct(v1), DataValue::Struct(v2)) => {
for (l, r) in v1.iter().zip(v2.iter()) {
let cmp = l.cmp(r);
if cmp != Ordering::Equal {
return cmp;
}
}
v1.len().cmp(&v2.len())
}
(DataValue::Variant(v1), DataValue::Variant(v2)) => v1.cmp(v2),
_ => unreachable!(),
};
}

if self.is_null() {
return Ordering::Less;
}

if other.is_null() {
return Ordering::Greater;
}

if !self.is_numeric() || !other.is_numeric() {
panic!(
"Cannot compare different types with {:?} and {:?}",
self.value_type(),
other.value_type()
);
}

if self.is_float() || other.is_float() {
return OrderedFloat::from(self.as_f64().unwrap())
.cmp(&OrderedFloat::from(other.as_f64().unwrap()));
}

self.as_i64().unwrap().cmp(&other.as_i64().unwrap())
}
}

// Did not use std::convert:TryFrom
// Because we do not need custom type error.
Expand Down
1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ build_exceptions! {
InvalidTimezone(1067),
InvalidDate(1068),
InvalidTimestamp(1069),
InvalidClusterKeys(1070),

// Uncategorized error codes.
UnexpectedResponseType(1066),
Expand Down
72 changes: 72 additions & 0 deletions query/src/procedures/systems/clustering_information.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::Result;

use crate::procedures::Procedure;
use crate::procedures::ProcedureFeatures;
use crate::sessions::QueryContext;
use crate::storages::fuse::table_functions::get_cluster_keys;
use crate::storages::fuse::table_functions::ClusteringInformation;
use crate::storages::fuse::FuseTable;

pub struct ClusteringInformationProcedure {}

impl ClusteringInformationProcedure {
pub fn try_create() -> Result<Box<dyn Procedure>> {
Ok(Box::new(ClusteringInformationProcedure {}))
}
}

#[async_trait::async_trait]
impl Procedure for ClusteringInformationProcedure {
fn name(&self) -> &str {
"CLUSTERING_INFORMATION"
}

fn features(&self) -> ProcedureFeatures {
// Todo(zhyass): ProcedureFeatures::default().variadic_arguments(2, 3)
ProcedureFeatures::default().num_arguments(2)
}

async fn inner_eval(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
let database_name = args[0].clone();
let table_name = args[1].clone();
let tenant_id = ctx.get_tenant();
let tbl = ctx
.get_catalog(ctx.get_current_catalog())?
.get_table(
tenant_id.as_str(),
database_name.as_str(),
table_name.as_str(),
)
.await?;

let tbl = FuseTable::try_from_table(tbl.as_ref())?;
let definition = if args.len() > 2 { &args[2] } else { "" };
let cluster_keys = get_cluster_keys(tbl, definition)?;

Ok(ClusteringInformation::new(ctx, tbl, cluster_keys)
.get_clustering_info()
.await?)
}

fn schema(&self) -> Arc<DataSchema> {
ClusteringInformation::schema()
}
}
8 changes: 1 addition & 7 deletions query/src/procedures/systems/fuse_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::procedures::Procedure;
Expand Down Expand Up @@ -57,12 +56,7 @@ impl Procedure for FuseSegmentProcedure {
)
.await?;

let tbl = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::BadArguments(format!(
"expecting fuse table, but got table of engine type: {}",
tbl.get_table_info().meta.engine
))
})?;
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

Ok(FuseSegment::new(ctx, tbl, snapshot_id)
.get_segments()
Expand Down
8 changes: 1 addition & 7 deletions query/src/procedures/systems/fuse_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::procedures::Procedure;
Expand Down Expand Up @@ -57,12 +56,7 @@ impl Procedure for FuseSnapshotProcedure {
)
.await?;

let tbl = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::BadArguments(format!(
"expecting fuse table, but got table of engine type: {}",
tbl.get_table_info().meta.engine
))
})?;
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

Ok(FuseSnapshot::new(ctx, tbl).get_history().await?)
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/procedures/systems/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod clustering_information;
mod fuse_segment;
mod fuse_snapshot;
mod system;

pub use clustering_information::ClusteringInformationProcedure;
pub use fuse_segment::FuseSegmentProcedure;
pub use fuse_snapshot::FuseSnapshotProcedure;
pub use system::SystemProcedure;
5 changes: 5 additions & 0 deletions query/src/procedures/systems/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::procedures::systems::ClusteringInformationProcedure;
use crate::procedures::systems::FuseSegmentProcedure;
use crate::procedures::systems::FuseSnapshotProcedure;
use crate::procedures::ProcedureFactory;
Expand All @@ -20,6 +21,10 @@ pub struct SystemProcedure;

impl SystemProcedure {
pub fn register(factory: &mut ProcedureFactory) {
factory.register(
"system$clustering_information",
Box::new(ClusteringInformationProcedure::try_create),
);
factory.register(
"system$fuse_snapshot",
Box::new(FuseSnapshotProcedure::try_create),
Expand Down
4 changes: 4 additions & 0 deletions query/src/storages/fuse/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl FuseTable {
&self.meta_location_generator
}

pub fn cluster_keys(&self) -> Vec<Expression> {
self.order_keys.clone()
}

pub fn parse_storage_prefix(table_info: &TableInfo) -> Result<String> {
let table_id = table_info.ident.table_id;
let db_id = table_info
Expand Down
40 changes: 16 additions & 24 deletions query/src/storages/fuse/statistics/reducers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ pub fn reduce_block_stats<T: Borrow<ColumnsStatistics>>(stats: &[T]) -> Result<C
let min = min_stats
.iter()
.filter(|s| !s.is_null())
.min_by(|&x, &y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
.min_by(|&x, &y| x.cmp(y))
.cloned()
.unwrap_or(DataValue::Null);

let max = max_stats
.iter()
.filter(|s| !s.is_null())
.max_by(|&x, &y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
.max_by(|&x, &y| x.cmp(y))
.cloned()
.unwrap_or(DataValue::Null);

Expand Down Expand Up @@ -113,18 +113,14 @@ pub fn reduce_cluster_stats<T: Borrow<Option<ClusterStatistics>>>(
break;
}
(_, true) => break,
_ => {
if let Some(cmp) = l.partial_cmp(r) {
match cmp {
Ordering::Equal => continue,
Ordering::Less => break,
Ordering::Greater => {
min = stat.min.clone();
break;
}
}
_ => match l.cmp(r) {
Ordering::Equal => continue,
Ordering::Less => break,
Ordering::Greater => {
min = stat.min.clone();
break;
}
}
},
}
}

Expand All @@ -135,18 +131,14 @@ pub fn reduce_cluster_stats<T: Borrow<Option<ClusterStatistics>>>(
break;
}
(_, true) => break,
_ => {
if let Some(cmp) = l.partial_cmp(r) {
match cmp {
Ordering::Equal => continue,
Ordering::Less => {
max = stat.max.clone();
break;
}
Ordering::Greater => break,
}
_ => match l.cmp(r) {
Ordering::Equal => continue,
Ordering::Less => {
max = stat.max.clone();
break;
}
}
Ordering::Greater => break,
},
}
}
}
Expand Down
Loading

0 comments on commit d98a162

Please sign in to comment.