Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: base implementation of default value #246

Merged
merged 14 commits into from
Sep 21, 2022
Merged
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion arrow_deps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "15.0.0"
arrow = { version = "15.0.0", features = ["prettyprint"]}
parquet = "15.0.0"

[dependencies.uncover]
Expand Down
3 changes: 2 additions & 1 deletion common_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ paste = "1.0"
proto = { path = "../proto" }
snafu = { version ="0.6.10", features = ["backtraces"]}
# TODO(yingwen): Make sqlparser support a feature
sqlparser = "0.19.0"
sqlparser = { version = "0.19.0", features = ["serde"]}
serde = "1.0.81"
serde_derive = "1.0.81"
serde_json = "1.0.60"
1 change: 0 additions & 1 deletion common_types/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ impl ColumnBlock {
DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray {
data_type: array.data_type().clone(),
})?;

Self::try_from_arrow_array_ref(&datum_kind, array)
}

Expand Down
48 changes: 43 additions & 5 deletions common_types/src/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{collections::BTreeMap, convert::TryFrom, str::FromStr};
use arrow_deps::arrow::datatypes::{DataType, Field};
use proto::common as common_pb;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::Expr;

use crate::datum::DatumKind;

Expand Down Expand Up @@ -61,6 +62,15 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
backtrace: Backtrace,
},
#[snafu(display(
"Can not deserialize default-value-option from pb data, err:{}.\nBacktrace:\n{}",
source,
backtrace
))]
InvalidDefaultValueData {
source: serde_json::error::Error,
backtrace: Backtrace,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -150,6 +160,8 @@ pub struct ColumnSchema {
pub comment: String,
/// Column name in response
pub escaped_name: String,
/// Default value expr
pub default_value: Option<Expr>,
}

impl ColumnSchema {
Expand Down Expand Up @@ -187,6 +199,10 @@ impl ColumnSchema {
column_schema.set_is_tag(self.is_tag);
column_schema.set_comment(self.comment.clone());

if let Some(default_value) = &self.default_value {
column_schema.set_default_value(serde_json::to_vec(default_value).unwrap());
}

column_schema
}

Expand Down Expand Up @@ -250,18 +266,30 @@ impl ColumnSchema {
}
}

impl From<common_pb::ColumnSchema> for ColumnSchema {
fn from(column_schema: common_pb::ColumnSchema) -> Self {
impl TryFrom<common_pb::ColumnSchema> for ColumnSchema {
type Error = Error;

fn try_from(column_schema: common_pb::ColumnSchema) -> Result<Self> {
let escaped_name = column_schema.name.escape_debug().to_string();
Self {
let default_value_bytes = column_schema.get_default_value();
let default_value = if !default_value_bytes.is_empty() {
let expr = serde_json::from_slice::<Expr>(default_value_bytes)
.context(InvalidDefaultValueData)?;
Some(expr)
} else {
None
};

Ok(Self {
id: column_schema.id,
name: column_schema.name,
data_type: DatumKind::from(column_schema.data_type),
is_nullable: column_schema.is_nullable,
is_tag: column_schema.is_tag,
comment: column_schema.comment,
escaped_name,
}
default_value,
})
}
}

Expand Down Expand Up @@ -290,6 +318,7 @@ impl TryFrom<&Field> for ColumnSchema {
is_tag,
comment,
escaped_name: field.name().escape_debug().to_string(),
default_value: None,
})
}
}
Expand Down Expand Up @@ -357,6 +386,7 @@ pub struct Builder {
is_nullable: bool,
is_tag: bool,
comment: String,
default_value: Option<Expr>,
}

impl Builder {
Expand All @@ -369,6 +399,7 @@ impl Builder {
is_nullable: false,
is_tag: false,
comment: String::new(),
default_value: None,
}
}

Expand All @@ -394,6 +425,11 @@ impl Builder {
self
}

pub fn default_value(mut self, default_value: Option<Expr>) -> Self {
self.default_value = default_value;
self
}

pub fn validate(&self) -> Result<()> {
if self.is_tag {
ensure!(
Expand All @@ -418,6 +454,7 @@ impl Builder {
is_tag: self.is_tag,
comment: self.comment,
escaped_name,
default_value: self.default_value,
})
}
}
Expand Down Expand Up @@ -449,6 +486,7 @@ mod tests {
is_tag: true,
comment: "Comment of this column".to_string(),
escaped_name: "test_column_schema".escape_debug().to_string(),
default_value: None,
};

assert_eq!(&lhs, &rhs);
Expand All @@ -461,7 +499,7 @@ mod tests {
// Check pb specific fields
assert!(pb_schema.is_tag);

let schema_from_pb = ColumnSchema::from(pb_schema);
let schema_from_pb = ColumnSchema::try_from(pb_schema).unwrap();
assert_eq!(&schema_from_pb, &column_schema);
}

Expand Down
37 changes: 37 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,43 @@ pub mod arrow_convert {
Datum::Boolean(v) => Some(ScalarValue::Boolean(Some(*v))),
}
}

pub fn from_scalar_value(val: &ScalarValue) -> Option<Self> {
match val {
ScalarValue::Boolean(v) => v.map(Datum::Boolean),
ScalarValue::Float32(v) => v.map(Datum::Float),
ScalarValue::Float64(v) => v.map(Datum::Double),
ScalarValue::Int8(v) => v.map(Datum::Int8),
ScalarValue::Int16(v) => v.map(Datum::Int16),
ScalarValue::Int32(v) => v.map(Datum::Int32),
ScalarValue::Int64(v) => v.map(Datum::Int64),
ScalarValue::UInt8(v) => v.map(Datum::UInt8),
ScalarValue::UInt16(v) => v.map(Datum::UInt16),
ScalarValue::UInt32(v) => v.map(Datum::UInt32),
ScalarValue::UInt64(v) => v.map(Datum::UInt64),
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v
.as_ref()
.map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))),
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v
.as_ref()
.map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))),
ScalarValue::TimestampMillisecond(v, _) => {
v.map(|v| Datum::Timestamp(Timestamp::new(v)))
}
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
}
}
}

impl<'a> DatumView<'a> {
Expand Down
6 changes: 5 additions & 1 deletion common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub enum Error {
key: ArrowSchemaMetaKey,
backtrace: Backtrace,
},

#[snafu(display("Arrow schema meta key not found.\nerr:\n{}", source))]
ColumnSchemaDeserializeFailed { source: crate::column_schema::Error },
}

pub type SchemaId = u32;
Expand Down Expand Up @@ -824,7 +827,8 @@ impl TryFrom<common_pb::TableSchema> for Schema {
.enable_tsid_primary_key(schema.enable_tsid_primary_key);

for (i, column_schema_pb) in schema.columns.into_iter().enumerate() {
let column = ColumnSchema::from(column_schema_pb);
let column =
ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?;

if i < schema.num_key_columns as usize {
builder = builder.add_key_column(column)?;
Expand Down
52 changes: 52 additions & 0 deletions common_types/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use bytes::Bytes;
use sqlparser::ast::{BinaryOperator, Expr, Value};

use crate::{
column_schema,
Expand Down Expand Up @@ -46,12 +47,63 @@ fn base_schema_builder() -> schema::Builder {
.unwrap()
}

fn default_value_schema_builder() -> schema::Builder {
schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("key2".to_string(), DatumKind::Timestamp)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
// Do not need Cast
column_schema::Builder::new("field1".to_string(), DatumKind::Int64)
.default_value(Some(Expr::Value(Value::Number("10".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
// Need cast
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
column_schema::Builder::new("field2".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::Value(Value::Number("20".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("field3".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::BinaryOp {
left: Box::new(Expr::Value(Value::Number("1".to_string(), false))),
op: BinaryOperator::Plus,
right: Box::new(Expr::Value(Value::Number("2".to_string(), false))),
}))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
}

/// Build a schema for testing:
/// (key1(varbinary), key2(timestamp), field1(double), field2(string))
pub fn build_schema() -> Schema {
base_schema_builder().build().unwrap()
}

/// Build a schema for testing:
/// (key1(varbinary), key2(timestamp), field1(uint32, default 10),
/// field2(uint32, default 20)), field3(uint32, default 1 + 2)
pub fn build_default_value_schema() -> Schema {
default_value_schema_builder().build().unwrap()
}

pub fn build_projected_schema() -> ProjectedSchema {
let schema = build_schema();
assert!(schema.num_columns() > 1);
Expand Down
1 change: 1 addition & 0 deletions common_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test = ["env_logger"]

[dependencies]
# In alphabetical order
arrow_deps = { path = "../arrow_deps" }
backtrace = "0.3.9"
common_types = { path = "../common_types", features = ["test"] }
chrono = "0.4"
Expand Down
1 change: 1 addition & 0 deletions common_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod codec;
pub mod config;
pub mod metric;
pub mod panic;
pub mod record_batch;
pub mod runtime;
pub mod time;
pub mod toml;
Expand Down
23 changes: 23 additions & 0 deletions common_util/src/record_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow_deps::arrow::util::pretty;
use common_types::record_batch::RecordBatch;

/// A helper function to assert record batch.
pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec<RecordBatch>) {
let arrow_record_batch = record_batches
.into_iter()
.map(|record| record.into_arrow_record_batch())
.collect::<Vec<_>>();

let expected_lines: Vec<String> = expected.iter().map(|&s| s.into()).collect();
let formatted = pretty::pretty_format_batches(arrow_record_batch.as_slice())
.unwrap()
.to_string();
let actual_lines: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected_lines, actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
}
1 change: 1 addition & 0 deletions df_operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod registry;
pub mod scalar;
pub mod udaf;
pub mod udfs;
pub mod visitor;
Loading