Skip to content

Commit

Permalink
feat(query): Support Computed columns (databendlabs#11391)
Browse files Browse the repository at this point in the history
* feat(query): Support Computed columns

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix test

* fix

* fix

* fix

* fix

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
b41sh and mergify[bot] authored May 15, 2023
1 parent a5070bf commit f530b1e
Show file tree
Hide file tree
Showing 73 changed files with 1,543 additions and 372 deletions.
15 changes: 3 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ ordered-float = { workspace = true }
serde = { workspace = true }

[dev-dependencies]
aho-corasick = { version = "0.7.20" }
aho-corasick = { version = "1.0.1" }
rand = "0.8.5"
4 changes: 2 additions & 2 deletions src/common/io/tests/it/cursor_ext/fast_read_text_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn test_positions() -> Result<()> {
];

let patterns = &["'", "\\"];
let ac = AhoCorasick::new(patterns);
let ac = AhoCorasick::new(patterns).unwrap();
for (data, expect) in cases {
let mut positions = VecDeque::new();
for mat in ac.find_iter(&data) {
Expand All @@ -47,7 +47,7 @@ fn test_positions() -> Result<()> {
fn test_fast_read_text() -> Result<()> {
let data = r#"'abc','d\'ef','g\\\'hi'"#.to_string();
let patterns = &["'", "\\"];
let ac = AhoCorasick::new(patterns);
let ac = AhoCorasick::new(patterns).unwrap();
let mut positions = VecDeque::new();
for mat in ac.find_iter(&data) {
let pos = mat.start();
Expand Down
56 changes: 55 additions & 1 deletion src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,84 @@ impl FromToProto for ex::TableField {
fn from_pb(p: pb::DataField) -> Result<Self, Incompatible> {
reader_check_msg(p.ver, p.min_reader_ver)?;

let computed_expr = match p.computed_expr {
Some(computed_expr) => Some(ex::ComputedExpr::from_pb(computed_expr)?),
None => None,
};

let v = ex::TableField::new_from_column_id(
&p.name,
ex::TableDataType::from_pb(p.data_type.ok_or_else(|| Incompatible {
reason: "DataField.data_type can not be None".to_string(),
})?)?,
p.column_id,
)
.with_default_expr(p.default_expr);
.with_default_expr(p.default_expr)
.with_computed_expr(computed_expr);
Ok(v)
}

fn to_pb(&self) -> Result<pb::DataField, Incompatible> {
let computed_expr = match self.computed_expr() {
Some(computed_expr) => Some(computed_expr.to_pb()?),
None => None,
};
let p = pb::DataField {
ver: VER,
min_reader_ver: MIN_READER_VER,
name: self.name().clone(),
default_expr: self.default_expr().cloned(),
data_type: Some(self.data_type().to_pb()?),
column_id: self.column_id(),
computed_expr,
};
Ok(p)
}
}

impl FromToProto for ex::ComputedExpr {
type PB = pb::ComputedExpr;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(p: pb::ComputedExpr) -> Result<Self, Incompatible> {
reader_check_msg(p.ver, p.min_reader_ver)?;

let computed_expr = match p.computed_expr {
None => {
return Err(Incompatible {
reason: "Invalid ComputedExpr: .computed_expr can not be None".to_string(),
});
}
Some(x) => x,
};

let x = match computed_expr {
pb::computed_expr::ComputedExpr::Virtual(expr) => Self::Virtual(expr),
pb::computed_expr::ComputedExpr::Stored(expr) => Self::Stored(expr),
};
Ok(x)
}

fn to_pb(&self) -> Result<pb::ComputedExpr, Incompatible> {
let x = match self {
ex::ComputedExpr::Virtual(expr) => {
pb::computed_expr::ComputedExpr::Virtual(expr.clone())
}
ex::ComputedExpr::Stored(expr) => pb::computed_expr::ComputedExpr::Stored(expr.clone()),
};

Ok(pb::ComputedExpr {
ver: VER,
min_reader_ver: MIN_READER_VER,

computed_expr: Some(x),
})
}
}

impl FromToProto for ex::TableDataType {
type PB = pb::DataType;
fn get_pb_ver(p: &Self::PB) -> u64 {
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(33, "2023-04-13: Update: add `shared_by` field into TableMeta", ),
(34, "2023-04-23: Add: metadata.proto/DataType Bitmap type", ),
(35, "2023-05-08: Add: CopyOptions::disable_variant_check", ),
(36, "2023-05-12: Add: metadata.proto/ComputedExpr", ),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ mod v032_file_format_params;
mod v033_table_meta;
mod v034_schema;
mod v035_user_stage;
mod v036_table_meta;
109 changes: 109 additions & 0 deletions src/meta/proto-conv/tests/it/v036_table_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use chrono::TimeZone;
use chrono::Utc;
use common_expression as ce;
use common_expression::types::NumberDataType;
use common_expression::ComputedExpr;
use common_meta_app::schema as mt;
use maplit::btreemap;
use maplit::btreeset;

use crate::common;

// These bytes are built when a new version in introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_build_pb_buf()`
#[test]
fn test_decode_v36_table_meta() -> anyhow::Result<()> {
let bytes = vec![
10, 223, 1, 10, 51, 10, 8, 110, 117, 108, 108, 97, 98, 108, 101, 18, 5, 97, 32, 43, 32, 51,
26, 26, 178, 2, 17, 154, 2, 8, 42, 0, 160, 6, 36, 168, 6, 24, 160, 6, 36, 168, 6, 24, 160,
6, 36, 168, 6, 24, 160, 6, 36, 168, 6, 24, 10, 27, 10, 6, 115, 116, 114, 105, 110, 103, 26,
9, 146, 2, 0, 160, 6, 36, 168, 6, 24, 32, 1, 160, 6, 36, 168, 6, 24, 10, 62, 10, 14, 118,
105, 114, 116, 117, 97, 108, 95, 115, 116, 114, 105, 110, 103, 26, 9, 146, 2, 0, 160, 6,
36, 168, 6, 24, 32, 2, 42, 25, 10, 17, 116, 111, 95, 98, 97, 115, 101, 54, 52, 40, 115,
116, 114, 105, 110, 103, 41, 160, 6, 36, 168, 6, 24, 160, 6, 36, 168, 6, 24, 10, 59, 10,
13, 115, 116, 111, 114, 101, 100, 95, 115, 116, 114, 105, 110, 103, 26, 9, 146, 2, 0, 160,
6, 36, 168, 6, 24, 32, 3, 42, 23, 18, 15, 114, 101, 118, 101, 114, 115, 101, 40, 115, 116,
114, 105, 110, 103, 41, 160, 6, 36, 168, 6, 24, 160, 6, 36, 168, 6, 24, 18, 6, 10, 1, 97,
18, 1, 98, 24, 4, 160, 6, 36, 168, 6, 24, 34, 10, 40, 97, 32, 43, 32, 50, 44, 32, 98, 41,
42, 10, 10, 3, 120, 121, 122, 18, 3, 102, 111, 111, 50, 2, 52, 52, 58, 10, 10, 3, 97, 98,
99, 18, 3, 100, 101, 102, 64, 0, 74, 10, 40, 97, 32, 43, 32, 50, 44, 32, 98, 41, 82, 7,
100, 101, 102, 97, 117, 108, 116, 162, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32,
49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 170, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45,
50, 57, 32, 49, 50, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 178, 1, 13, 116, 97, 98, 108,
101, 95, 99, 111, 109, 109, 101, 110, 116, 186, 1, 6, 160, 6, 36, 168, 6, 24, 202, 1, 1,
99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1,
99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1,
99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1,
99, 202, 1, 1, 99, 202, 1, 1, 99, 226, 1, 1, 1, 160, 6, 36, 168, 6, 24,
];

let want = || mt::TableMeta {
schema: Arc::new(ce::TableSchema::new_from(
vec![
ce::TableField::new(
"nullable",
ce::TableDataType::Nullable(Box::new(ce::TableDataType::Number(
NumberDataType::Int8,
))),
)
.with_default_expr(Some("a + 3".to_string())),
ce::TableField::new("string", ce::TableDataType::String),
ce::TableField::new("virtual_string", ce::TableDataType::String)
.with_computed_expr(Some(ComputedExpr::Virtual(
"to_base64(string)".to_string(),
))),
ce::TableField::new("stored_string", ce::TableDataType::String)
.with_computed_expr(Some(ComputedExpr::Stored("reverse(string)".to_string()))),
],
btreemap! {s("a") => s("b")},
)),
catalog: "default".to_string(),
engine: "44".to_string(),
storage_params: None,
part_prefix: "".to_string(),
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
default_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(),
comment: s("table_comment"),
field_comments: vec!["c".to_string(); 21],
drop_on: None,
statistics: Default::default(),
shared_by: btreeset! {1},
};

common::test_load_old(func_name!(), bytes.as_slice(), 36, want())?;
common::test_pb_from_to(func_name!(), want())?;
Ok(())
}

fn s(ss: impl ToString) -> String {
ss.to_string()
}
12 changes: 12 additions & 0 deletions src/meta/protos/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ message DataSchema {
uint32 next_column_id = 3;
}

message ComputedExpr {
uint64 ver = 100;
uint64 min_reader_ver = 101;

oneof computed_expr {
string virtual = 1;
string stored = 2;
}
}

// One field, AKA column
message DataField {
uint64 ver = 100;
Expand All @@ -48,4 +58,6 @@ message DataField {
DataType data_type = 3;

uint32 column_id = 4;

optional ComputedExpr computed_expr = 5;
}
30 changes: 27 additions & 3 deletions src/query/ast/src/ast/statements/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,35 @@ impl Display for OptimizeTableAction {
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum ColumnExpr {
Default(Box<Expr>),
Virtual(Box<Expr>),
Stored(Box<Expr>),
}

impl Display for ColumnExpr {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
ColumnExpr::Default(expr) => {
write!(f, " DEFAULT {expr}")?;
}
ColumnExpr::Virtual(expr) => {
write!(f, " AS ({expr}) VIRTUAL")?;
}
ColumnExpr::Stored(expr) => {
write!(f, " AS ({expr}) STORED")?;
}
}
Ok(())
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct ColumnDefinition {
pub name: Identifier,
pub data_type: TypeName,
pub default_expr: Option<Box<Expr>>,
pub expr: Option<ColumnExpr>,
pub comment: Option<String>,
}

Expand All @@ -594,8 +618,8 @@ impl Display for ColumnDefinition {
write!(f, " NOT NULL")?;
}

if let Some(default_expr) = &self.default_expr {
write!(f, " DEFAULT {default_expr}")?;
if let Some(expr) = &self.expr {
write!(f, "{expr}")?;
}
if let Some(comment) = &self.comment {
write!(f, " COMMENT '{comment}'")?;
Expand Down
Loading

0 comments on commit f530b1e

Please sign in to comment.