Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fuse): add system$clustering_information function #5426

Merged
merged 15 commits into from
May 20, 2022
72 changes: 70 additions & 2 deletions common/datavalues/src/data_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
// Borrow from apache/arrow/rust/datafusion/src/functions.rs
// See notice.md

use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_macros::MallocSizeOf;
use ordered_float::OrderedFloat;
use serde_json::json;

use crate::prelude::*;

/// A specific value of a data type.
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, PartialOrd)]
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq)]
pub enum DataValue {
/// Base type.
Null,
Expand All @@ -44,6 +46,8 @@ pub enum DataValue {
Variant(VariantValue),
}

impl Eq for DataValue {}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, MallocSizeOf)]
pub enum ValueType {
Null,
Expand Down Expand Up @@ -271,7 +275,71 @@ impl DataValue {
}
}

impl Eq for DataValue {}
impl PartialOrd for DataValue {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for DataValue {
fn cmp(&self, other: &Self) -> Ordering {
if self.value_type() == other.value_type() {
return match (self, other) {
(DataValue::Null, DataValue::Null) => Ordering::Equal,
(DataValue::Boolean(v1), DataValue::Boolean(v2)) => v1.cmp(v2),
(DataValue::UInt64(v1), DataValue::UInt64(v2)) => v1.cmp(v2),
(DataValue::Int64(v1), DataValue::Int64(v2)) => v1.cmp(v2),
(DataValue::Float64(v1), DataValue::Float64(v2)) => {
OrderedFloat::from(*v1).cmp(&OrderedFloat::from(*v2))
}
(DataValue::String(v1), DataValue::String(v2)) => v1.cmp(v2),
(DataValue::Array(v1), DataValue::Array(v2)) => {
for (l, r) in v1.iter().zip(v2) {
let cmp = l.cmp(r);
if cmp != Ordering::Equal {
return cmp;
}
}
v1.len().cmp(&v2.len())
}
(DataValue::Struct(v1), DataValue::Struct(v2)) => {
for (l, r) in v1.iter().zip(v2.iter()) {
let cmp = l.cmp(r);
if cmp != Ordering::Equal {
return cmp;
}
}
v1.len().cmp(&v2.len())
}
(DataValue::Variant(v1), DataValue::Variant(v2)) => v1.cmp(v2),
_ => unreachable!(),
};
}

if self.is_null() {
return Ordering::Less;
}

if other.is_null() {
return Ordering::Greater;
}

if !self.is_numeric() || !other.is_numeric() {
panic!(
"Cannot compare different types with {:?} and {:?}",
self.value_type(),
other.value_type()
);
}

if self.is_float() || other.is_float() {
return OrderedFloat::from(self.as_f64().unwrap())
.cmp(&OrderedFloat::from(other.as_f64().unwrap()));
}

self.as_i64().unwrap().cmp(&other.as_i64().unwrap())
}
}

// Did not use std::convert:TryFrom
// Because we do not need custom type error.
Expand Down
1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ build_exceptions! {
InvalidTimezone(1067),
InvalidDate(1068),
InvalidTimestamp(1069),
InvalidClusterKeys(1070),

// Uncategorized error codes.
UnexpectedResponseType(1066),
Expand Down
72 changes: 72 additions & 0 deletions query/src/procedures/systems/clustering_information.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::Result;

use crate::procedures::Procedure;
use crate::procedures::ProcedureFeatures;
use crate::sessions::QueryContext;
use crate::storages::fuse::table_functions::get_cluster_keys;
use crate::storages::fuse::table_functions::ClusteringInformation;
use crate::storages::fuse::FuseTable;

pub struct ClusteringInformationProcedure {}

impl ClusteringInformationProcedure {
pub fn try_create() -> Result<Box<dyn Procedure>> {
Ok(Box::new(ClusteringInformationProcedure {}))
}
}

#[async_trait::async_trait]
impl Procedure for ClusteringInformationProcedure {
fn name(&self) -> &str {
"CLUSTERING_INFORMATION"
}

fn features(&self) -> ProcedureFeatures {
// Todo(zhyass): ProcedureFeatures::default().variadic_arguments(2, 3)
ProcedureFeatures::default().num_arguments(2)
}

async fn inner_eval(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
let database_name = args[0].clone();
let table_name = args[1].clone();
let tenant_id = ctx.get_tenant();
let tbl = ctx
.get_catalog(ctx.get_current_catalog())?
.get_table(
tenant_id.as_str(),
database_name.as_str(),
table_name.as_str(),
)
.await?;

let tbl = FuseTable::try_from_table(tbl.as_ref())?;
let definition = if args.len() > 2 { &args[2] } else { "" };
let cluster_keys = get_cluster_keys(tbl, definition)?;

Ok(ClusteringInformation::new(ctx, tbl, cluster_keys)
.get_clustering_info()
.await?)
}

fn schema(&self) -> Arc<DataSchema> {
ClusteringInformation::schema()
}
}
8 changes: 1 addition & 7 deletions query/src/procedures/systems/fuse_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::procedures::Procedure;
Expand Down Expand Up @@ -57,12 +56,7 @@ impl Procedure for FuseSegmentProcedure {
)
.await?;

let tbl = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::BadArguments(format!(
"expecting fuse table, but got table of engine type: {}",
tbl.get_table_info().meta.engine
))
})?;
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

Ok(FuseSegment::new(ctx, tbl, snapshot_id)
.get_segments()
Expand Down
8 changes: 1 addition & 7 deletions query/src/procedures/systems/fuse_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::procedures::Procedure;
Expand Down Expand Up @@ -57,12 +56,7 @@ impl Procedure for FuseSnapshotProcedure {
)
.await?;

let tbl = tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::BadArguments(format!(
"expecting fuse table, but got table of engine type: {}",
tbl.get_table_info().meta.engine
))
})?;
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

Ok(FuseSnapshot::new(ctx, tbl).get_history().await?)
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/procedures/systems/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod clustering_information;
mod fuse_segment;
mod fuse_snapshot;
mod system;

pub use clustering_information::ClusteringInformationProcedure;
pub use fuse_segment::FuseSegmentProcedure;
pub use fuse_snapshot::FuseSnapshotProcedure;
pub use system::SystemProcedure;
5 changes: 5 additions & 0 deletions query/src/procedures/systems/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::procedures::systems::ClusteringInformationProcedure;
use crate::procedures::systems::FuseSegmentProcedure;
use crate::procedures::systems::FuseSnapshotProcedure;
use crate::procedures::ProcedureFactory;
Expand All @@ -20,6 +21,10 @@ pub struct SystemProcedure;

impl SystemProcedure {
pub fn register(factory: &mut ProcedureFactory) {
factory.register(
"system$clustering_information",
Box::new(ClusteringInformationProcedure::try_create),
);
factory.register(
"system$fuse_snapshot",
Box::new(FuseSnapshotProcedure::try_create),
Expand Down
4 changes: 4 additions & 0 deletions query/src/storages/fuse/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl FuseTable {
&self.meta_location_generator
}

pub fn cluster_keys(&self) -> Vec<Expression> {
self.order_keys.clone()
}

pub fn parse_storage_prefix(table_info: &TableInfo) -> Result<String> {
let table_id = table_info.ident.table_id;
let db_id = table_info
Expand Down
40 changes: 16 additions & 24 deletions query/src/storages/fuse/statistics/reducers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ pub fn reduce_block_stats<T: Borrow<ColumnsStatistics>>(stats: &[T]) -> Result<C
let min = min_stats
.iter()
.filter(|s| !s.is_null())
.min_by(|&x, &y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
.min_by(|&x, &y| x.cmp(y))
.cloned()
.unwrap_or(DataValue::Null);

let max = max_stats
.iter()
.filter(|s| !s.is_null())
.max_by(|&x, &y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
.max_by(|&x, &y| x.cmp(y))
.cloned()
.unwrap_or(DataValue::Null);

Expand Down Expand Up @@ -113,18 +113,14 @@ pub fn reduce_cluster_stats<T: Borrow<Option<ClusterStatistics>>>(
break;
}
(_, true) => break,
_ => {
if let Some(cmp) = l.partial_cmp(r) {
match cmp {
Ordering::Equal => continue,
Ordering::Less => break,
Ordering::Greater => {
min = stat.min.clone();
break;
}
}
_ => match l.cmp(r) {
Ordering::Equal => continue,
Ordering::Less => break,
Ordering::Greater => {
min = stat.min.clone();
break;
}
}
},
}
}

Expand All @@ -135,18 +131,14 @@ pub fn reduce_cluster_stats<T: Borrow<Option<ClusterStatistics>>>(
break;
}
(_, true) => break,
_ => {
if let Some(cmp) = l.partial_cmp(r) {
match cmp {
Ordering::Equal => continue,
Ordering::Less => {
max = stat.max.clone();
break;
}
Ordering::Greater => break,
}
_ => match l.cmp(r) {
Ordering::Equal => continue,
Ordering::Less => {
max = stat.max.clone();
break;
}
}
Ordering::Greater => break,
},
}
}
}
Expand Down
Loading