Skip to content

Commit

Permalink
feat: base implementation of default value (apache#246)
Browse files Browse the repository at this point in the history
* feat: base implementation of default value

* Add some unit tests

* Make lint happy

* replace CastExpr with TryCastExpr when filling default value

* Update df_operator/src/visitor.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update interpreters/src/insert.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update sql/src/plan.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Add integration tests

* Fix minor comments

* Fix integration test

* Merge remote-tracking branch 'upstream/main' into simple-default-value

* Fix minor comments

* Improve conversion from Output to inner records

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
ygf11 and waynexia authored Sep 21, 2022
1 parent f1e853e commit d6e9eff
Show file tree
Hide file tree
Showing 25 changed files with 769 additions and 89 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion common_types/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ impl ColumnBlock {
DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray {
data_type: array.data_type().clone(),
})?;

Self::try_from_arrow_array_ref(&datum_kind, array)
}

Expand Down
48 changes: 43 additions & 5 deletions common_types/src/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{collections::BTreeMap, convert::TryFrom, str::FromStr};
use arrow_deps::arrow::datatypes::{DataType, Field};
use proto::common as common_pb;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::Expr;

use crate::datum::DatumKind;

Expand Down Expand Up @@ -61,6 +62,15 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
backtrace: Backtrace,
},
#[snafu(display(
"Can not deserialize default-value-option from pb data, err:{}.\nBacktrace:\n{}",
source,
backtrace
))]
InvalidDefaultValueData {
source: serde_json::error::Error,
backtrace: Backtrace,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -150,6 +160,8 @@ pub struct ColumnSchema {
pub comment: String,
/// Column name in response
pub escaped_name: String,
/// Default value expr
pub default_value: Option<Expr>,
}

impl ColumnSchema {
Expand Down Expand Up @@ -187,6 +199,10 @@ impl ColumnSchema {
column_schema.set_is_tag(self.is_tag);
column_schema.set_comment(self.comment.clone());

if let Some(default_value) = &self.default_value {
column_schema.set_default_value(serde_json::to_vec(default_value).unwrap());
}

column_schema
}

Expand Down Expand Up @@ -250,18 +266,30 @@ impl ColumnSchema {
}
}

impl From<common_pb::ColumnSchema> for ColumnSchema {
fn from(column_schema: common_pb::ColumnSchema) -> Self {
impl TryFrom<common_pb::ColumnSchema> for ColumnSchema {
type Error = Error;

fn try_from(column_schema: common_pb::ColumnSchema) -> Result<Self> {
let escaped_name = column_schema.name.escape_debug().to_string();
Self {
let default_value_bytes = column_schema.get_default_value();
let default_value = if !default_value_bytes.is_empty() {
let expr = serde_json::from_slice::<Expr>(default_value_bytes)
.context(InvalidDefaultValueData)?;
Some(expr)
} else {
None
};

Ok(Self {
id: column_schema.id,
name: column_schema.name,
data_type: DatumKind::from(column_schema.data_type),
is_nullable: column_schema.is_nullable,
is_tag: column_schema.is_tag,
comment: column_schema.comment,
escaped_name,
}
default_value,
})
}
}

Expand Down Expand Up @@ -290,6 +318,7 @@ impl TryFrom<&Field> for ColumnSchema {
is_tag,
comment,
escaped_name: field.name().escape_debug().to_string(),
default_value: None,
})
}
}
Expand Down Expand Up @@ -357,6 +386,7 @@ pub struct Builder {
is_nullable: bool,
is_tag: bool,
comment: String,
default_value: Option<Expr>,
}

impl Builder {
Expand All @@ -369,6 +399,7 @@ impl Builder {
is_nullable: false,
is_tag: false,
comment: String::new(),
default_value: None,
}
}

Expand All @@ -394,6 +425,11 @@ impl Builder {
self
}

/// Set the default value expression for the column being built.
///
/// `None` (the initial state) means the column has no default value.
/// The expression is stored as-is; evaluation/coercion happens elsewhere
/// (e.g. when filling defaults during insert).
pub fn default_value(mut self, default_value: Option<Expr>) -> Self {
self.default_value = default_value;
self
}

pub fn validate(&self) -> Result<()> {
if self.is_tag {
ensure!(
Expand All @@ -418,6 +454,7 @@ impl Builder {
is_tag: self.is_tag,
comment: self.comment,
escaped_name,
default_value: self.default_value,
})
}
}
Expand Down Expand Up @@ -449,6 +486,7 @@ mod tests {
is_tag: true,
comment: "Comment of this column".to_string(),
escaped_name: "test_column_schema".escape_debug().to_string(),
default_value: None,
};

assert_eq!(&lhs, &rhs);
Expand All @@ -461,7 +499,7 @@ mod tests {
// Check pb specific fields
assert!(pb_schema.is_tag);

let schema_from_pb = ColumnSchema::from(pb_schema);
let schema_from_pb = ColumnSchema::try_from(pb_schema).unwrap();
assert_eq!(&schema_from_pb, &column_schema);
}

Expand Down
37 changes: 37 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,43 @@ pub mod arrow_convert {
Datum::Boolean(v) => Some(ScalarValue::Boolean(Some(*v))),
}
}

/// Convert a DataFusion `ScalarValue` into a `Datum`.
///
/// Returns `None` when the scalar holds a null (the inner `Option` is
/// `None`) or when the scalar's variant has no `Datum` counterpart:
/// lists, dates, intervals, structs, decimals, `Null`, and any
/// timestamp unit other than milliseconds.
pub fn from_scalar_value(val: &ScalarValue) -> Option<Self> {
match val {
ScalarValue::Boolean(v) => v.map(Datum::Boolean),
ScalarValue::Float32(v) => v.map(Datum::Float),
ScalarValue::Float64(v) => v.map(Datum::Double),
ScalarValue::Int8(v) => v.map(Datum::Int8),
ScalarValue::Int16(v) => v.map(Datum::Int16),
ScalarValue::Int32(v) => v.map(Datum::Int32),
ScalarValue::Int64(v) => v.map(Datum::Int64),
ScalarValue::UInt8(v) => v.map(Datum::UInt8),
ScalarValue::UInt16(v) => v.map(Datum::UInt16),
ScalarValue::UInt32(v) => v.map(Datum::UInt32),
ScalarValue::UInt64(v) => v.map(Datum::UInt64),
// Both utf8 variants map to the same owned string datum.
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v
.as_ref()
.map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))),
// Both binary variants map to the same owned bytes datum.
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v
.as_ref()
.map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))),
// Only millisecond timestamps are representable; the timezone
// component (second tuple field) is intentionally ignored.
ScalarValue::TimestampMillisecond(v, _) => {
v.map(|v| Datum::Timestamp(Timestamp::new(v)))
}
// Unsupported scalar kinds — no corresponding Datum variant.
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
}
}
}

impl<'a> DatumView<'a> {
Expand Down
6 changes: 5 additions & 1 deletion common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub enum Error {
key: ArrowSchemaMetaKey,
backtrace: Backtrace,
},

#[snafu(display("Arrow schema meta key not found.\nerr:\n{}", source))]
ColumnSchemaDeserializeFailed { source: crate::column_schema::Error },
}

pub type CatalogName = String;
Expand Down Expand Up @@ -825,7 +828,8 @@ impl TryFrom<common_pb::TableSchema> for Schema {
.enable_tsid_primary_key(schema.enable_tsid_primary_key);

for (i, column_schema_pb) in schema.columns.into_iter().enumerate() {
let column = ColumnSchema::from(column_schema_pb);
let column =
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?;

if i < schema.num_key_columns as usize {
builder = builder.add_key_column(column)?;
Expand Down
56 changes: 56 additions & 0 deletions common_types/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use bytes::Bytes;
use sqlparser::ast::{BinaryOperator, Expr, Value};

use crate::{
column_schema,
Expand Down Expand Up @@ -46,12 +47,67 @@ fn base_schema_builder() -> schema::Builder {
.unwrap()
}

/// Build the schema builder used by default-value tests:
/// (key1(varbinary), key2(timestamp), field1(int64, default 10),
/// field2(uint32, default 20), field3(uint32, default 1 + 2))
///
/// The three normal columns deliberately cover: same-type default,
/// different-type default (requires coercion), and an expression default.
fn default_value_schema_builder() -> schema::Builder {
schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("key2".to_string(), DatumKind::Timestamp)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
// The data type of column and its default value will not be the same in most time.
// So we need check if the type coercion is legal and do type coercion when legal.
// In the following, the data type of column is `Int64`, and the type of default value
// expr is `Int64`. So we use this column to cover the test, which has the same type.
column_schema::Builder::new("field1".to_string(), DatumKind::Int64)
.default_value(Some(Expr::Value(Value::Number("10".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
// The data type of column is `UInt32`, and the type of default value expr is `Int64`.
// So we use this column to cover the test, which has different type.
column_schema::Builder::new("field2".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::Value(Value::Number("20".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
// Covers a non-literal default: the expression `1 + 2` must be
// evaluated (and coerced to UInt32) when the default is filled.
column_schema::Builder::new("field3".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::BinaryOp {
left: Box::new(Expr::Value(Value::Number("1".to_string(), false))),
op: BinaryOperator::Plus,
right: Box::new(Expr::Value(Value::Number("2".to_string(), false))),
}))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
}

/// Build the standard schema used across tests:
/// (key1(varbinary), key2(timestamp), field1(double), field2(string))
pub fn build_schema() -> Schema {
    let builder = base_schema_builder();
    builder.build().unwrap()
}

/// Build the default-value schema used by tests:
/// (key1(varbinary), key2(timestamp), field1(int64, default 10),
/// field2(uint32, default 20), field3(uint32, default 1 + 2))
pub fn build_default_value_schema() -> Schema {
    let builder = default_value_schema_builder();
    builder.build().unwrap()
}

pub fn build_projected_schema() -> ProjectedSchema {
let schema = build_schema();
assert!(schema.num_columns() > 1);
Expand Down
1 change: 1 addition & 0 deletions common_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod codec;
pub mod config;
pub mod metric;
pub mod panic;
pub mod record_batch;
pub mod runtime;
pub mod time;
pub mod toml;
Expand Down
23 changes: 23 additions & 0 deletions common_util/src/record_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow_deps::arrow::util::pretty;
use common_types::record_batch::RecordBatch;

/// A helper function to assert record batch.
/// A helper function to assert record batch.
///
/// Renders `record_batches` with arrow's pretty printer and compares the
/// result line by line against `expected`; panics with both renderings on
/// mismatch.
pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec<RecordBatch>) {
    // Unwrap into raw arrow batches so the pretty printer can render them.
    let mut arrow_batches = Vec::with_capacity(record_batches.len());
    for batch in record_batches {
        arrow_batches.push(batch.into_arrow_record_batch());
    }

    let rendered = pretty::pretty_format_batches(&arrow_batches)
        .unwrap()
        .to_string();
    let actual_lines: Vec<&str> = rendered.trim().lines().collect();
    let expected_lines: Vec<String> = expected.iter().map(|&line| line.to_string()).collect();

    assert_eq!(
        expected_lines, actual_lines,
        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
        expected_lines, actual_lines
    );
}
1 change: 1 addition & 0 deletions df_operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod registry;
pub mod scalar;
pub mod udaf;
pub mod udfs;
pub mod visitor;
Loading

0 comments on commit d6e9eff

Please sign in to comment.