Commit

Add some unit tests

ygf11 committed Sep 14, 2022
1 parent e4a467a commit 38935ff

Showing 13 changed files with 239 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion arrow_deps/Cargo.toml
@@ -9,7 +9,7 @@ workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "15.0.0"
arrow = { version = "15.0.0", features = ["prettyprint"]}
parquet = "15.0.0"
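
(The added "prettyprint" feature gates arrow::util::pretty, which the new common_util::record_batch helper later in this commit relies on.)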

[dependencies.uncover]
1 change: 0 additions & 1 deletion common_types/src/column.rs
@@ -575,7 +575,6 @@ impl ColumnBlock {
DatumKind::from_data_type(array.data_type()).with_context(|| UnsupportedArray {
data_type: array.data_type().clone(),
})?;

Self::try_from_arrow_array_ref(&datum_kind, array)
}

52 changes: 52 additions & 0 deletions common_types/src/tests.rs
@@ -1,6 +1,7 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use bytes::Bytes;
use sqlparser::ast::{BinaryOperator, Expr, Value};

use crate::{
column_schema,
@@ -46,12 +47,63 @@ fn base_schema_builder() -> schema::Builder {
.unwrap()
}

fn default_value_schema_builder() -> schema::Builder {
schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("key2".to_string(), DatumKind::Timestamp)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
            // Does not need a cast
column_schema::Builder::new("field1".to_string(), DatumKind::Int64)
.default_value(Some(Expr::Value(Value::Number("10".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
            // Needs a cast
column_schema::Builder::new("field2".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::Value(Value::Number("20".to_string(), false))))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("field3".to_string(), DatumKind::UInt32)
.default_value(Some(Expr::BinaryOp {
left: Box::new(Expr::Value(Value::Number("1".to_string(), false))),
op: BinaryOperator::Plus,
right: Box::new(Expr::Value(Value::Number("2".to_string(), false))),
}))
.build()
.expect("should succeed build column schema"),
)
.unwrap()
}

/// Build a schema for testing:
/// (key1(varbinary), key2(timestamp), field1(double), field2(string))
pub fn build_schema() -> Schema {
base_schema_builder().build().unwrap()
}

/// Build a schema for testing:
/// (key1(varbinary), key2(timestamp), field1(int64, default 10),
/// field2(uint32, default 20), field3(uint32, default 1 + 2))
pub fn build_default_value_schema() -> Schema {
default_value_schema_builder().build().unwrap()
}

pub fn build_projected_schema() -> ProjectedSchema {
let schema = build_schema();
assert!(schema.num_columns() > 1);
1 change: 1 addition & 0 deletions common_util/Cargo.toml
@@ -13,6 +13,7 @@ test = ["env_logger"]

[dependencies]
# In alphabetical order
arrow_deps = { path = "../arrow_deps" }
backtrace = "0.3.9"
common_types = { path = "../common_types", features = ["test"] }
chrono = "0.4"
1 change: 1 addition & 0 deletions common_util/src/lib.rs
@@ -16,6 +16,7 @@ pub mod panic;
pub mod runtime;
pub mod time;
pub mod toml;
pub mod record_batch;

#[cfg(any(test, feature = "test"))]
pub mod tests {
23 changes: 23 additions & 0 deletions common_util/src/record_batch.rs
@@ -0,0 +1,23 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow_deps::arrow::util::pretty;
use common_types::record_batch::RecordBatch;

/// A helper function asserting that record batches pretty-print to the expected lines.
pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec<RecordBatch>) {
let arrow_record_batch = record_batches
.into_iter()
.map(|record| record.into_arrow_record_batch())
.collect::<Vec<_>>();

let expected_lines: Vec<String> = expected.iter().map(|&s| s.into()).collect();
let formatted = pretty::pretty_format_batches(arrow_record_batch.as_slice())
.unwrap()
.to_string();
let actual_lines: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected_lines, actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
}
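
For context on the helper above: callers pass the expected pretty-printed table alongside the batches, and a real call site appears in interpreters/src/tests.rs later in this commit. A minimal sketch of the call shape — fetch_batches() is a hypothetical stand-in for any code producing Vec<RecordBatch>, and the table contents are made up for the example:

// fetch_batches() is hypothetical; any source of Vec<RecordBatch> works.
let batches: Vec<RecordBatch> = fetch_batches();
let expected = vec![
    "+------+--------+",
    "| key1 | field1 |",
    "+------+--------+",
    "| abc  | 10     |",
    "+------+--------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, batches);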
19 changes: 15 additions & 4 deletions interpreters/src/insert.rs
@@ -10,11 +10,12 @@ use arrow_deps::{
common::DFSchema,
error::DataFusionError,
logical_expr::ColumnarValue as DfColumnarValue,
optimizer::simplify_expressions::ConstEvaluator,
physical_expr::{
create_physical_expr, execution_props::ExecutionProps, expressions::CastExpr,
},
},
datafusion_expr::expr::Expr as DfLogicalExpr,
datafusion_expr::{expr::Expr as DfLogicalExpr, expr_rewriter::ExprRewritable},
};
use async_trait::async_trait;
use common_types::{
@@ -180,8 +181,16 @@ fn fill_default_values(
let input_arrow_schema = Arc::new(ArrowSchema::empty());
let input_batch = RecordBatch::new_empty(input_arrow_schema.clone());
for (column_idx, default_value_expr) in default_value_map.iter() {
        // Optimize the logical expr via constant folding
        let execution_props = ExecutionProps::default();
        let mut const_optimizer = ConstEvaluator::new(&execution_props);
        let evaluated_expr = default_value_expr
            .clone()
            .rewrite(&mut const_optimizer)
            .context(DataFusionExpr)?;

// Create physical expr
let physical_expr = create_physical_expr(
-            default_value_expr,
+            &evaluated_expr,
&input_df_schema,
&input_arrow_schema,
&ExecutionProps::default(),
@@ -207,6 +216,7 @@
let output = casted_physical_expr
.evaluate(&input_batch)
.context(DataFusionExecutor)?;

fill_column_to_row_group(*column_idx, &output, rows)?;
}

@@ -221,8 +231,9 @@ fn fill_column_to_row_group(
match column {
DfColumnarValue::Array(array) => {
for row_idx in 0..rows.num_rows() {
-                let column_block =
-                    ColumnBlock::try_cast_arrow_array_ref(array).context(ConvertColumnBlock)?;
+                let datum_kind = rows.schema().column(column_idx).data_type;
+                let column_block = ColumnBlock::try_from_arrow_array_ref(&datum_kind, array)
+                    .context(ConvertColumnBlock)?;
let datum = column_block.datum(row_idx);
rows.get_row_mut(row_idx)
.map(|row| std::mem::replace(row.index_mut(column_idx), datum.clone()));
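
The core change above: the stored default-value expression (possibly compound, like 1 + 2 for field3) is first constant-folded with DataFusion's ConstEvaluator, and the folded result is what reaches create_physical_expr (the -/+ pair above). Below is a minimal sketch of the folding step in isolation, not part of the commit; the module paths mirror this diff's imports and assume the DataFusion version pinned by arrow_deps, so they may differ in other versions:

use arrow_deps::datafusion::{
    error::Result,
    optimizer::simplify_expressions::ConstEvaluator,
    physical_expr::execution_props::ExecutionProps,
};
use arrow_deps::datafusion_expr::{expr_rewriter::ExprRewritable, lit, Expr};

/// Fold constant sub-expressions, e.g. reduce 1 + 2 to the literal 3.
fn fold_constants(expr: Expr) -> Result<Expr> {
    let execution_props = ExecutionProps::default();
    let mut const_evaluator = ConstEvaluator::new(&execution_props);
    expr.rewrite(&mut const_evaluator)
}

#[cfg(test)]
mod fold_sketch_tests {
    use super::*;

    #[test]
    fn folds_one_plus_two() {
        // lit(1i64) + lit(2i64) builds the same kind of binary expression the
        // schema stores for field3; folding reduces it to the literal 3.
        let folded = fold_constants(lit(1i64) + lit(2i64)).unwrap();
        assert_eq!(folded, lit(3i64));
    }
}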
1 change: 0 additions & 1 deletion interpreters/src/show_create.rs
@@ -78,7 +78,6 @@ impl ShowCreateInterpreter {

let mut res = String::new();
for col in table_schema.columns() {
-            println!("{:?}", col);
res += format!("`{}` {}", col.name, col.data_type).as_str();
if col.is_tag {
res += " TAG";
45 changes: 45 additions & 0 deletions interpreters/src/tests.rs
@@ -115,6 +115,49 @@ where
}
}

async fn test_insert_table_with_missing_columns(&self) {
let catalog_manager = Arc::new(build_catalog_manager(self.engine()).await);
let ctx = Context::builder(RequestId::next_id())
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.build();
let insert_factory =
Factory::new(ExecutorImpl::new(), catalog_manager.clone(), self.engine());
let insert_sql = "INSERT INTO test_missing_columns_table(key1, key2) VALUES('tagk', 1638428434000), ('tagk2', 1638428434000);";

let plan = sql_to_plan(&self.meta_provider, insert_sql);
let interpreter = insert_factory.create(ctx, plan);
let output = interpreter.execute().await.unwrap();
if let Output::AffectedRows(v) = output {
assert_eq!(v, 2);
} else {
            panic!("expected Output::AffectedRows");
}

        // Check the data that was just inserted.
let select_sql =
"SELECT key1, key2, field1, field2, field3 from test_missing_columns_table";
let select_factory = Factory::new(ExecutorImpl::new(), catalog_manager, self.engine());
let ctx = Context::builder(RequestId::next_id())
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.build();
let plan = sql_to_plan(&self.meta_provider, select_sql);
let interpreter = select_factory.create(ctx, plan);
let output = interpreter.execute().await.unwrap();
if let Output::Records(records) = output {
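            // key1 is varbinary, so its values pretty-print as hex:
            // "7461676b" is "tagk" and "7461676b32" is "tagk2". The omitted
            // field1/field2/field3 columns are filled from their schema
            // defaults: 10, 20, and 1 + 2 = 3.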
let expected = vec![
"+------------+---------------------+--------+--------+--------+",
"| key1 | key2 | field1 | field2 | field3 |",
"+------------+---------------------+--------+--------+--------+",
"| 7461676b | 2021-12-02 07:00:34 | 10 | 20 | 3 |",
"| 7461676b32 | 2021-12-02 07:00:34 | 10 | 20 | 3 |",
"+------------+---------------------+--------+--------+--------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records)
} else {
            panic!("expected Output::Records");
}
}

async fn test_select_table(&self) {
let sql = "select * from test_table";
let output = self.sql_to_output(sql).await.unwrap();
@@ -201,4 +244,6 @@ where
env.test_show_create_table().await;
env.test_alter_table().await;
env.test_drop_table().await;

env.test_insert_table_with_missing_columns().await;
}
6 changes: 3 additions & 3 deletions server/src/http.rs
@@ -36,7 +36,7 @@ pub enum Error {

#[snafu(display("Failed to handle request, err:{}", source))]
HandleRequest {
-        source: Box<crate::handlers::error::Error>,
+        source: Box<crate::handlers::error::Error>,
},

#[snafu(display("Missing runtimes to build service.\nBacktrace:\n{}", backtrace))]
@@ -130,7 +130,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
-                    e
+                    Box::new(e)
})
.context(HandleRequest);
match result {
@@ -282,7 +282,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.await
.map_err(|e| {
error!("Http service failed to handle admin reject, err:{}", e);
-                    e
+                    Box::new(e)
})
.context(HandleRequest);
