From 31ee8d6878bb59b8f0d82423c75895d2dccca642 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 19 Jul 2023 14:53:58 +0800 Subject: [PATCH 1/5] fix: refactor test_util crate --- Cargo.lock | 14 ---- Cargo.toml | 2 +- analytic_engine/src/manifest/details.rs | 3 +- analytic_engine/src/sst/parquet/writer.rs | 5 +- analytic_engine/src/tests/compaction_test.rs | 2 +- components/test_util/Cargo.toml | 20 ------ components/test_util/src/lib.rs | 75 ++++++++++---------- interpreters/src/tests.rs | 14 ++-- 8 files changed, 48 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ccfe5db0e..76f066ab7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6390,23 +6390,9 @@ name = "test_util" version = "1.2.5-alpha" dependencies = [ "arrow", - "async-stream", - "async-trait", - "backtrace", - "ceresdbproto", "chrono", "common_types", "env_logger", - "futures 0.3.28", - "lazy_static", - "libc", - "log", - "pin-project-lite", - "serde", - "serde_json", - "snafu 0.6.10", - "tempfile", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c82c27e73e..e0f23d1f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,7 @@ system_catalog = { path = "system_catalog" } table_engine = { path = "table_engine" } table_kv = { path = "components/table_kv" } tempfile = "3.1.0" -test_util = { path = "components/test_util", features = ["test"] } +test_util = { path = "components/test_util" } time_ext = { path = "components/time_ext" } toml = "0.7" toml_ext = { path = "components/toml_ext" } diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index 2d9f6ad8c5..400d96238c 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -706,7 +706,6 @@ mod tests { use object_store::LocalFileSystem; use runtime::Runtime; use table_engine::table::{SchemaId, TableId, TableSeqGenerator}; - use test_util::tests::init_log_for_test; use wal::rocks_impl::manager::Builder as WalBuilder; use super::*; @@ -824,7 +823,7 @@ mod tests { impl TestContext { fn new(prefix: &str, schema_id: SchemaId) -> Self { - init_log_for_test(); + test_util::init_log_for_test(); let dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap(); let runtime = build_runtime(2); diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index c87f29fe0c..d4f0ff6ac7 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -341,7 +341,6 @@ mod tests { use runtime::{self, Runtime}; use table_engine::predicate::Predicate; use tempfile::tempdir; - use test_util::tests::init_log_for_test; use super::*; use crate::{ @@ -360,7 +359,7 @@ mod tests { #[test] fn test_parquet_build_and_read() { - init_log_for_test(); + test_util::init_log_for_test(); let runtime = Arc::new(runtime::Builder::default().build().unwrap()); parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]); @@ -587,7 +586,7 @@ mod tests { input_num_rows: Vec, expected_num_rows: Vec, ) { - init_log_for_test(); + test_util::init_log_for_test(); let schema = build_schema(); let mut poll_cnt = 0; let schema_clone = schema.clone(); diff --git a/analytic_engine/src/tests/compaction_test.rs b/analytic_engine/src/tests/compaction_test.rs index 2a2d367286..e4ec973787 100644 --- a/analytic_engine/src/tests/compaction_test.rs +++ b/analytic_engine/src/tests/compaction_test.rs @@ -85,7 +85,7 @@ fn test_table_compact_current_segment(engine_context: T) ) .await; - test_util::tests::init_log_for_test(); + test_util::init_log_for_test(); // Trigger a compaction. test_ctx.compact_table(compact_test_table1).await; diff --git a/components/test_util/Cargo.toml b/components/test_util/Cargo.toml index a377f35ae5..b62ad7baae 100644 --- a/components/test_util/Cargo.toml +++ b/components/test_util/Cargo.toml @@ -10,29 +10,9 @@ workspace = true [package.edition] workspace = true -[features] -test = ["env_logger"] - [dependencies] # In alphabetical order arrow = { workspace = true } -async-stream = { workspace = true } -async-trait = { workspace = true } -backtrace = "0.3.9" -ceresdbproto = { workspace = true } chrono = { workspace = true } common_types = { workspace = true, features = ["test"] } -env_logger = { workspace = true, optional = true } -futures = { workspace = true } -lazy_static = { workspace = true } -libc = "0.2" -log = { workspace = true } -pin-project-lite = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -snafu = { workspace = true } -tokio = { workspace = true } - -[dev-dependencies] env_logger = { workspace = true } -tempfile = { workspace = true } diff --git a/components/test_util/src/lib.rs b/components/test_util/src/lib.rs index 9702a45442..bc292dc506 100644 --- a/components/test_util/src/lib.rs +++ b/components/test_util/src/lib.rs @@ -2,48 +2,45 @@ //! Common utils shared by the whole project -#[cfg(any(test, feature = "test"))] -pub mod tests { - use std::{io::Write, sync::Once}; +use std::{io::Write, sync::Once}; - use arrow::util::pretty; - use common_types::record_batch::RecordBatch; +use arrow::util::pretty; +use common_types::record_batch::RecordBatch; - static INIT_LOG: Once = Once::new(); +static INIT_LOG: Once = Once::new(); - pub fn init_log_for_test() { - INIT_LOG.call_once(|| { - env_logger::Builder::from_default_env() - .format(|buf, record| { - writeln!( - buf, - "{} {} [{}:{}] {}", - chrono::Local::now().format("%Y-%m-%dT%H:%M:%S.%3f"), - buf.default_styled_level(record.level()), - record.file().unwrap_or("unknown"), - record.line().unwrap_or(0), - record.args() - ) - }) - .init(); - }); - } +pub fn init_log_for_test() { + INIT_LOG.call_once(|| { + env_logger::Builder::from_default_env() + .format(|buf, record| { + writeln!( + buf, + "{} {} [{}:{}] {}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S.%3f"), + buf.default_styled_level(record.level()), + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.args() + ) + }) + .init(); + }); +} - /// A helper function to assert record batch. - pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec) { - let arrow_record_batch = record_batches - .into_iter() - .map(|record| record.into_arrow_record_batch()) - .collect::>(); +/// A helper function to assert record batch. +pub fn assert_record_batches_eq(expected: &[&str], record_batches: Vec) { + let arrow_record_batch = record_batches + .into_iter() + .map(|record| record.into_arrow_record_batch()) + .collect::>(); - let expected_lines: Vec = expected.iter().map(|&s| s.into()).collect(); - let formatted = pretty::pretty_format_batches(arrow_record_batch.as_slice()) - .unwrap() - .to_string(); - let actual_lines: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected_lines, actual_lines, - "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n" - ); - } + let expected_lines: Vec = expected.iter().map(|&s| s.into()).collect(); + let formatted = pretty::pretty_format_batches(arrow_record_batch.as_slice()) + .unwrap() + .to_string(); + let actual_lines: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected_lines, actual_lines, + "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n" + ); } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 3773b23478..3b7c007724 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -143,7 +143,7 @@ where "| 7461676b32 | 2021-12-02T07:00:34 | 100.0 | hello3 | 2022-10-11 | 11:10:10.234 |", "+------------+---------------------+--------+--------+------------+--------------+", ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); let sql = "select count(*) from test_table"; let output = self.sql_to_output(sql).await?; @@ -155,7 +155,7 @@ where "| 2 |", "+-----------------+", ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); Ok(()) } @@ -182,7 +182,7 @@ where "| field4 | time | false | true | false | false |", "+--------+-----------+------------+-------------+--------+---------------+", ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); } async fn test_exists_table(&self) { @@ -196,7 +196,7 @@ where "| 1 |", "+--------+", ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); } async fn test_insert_table(&self) { @@ -262,7 +262,7 @@ where "| 7461676b32 | 2021-12-02T07:00:34 | 10 | 20 | 3 | 10 | 12 |", "+------------+---------------------+--------+--------+--------+--------+--------+", ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); } async fn test_select_table(&self) { @@ -282,7 +282,7 @@ where "| test_table | CREATE TABLE `test_table` (`key1` varbinary NOT NULL, `key2` timestamp NOT NULL, `field1` double, `field2` string, `field3` date, `field4` time, PRIMARY KEY(key1,key2), TIMESTAMP KEY(key2)) ENGINE=Analytic |", "+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+" ]; - test_util::tests::assert_record_batches_eq(&expected, records); + test_util::assert_record_batches_eq(&expected, records); } async fn test_alter_table(&self) { @@ -338,7 +338,7 @@ where #[tokio::test] async fn test_interpreters_rocks() { - test_util::tests::init_log_for_test(); + test_util::init_log_for_test(); let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_interpreters(rocksdb_ctx).await; } From ed410a7e6af4a6f2aed7b1b5130d38ebe8f9d19e Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 19 Jul 2023 15:08:53 +0800 Subject: [PATCH 2/5] fix: remove unnecessary features for common_types --- common_types/Cargo.toml | 5 +- common_types/src/datum.rs | 390 +++++++++++++++++++------------------- common_types/src/lib.rs | 6 - common_types/src/time.rs | 39 ++-- 4 files changed, 210 insertions(+), 230 deletions(-) diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index 73a196d620..122bc613fa 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -11,19 +11,18 @@ workspace = true workspace = true [features] -default = ["arrow", "datafusion"] test = [] [dependencies] # In alphabetical order ahash = { version = "0.8.2", default-features = false, features = ["runtime-rng"] } -arrow = { workspace = true, optional = true } +arrow = { workspace = true } arrow_ext = { workspace = true } byteorder = "1.2" bytes_ext = { workspace = true } ceresdbproto = { workspace = true } chrono = { workspace = true } -datafusion = { workspace = true, optional = true } +datafusion = { workspace = true } macros = { workspace = true } murmur3 = "0.4.1" paste = { workspace = true } diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 46f7a15e33..ce0fc65472 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -4,9 +4,13 @@ use std::{convert::TryFrom, fmt, str}; -use arrow::temporal_conversions::{EPOCH_DAYS_FROM_CE, NANOSECONDS}; +use arrow::{ + datatypes::{DataType, TimeUnit}, + temporal_conversions::{EPOCH_DAYS_FROM_CE, NANOSECONDS}, +}; use ceresdbproto::schema::DataType as DataTypePb; use chrono::{Datelike, Local, NaiveDate, NaiveTime, TimeZone, Timelike}; +use datafusion::scalar::ScalarValue; use serde::ser::{Serialize, Serializer}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{DataType as SqlDataType, Value}; @@ -1134,219 +1138,209 @@ impl<'a> DatumView<'a> { } } -#[cfg(feature = "arrow")] -pub mod arrow_convert { - use arrow::datatypes::{DataType, TimeUnit}; - use datafusion::scalar::ScalarValue; - - use super::*; - - impl DatumKind { - /// Create DatumKind from [arrow::datatypes::DataType], if - /// the type is not supported, returns None - pub fn from_data_type(data_type: &DataType) -> Option { - match data_type { - DataType::Null => Some(Self::Null), - DataType::Timestamp(TimeUnit::Millisecond, None) => Some(Self::Timestamp), - DataType::Timestamp(TimeUnit::Nanosecond, None) => Some(Self::Timestamp), - DataType::Float64 => Some(Self::Double), - DataType::Float32 => Some(Self::Float), - DataType::Binary => Some(Self::Varbinary), - DataType::Utf8 => Some(Self::String), - DataType::UInt64 => Some(Self::UInt64), - DataType::UInt32 => Some(Self::UInt32), - DataType::UInt16 => Some(Self::UInt16), - DataType::UInt8 => Some(Self::UInt8), - DataType::Int64 => Some(Self::Int64), - DataType::Int32 => Some(Self::Int32), - DataType::Int16 => Some(Self::Int16), - DataType::Int8 => Some(Self::Int8), - DataType::Boolean => Some(Self::Boolean), - DataType::Date32 => Some(Self::Date), - DataType::Time64(TimeUnit::Nanosecond) => Some(Self::Time), - DataType::Dictionary(_, _) => Some(Self::String), - DataType::Float16 - | DataType::LargeUtf8 - | DataType::LargeBinary - | DataType::FixedSizeBinary(_) - | DataType::Struct(_) - | DataType::Union(_, _) - | DataType::List(_) - | DataType::LargeList(_) - | DataType::FixedSizeList(_, _) - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::Date64 - | DataType::Interval(_) - | DataType::Duration(_) - | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) - | DataType::RunEndEncoded(_, _) - | DataType::Map(_, _) => None, - } +impl DatumKind { + /// Create DatumKind from [arrow::datatypes::DataType], if + /// the type is not supported, returns None + pub fn from_data_type(data_type: &DataType) -> Option { + match data_type { + DataType::Null => Some(Self::Null), + DataType::Timestamp(TimeUnit::Millisecond, None) => Some(Self::Timestamp), + DataType::Timestamp(TimeUnit::Nanosecond, None) => Some(Self::Timestamp), + DataType::Float64 => Some(Self::Double), + DataType::Float32 => Some(Self::Float), + DataType::Binary => Some(Self::Varbinary), + DataType::Utf8 => Some(Self::String), + DataType::UInt64 => Some(Self::UInt64), + DataType::UInt32 => Some(Self::UInt32), + DataType::UInt16 => Some(Self::UInt16), + DataType::UInt8 => Some(Self::UInt8), + DataType::Int64 => Some(Self::Int64), + DataType::Int32 => Some(Self::Int32), + DataType::Int16 => Some(Self::Int16), + DataType::Int8 => Some(Self::Int8), + DataType::Boolean => Some(Self::Boolean), + DataType::Date32 => Some(Self::Date), + DataType::Time64(TimeUnit::Nanosecond) => Some(Self::Time), + DataType::Dictionary(_, _) => Some(Self::String), + DataType::Float16 + | DataType::LargeUtf8 + | DataType::LargeBinary + | DataType::FixedSizeBinary(_) + | DataType::Struct(_) + | DataType::Union(_, _) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Date64 + | DataType::Interval(_) + | DataType::Duration(_) + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + | DataType::RunEndEncoded(_, _) + | DataType::Map(_, _) => None, } + } - pub fn to_arrow_data_type(&self) -> DataType { - match self { - DatumKind::Null => DataType::Null, - DatumKind::Timestamp => DataType::Timestamp(TimeUnit::Millisecond, None), - DatumKind::Double => DataType::Float64, - DatumKind::Float => DataType::Float32, - DatumKind::Varbinary => DataType::Binary, - DatumKind::String => DataType::Utf8, - DatumKind::UInt64 => DataType::UInt64, - DatumKind::UInt32 => DataType::UInt32, - DatumKind::UInt16 => DataType::UInt16, - DatumKind::UInt8 => DataType::UInt8, - DatumKind::Int64 => DataType::Int64, - DatumKind::Int32 => DataType::Int32, - DatumKind::Int16 => DataType::Int16, - DatumKind::Int8 => DataType::Int8, - DatumKind::Boolean => DataType::Boolean, - DatumKind::Date => DataType::Date32, - DatumKind::Time => DataType::Time64(TimeUnit::Nanosecond), - } + pub fn to_arrow_data_type(&self) -> DataType { + match self { + DatumKind::Null => DataType::Null, + DatumKind::Timestamp => DataType::Timestamp(TimeUnit::Millisecond, None), + DatumKind::Double => DataType::Float64, + DatumKind::Float => DataType::Float32, + DatumKind::Varbinary => DataType::Binary, + DatumKind::String => DataType::Utf8, + DatumKind::UInt64 => DataType::UInt64, + DatumKind::UInt32 => DataType::UInt32, + DatumKind::UInt16 => DataType::UInt16, + DatumKind::UInt8 => DataType::UInt8, + DatumKind::Int64 => DataType::Int64, + DatumKind::Int32 => DataType::Int32, + DatumKind::Int16 => DataType::Int16, + DatumKind::Int8 => DataType::Int8, + DatumKind::Boolean => DataType::Boolean, + DatumKind::Date => DataType::Date32, + DatumKind::Time => DataType::Time64(TimeUnit::Nanosecond), } } +} - impl Datum { - pub fn as_scalar_value(&self) -> Option { - match self { - Datum::Null => None, - Datum::Timestamp(v) => { - Some(ScalarValue::TimestampMillisecond(Some((*v).as_i64()), None)) - } - Datum::Double(v) => Some(ScalarValue::Float64(Some(*v))), - Datum::Float(v) => Some(ScalarValue::Float32(Some(*v))), - Datum::Varbinary(v) => Some(ScalarValue::Binary(Some(v.to_vec()))), - Datum::String(v) => Some(ScalarValue::Utf8(Some(v.to_string()))), - Datum::UInt64(v) => Some(ScalarValue::UInt64(Some(*v))), - Datum::UInt32(v) => Some(ScalarValue::UInt32(Some(*v))), - Datum::UInt16(v) => Some(ScalarValue::UInt16(Some(*v))), - Datum::UInt8(v) => Some(ScalarValue::UInt8(Some(*v))), - Datum::Int64(v) => Some(ScalarValue::Int64(Some(*v))), - Datum::Int32(v) => Some(ScalarValue::Int32(Some(*v))), - Datum::Int16(v) => Some(ScalarValue::Int16(Some(*v))), - Datum::Int8(v) => Some(ScalarValue::Int8(Some(*v))), - Datum::Boolean(v) => Some(ScalarValue::Boolean(Some(*v))), - Datum::Date(v) => Some(ScalarValue::Date32(Some(*v))), - Datum::Time(v) => Some(ScalarValue::Time64Nanosecond(Some(*v))), +impl Datum { + pub fn as_scalar_value(&self) -> Option { + match self { + Datum::Null => None, + Datum::Timestamp(v) => { + Some(ScalarValue::TimestampMillisecond(Some((*v).as_i64()), None)) } + Datum::Double(v) => Some(ScalarValue::Float64(Some(*v))), + Datum::Float(v) => Some(ScalarValue::Float32(Some(*v))), + Datum::Varbinary(v) => Some(ScalarValue::Binary(Some(v.to_vec()))), + Datum::String(v) => Some(ScalarValue::Utf8(Some(v.to_string()))), + Datum::UInt64(v) => Some(ScalarValue::UInt64(Some(*v))), + Datum::UInt32(v) => Some(ScalarValue::UInt32(Some(*v))), + Datum::UInt16(v) => Some(ScalarValue::UInt16(Some(*v))), + Datum::UInt8(v) => Some(ScalarValue::UInt8(Some(*v))), + Datum::Int64(v) => Some(ScalarValue::Int64(Some(*v))), + Datum::Int32(v) => Some(ScalarValue::Int32(Some(*v))), + Datum::Int16(v) => Some(ScalarValue::Int16(Some(*v))), + Datum::Int8(v) => Some(ScalarValue::Int8(Some(*v))), + Datum::Boolean(v) => Some(ScalarValue::Boolean(Some(*v))), + Datum::Date(v) => Some(ScalarValue::Date32(Some(*v))), + Datum::Time(v) => Some(ScalarValue::Time64Nanosecond(Some(*v))), } + } - pub fn from_scalar_value(val: &ScalarValue) -> Option { - match val { - ScalarValue::Boolean(v) => v.map(Datum::Boolean), - ScalarValue::Float32(v) => v.map(Datum::Float), - ScalarValue::Float64(v) => v.map(Datum::Double), - ScalarValue::Int8(v) => v.map(Datum::Int8), - ScalarValue::Int16(v) => v.map(Datum::Int16), - ScalarValue::Int32(v) => v.map(Datum::Int32), - ScalarValue::Int64(v) => v.map(Datum::Int64), - ScalarValue::UInt8(v) => v.map(Datum::UInt8), - ScalarValue::UInt16(v) => v.map(Datum::UInt16), - ScalarValue::UInt32(v) => v.map(Datum::UInt32), - ScalarValue::UInt64(v) => v.map(Datum::UInt64), - ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v - .as_ref() - .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))), - ScalarValue::Binary(v) - | ScalarValue::FixedSizeBinary(_, v) - | ScalarValue::LargeBinary(v) => v - .as_ref() - .map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))), - ScalarValue::TimestampMillisecond(v, _) => { - v.map(|v| Datum::Timestamp(Timestamp::new(v))) - } - ScalarValue::Date32(v) => v.map(Datum::Date), - ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time), - ScalarValue::Dictionary(_, literal) => Datum::from_scalar_value(literal), - ScalarValue::List(_, _) - | ScalarValue::Date64(_) - | ScalarValue::Time32Second(_) - | ScalarValue::Time32Millisecond(_) - | ScalarValue::Time64Microsecond(_) - | ScalarValue::TimestampSecond(_, _) - | ScalarValue::TimestampMicrosecond(_, _) - | ScalarValue::TimestampNanosecond(_, _) - | ScalarValue::IntervalYearMonth(_) - | ScalarValue::IntervalDayTime(_) - | ScalarValue::Struct(_, _) - | ScalarValue::Decimal128(_, _, _) - | ScalarValue::Null - | ScalarValue::IntervalMonthDayNano(_) => None, + pub fn from_scalar_value(val: &ScalarValue) -> Option { + match val { + ScalarValue::Boolean(v) => v.map(Datum::Boolean), + ScalarValue::Float32(v) => v.map(Datum::Float), + ScalarValue::Float64(v) => v.map(Datum::Double), + ScalarValue::Int8(v) => v.map(Datum::Int8), + ScalarValue::Int16(v) => v.map(Datum::Int16), + ScalarValue::Int32(v) => v.map(Datum::Int32), + ScalarValue::Int64(v) => v.map(Datum::Int64), + ScalarValue::UInt8(v) => v.map(Datum::UInt8), + ScalarValue::UInt16(v) => v.map(Datum::UInt16), + ScalarValue::UInt32(v) => v.map(Datum::UInt32), + ScalarValue::UInt64(v) => v.map(Datum::UInt64), + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v + .as_ref() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))), + ScalarValue::Binary(v) + | ScalarValue::FixedSizeBinary(_, v) + | ScalarValue::LargeBinary(v) => v + .as_ref() + .map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))), + ScalarValue::TimestampMillisecond(v, _) => { + v.map(|v| Datum::Timestamp(Timestamp::new(v))) } + ScalarValue::Date32(v) => v.map(Datum::Date), + ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time), + ScalarValue::Dictionary(_, literal) => Datum::from_scalar_value(literal), + ScalarValue::List(_, _) + | ScalarValue::Date64(_) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::TimestampSecond(_, _) + | ScalarValue::TimestampMicrosecond(_, _) + | ScalarValue::TimestampNanosecond(_, _) + | ScalarValue::IntervalYearMonth(_) + | ScalarValue::IntervalDayTime(_) + | ScalarValue::Struct(_, _) + | ScalarValue::Decimal128(_, _, _) + | ScalarValue::Null + | ScalarValue::IntervalMonthDayNano(_) => None, } } +} - impl<'a> DatumView<'a> { - pub fn from_scalar_value(val: &'a ScalarValue) -> Option { - match val { - ScalarValue::Boolean(v) => v.map(DatumView::Boolean), - ScalarValue::Float32(v) => v.map(DatumView::Float), - ScalarValue::Float64(v) => v.map(DatumView::Double), - ScalarValue::Int8(v) => v.map(DatumView::Int8), - ScalarValue::Int16(v) => v.map(DatumView::Int16), - ScalarValue::Int32(v) => v.map(DatumView::Int32), - ScalarValue::Int64(v) => v.map(DatumView::Int64), - ScalarValue::UInt8(v) => v.map(DatumView::UInt8), - ScalarValue::UInt16(v) => v.map(DatumView::UInt16), - ScalarValue::UInt32(v) => v.map(DatumView::UInt32), - ScalarValue::UInt64(v) => v.map(DatumView::UInt64), - ScalarValue::Date32(v) => v.map(DatumView::Date), - ScalarValue::Time64Nanosecond(v) => v.map(DatumView::Time), - ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { - v.as_ref().map(|v| DatumView::String(v.as_str())) - } - ScalarValue::Binary(v) - | ScalarValue::FixedSizeBinary(_, v) - | ScalarValue::LargeBinary(v) => { - v.as_ref().map(|v| DatumView::Varbinary(v.as_slice())) - } - ScalarValue::TimestampMillisecond(v, _) => { - v.map(|v| DatumView::Timestamp(Timestamp::new(v))) - } - ScalarValue::Dictionary(_, literal) => DatumView::from_scalar_value(literal), - ScalarValue::List(_, _) - | ScalarValue::Date64(_) - | ScalarValue::Time32Second(_) - | ScalarValue::Time32Millisecond(_) - | ScalarValue::Time64Microsecond(_) - | ScalarValue::TimestampSecond(_, _) - | ScalarValue::TimestampMicrosecond(_, _) - | ScalarValue::TimestampNanosecond(_, _) - | ScalarValue::IntervalYearMonth(_) - | ScalarValue::IntervalDayTime(_) - | ScalarValue::Struct(_, _) - | ScalarValue::Decimal128(_, _, _) - | ScalarValue::Null - | ScalarValue::IntervalMonthDayNano(_) => None, +impl<'a> DatumView<'a> { + pub fn from_scalar_value(val: &'a ScalarValue) -> Option { + match val { + ScalarValue::Boolean(v) => v.map(DatumView::Boolean), + ScalarValue::Float32(v) => v.map(DatumView::Float), + ScalarValue::Float64(v) => v.map(DatumView::Double), + ScalarValue::Int8(v) => v.map(DatumView::Int8), + ScalarValue::Int16(v) => v.map(DatumView::Int16), + ScalarValue::Int32(v) => v.map(DatumView::Int32), + ScalarValue::Int64(v) => v.map(DatumView::Int64), + ScalarValue::UInt8(v) => v.map(DatumView::UInt8), + ScalarValue::UInt16(v) => v.map(DatumView::UInt16), + ScalarValue::UInt32(v) => v.map(DatumView::UInt32), + ScalarValue::UInt64(v) => v.map(DatumView::UInt64), + ScalarValue::Date32(v) => v.map(DatumView::Date), + ScalarValue::Time64Nanosecond(v) => v.map(DatumView::Time), + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + v.as_ref().map(|v| DatumView::String(v.as_str())) + } + ScalarValue::Binary(v) + | ScalarValue::FixedSizeBinary(_, v) + | ScalarValue::LargeBinary(v) => v.as_ref().map(|v| DatumView::Varbinary(v.as_slice())), + ScalarValue::TimestampMillisecond(v, _) => { + v.map(|v| DatumView::Timestamp(Timestamp::new(v))) } + ScalarValue::Dictionary(_, literal) => DatumView::from_scalar_value(literal), + ScalarValue::List(_, _) + | ScalarValue::Date64(_) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::TimestampSecond(_, _) + | ScalarValue::TimestampMicrosecond(_, _) + | ScalarValue::TimestampNanosecond(_, _) + | ScalarValue::IntervalYearMonth(_) + | ScalarValue::IntervalDayTime(_) + | ScalarValue::Struct(_, _) + | ScalarValue::Decimal128(_, _, _) + | ScalarValue::Null + | ScalarValue::IntervalMonthDayNano(_) => None, } } +} - impl From for DataType { - fn from(kind: DatumKind) -> Self { - match kind { - DatumKind::Null => DataType::Null, - DatumKind::Timestamp => DataType::Timestamp(TimeUnit::Millisecond, None), - DatumKind::Double => DataType::Float64, - DatumKind::Float => DataType::Float32, - DatumKind::Varbinary => DataType::Binary, - DatumKind::String => DataType::Utf8, - DatumKind::UInt64 => DataType::UInt64, - DatumKind::UInt32 => DataType::UInt32, - DatumKind::UInt16 => DataType::UInt16, - DatumKind::UInt8 => DataType::UInt8, - DatumKind::Int64 => DataType::Int64, - DatumKind::Int32 => DataType::Int32, - DatumKind::Int16 => DataType::Int16, - DatumKind::Int8 => DataType::Int8, - DatumKind::Boolean => DataType::Boolean, - DatumKind::Date => DataType::Date32, - DatumKind::Time => DataType::Time64(TimeUnit::Nanosecond), - } +impl From for DataType { + fn from(kind: DatumKind) -> Self { + match kind { + DatumKind::Null => DataType::Null, + DatumKind::Timestamp => DataType::Timestamp(TimeUnit::Millisecond, None), + DatumKind::Double => DataType::Float64, + DatumKind::Float => DataType::Float32, + DatumKind::Varbinary => DataType::Binary, + DatumKind::String => DataType::Utf8, + DatumKind::UInt64 => DataType::UInt64, + DatumKind::UInt32 => DataType::UInt32, + DatumKind::UInt16 => DataType::UInt16, + DatumKind::UInt8 => DataType::UInt8, + DatumKind::Int64 => DataType::Int64, + DatumKind::Int32 => DataType::Int32, + DatumKind::Int16 => DataType::Int16, + DatumKind::Int8 => DataType::Int8, + DatumKind::Boolean => DataType::Boolean, + DatumKind::Date => DataType::Date32, + DatumKind::Time => DataType::Time64(TimeUnit::Nanosecond), } } } diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index 7ff92bcf3f..38e445e763 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -4,21 +4,15 @@ pub mod bitset; pub mod bytes; -#[cfg(feature = "arrow")] pub mod column; -#[cfg(feature = "arrow")] pub mod column_schema; pub mod datum; pub mod hash; pub mod hex; -#[cfg(feature = "arrow")] pub mod projected_schema; -#[cfg(feature = "arrow")] pub mod record_batch; pub mod request_id; -#[cfg(feature = "arrow")] pub mod row; -#[cfg(feature = "arrow")] pub mod schema; pub mod string; pub mod table; diff --git a/common_types/src/time.rs b/common_types/src/time.rs index 9374834cb9..925ebd5731 100644 --- a/common_types/src/time.rs +++ b/common_types/src/time.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Time types @@ -10,6 +10,10 @@ use std::{ }; use ceresdbproto::time_range; +use datafusion::{ + prelude::{col, lit, Expr}, + scalar::ScalarValue, +}; use snafu::{Backtrace, OptionExt, Snafu}; /// Error of time module. @@ -298,28 +302,17 @@ impl TryFrom for TimeRange { } } -#[cfg(feature = "datafusion")] -mod datafusion_ext { - use datafusion::{ - prelude::{col, lit, Expr}, - scalar::ScalarValue, - }; - - use crate::time::TimeRange; - - impl TimeRange { - /// Creates expression like: - /// start <= time && time < end - pub fn to_df_expr(&self, column_name: impl AsRef) -> Expr { - let ts_start = - ScalarValue::TimestampMillisecond(Some(self.inclusive_start.as_i64()), None); - let ts_end = ScalarValue::TimestampMillisecond(Some(self.exclusive_end.as_i64()), None); - let column_name = column_name.as_ref(); - let ts_low = col(column_name).gt_eq(lit(ts_start)); - let ts_high = col(column_name).lt(lit(ts_end)); - - ts_low.and(ts_high) - } +impl TimeRange { + /// Creates expression like: + /// start <= time && time < end + pub fn to_df_expr(&self, column_name: impl AsRef) -> Expr { + let ts_start = ScalarValue::TimestampMillisecond(Some(self.inclusive_start.as_i64()), None); + let ts_end = ScalarValue::TimestampMillisecond(Some(self.exclusive_end.as_i64()), None); + let column_name = column_name.as_ref(); + let ts_low = col(column_name).gt_eq(lit(ts_start)); + let ts_high = col(column_name).lt(lit(ts_end)); + + ts_low.and(ts_high) } } From 83da21224f4622f9ae91a54f52f29322f8ebbd3c Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 19 Jul 2023 15:38:50 +0800 Subject: [PATCH 3/5] fix: remove bytes module from common_types --- Cargo.lock | 10 ++++++-- analytic_engine/Cargo.toml | 2 +- analytic_engine/src/compaction/picker.rs | 2 +- analytic_engine/src/instance/write.rs | 2 +- analytic_engine/src/manifest/meta_edit.rs | 2 +- analytic_engine/src/memtable/key.rs | 19 ++++++-------- analytic_engine/src/memtable/mod.rs | 2 +- analytic_engine/src/memtable/skiplist/iter.rs | 2 +- analytic_engine/src/memtable/skiplist/mod.rs | 4 +-- analytic_engine/src/payload.rs | 6 ++--- analytic_engine/src/sst/header.rs | 2 +- analytic_engine/src/sst/meta_data/cache.rs | 2 +- .../src/sst/parquet/async_reader.rs | 2 +- analytic_engine/src/sst/parquet/encoding.rs | 4 +-- analytic_engine/src/sst/parquet/meta_data.rs | 2 +- analytic_engine/src/sst/parquet/writer.rs | 2 +- analytic_engine/src/sst/writer.rs | 2 +- benchmarks/Cargo.toml | 1 + benchmarks/src/util.rs | 6 ++--- cluster/Cargo.toml | 1 + cluster/src/shard_lock_manager.rs | 3 ++- common_types/src/bytes.rs | 5 ---- common_types/src/column.rs | 4 +-- common_types/src/datum.rs | 3 ++- common_types/src/lib.rs | 1 - common_types/src/schema.rs | 3 ++- common_types/src/string.rs | 5 ++-- components/codec/Cargo.toml | 2 ++ components/codec/src/compact/bytes.rs | 2 +- components/codec/src/compact/datum.rs | 10 +++----- components/codec/src/compact/float.rs | 2 +- components/codec/src/compact/mod.rs | 12 ++++----- components/codec/src/compact/number.rs | 2 +- components/codec/src/lib.rs | 2 +- components/codec/src/memcomparable/bytes.rs | 2 +- components/codec/src/memcomparable/datum.rs | 4 +-- components/codec/src/memcomparable/mod.rs | 16 ++++++------ components/codec/src/memcomparable/number.rs | 2 +- components/codec/src/row.rs | 2 +- components/codec/src/varint.rs | 8 +++--- server/Cargo.toml | 2 +- server/src/http.rs | 2 +- system_catalog/Cargo.toml | 1 + system_catalog/src/sys_catalog_table.rs | 10 ++++---- table_engine/Cargo.toml | 1 + table_engine/src/partition/mod.rs | 2 +- .../src/partition/rule/df_adapter/mod.rs | 2 +- table_engine/src/partition/rule/key.rs | 4 +-- wal/Cargo.toml | 1 + wal/src/kv_encoder.rs | 25 ++++++++----------- wal/src/log_batch.rs | 9 +++---- wal/src/message_queue_impl/encoding.rs | 8 +++--- wal/src/message_queue_impl/region_context.rs | 3 ++- .../snapshot_synchronizer.rs | 2 +- wal/src/rocks_impl/manager.rs | 5 ++-- wal/src/table_kv_impl/namespace.rs | 3 ++- wal/src/table_kv_impl/table_unit.rs | 3 ++- wal/src/tests/util.rs | 7 ++---- 58 files changed, 123 insertions(+), 132 deletions(-) delete mode 100644 common_types/src/bytes.rs diff --git a/Cargo.lock b/Cargo.lock index 76f066ab7a..3f8629ca12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,7 +88,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.13.1", - "bytes", + "bytes_ext", "ceresdbproto", "codec", "common_types", @@ -621,6 +621,7 @@ dependencies = [ "arena", "arrow", "base64 0.13.1", + "bytes_ext", "clap 3.2.23", "common_types", "criterion", @@ -1263,6 +1264,7 @@ name = "cluster" version = "1.2.5-alpha" dependencies = [ "async-trait", + "bytes_ext", "catalog", "ceresdbproto", "common_types", @@ -1295,6 +1297,7 @@ dependencies = [ name = "codec" version = "1.2.5-alpha" dependencies = [ + "bytes_ext", "common_types", "macros", "snafu 0.6.10", @@ -5715,7 +5718,7 @@ dependencies = [ "arrow", "arrow_ext", "async-trait", - "bytes", + "bytes_ext", "catalog", "ceresdbproto", "clru", @@ -6267,6 +6270,7 @@ version = "1.2.5-alpha" dependencies = [ "arrow", "async-trait", + "bytes_ext", "catalog", "ceresdbproto", "codec", @@ -6289,6 +6293,7 @@ dependencies = [ "arrow", "arrow_ext", "async-trait", + "bytes_ext", "ceresdbproto", "common_types", "datafusion", @@ -7150,6 +7155,7 @@ name = "wal" version = "1.2.5-alpha" dependencies = [ "async-trait", + "bytes_ext", "ceresdbproto", "chrono", "codec", diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 26a3d0a827..97e9a794b6 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -21,7 +21,7 @@ arrow = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } -bytes = { workspace = true } +bytes_ext = { workspace = true } ceresdbproto = { workspace = true } codec = { workspace = true } common_types = { workspace = true } diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index ecda034687..cf808d6e4e 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -684,8 +684,8 @@ impl LevelPicker for TimeWindowPicker { mod tests { use std::time::Duration; + use bytes_ext::Bytes; use common_types::{ - bytes::Bytes, tests::build_schema, time::{TimeRange, Timestamp}, }; diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 2cbf0e0ea2..c062e8f071 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -2,10 +2,10 @@ //! Write logic of instance +use bytes_ext::ByteVec; use ceresdbproto::{schema as schema_pb, table_requests}; use codec::row; use common_types::{ - bytes::ByteVec, row::{RowGroup, RowGroupSlicer}, schema::{IndexInWriterSchema, Schema}, }; diff --git a/analytic_engine/src/manifest/meta_edit.rs b/analytic_engine/src/manifest/meta_edit.rs index 734de5b3be..5024b529c7 100644 --- a/analytic_engine/src/manifest/meta_edit.rs +++ b/analytic_engine/src/manifest/meta_edit.rs @@ -4,7 +4,7 @@ use std::convert::TryFrom; -use bytes::{Buf, BufMut}; +use bytes_ext::{Buf, BufMut}; use ceresdbproto::{manifest as manifest_pb, schema as schema_pb}; use common_types::{ schema::{Schema, Version}, diff --git a/analytic_engine/src/memtable/key.rs b/analytic_engine/src/memtable/key.rs index ee4ea58fd1..f155bbc456 100644 --- a/analytic_engine/src/memtable/key.rs +++ b/analytic_engine/src/memtable/key.rs @@ -12,14 +12,9 @@ use std::mem; -use bytes::BufMut; +use bytes_ext::{BufMut, BytesMut, SafeBuf, SafeBufMut}; use codec::{memcomparable::MemComparable, Decoder, Encoder}; -use common_types::{ - bytes::{BytesMut, SafeBuf, SafeBufMut}, - row::Row, - schema::Schema, - SequenceNumber, -}; +use common_types::{row::Row, schema::Schema, SequenceNumber}; use macros::define_result; use snafu::{ensure, Backtrace, ResultExt, Snafu}; @@ -29,19 +24,19 @@ pub enum Error { EncodeKeyDatum { source: codec::memcomparable::Error }, #[snafu(display("Failed to encode sequence, err:{}", source))] - EncodeSequence { source: common_types::bytes::Error }, + EncodeSequence { source: bytes_ext::Error }, #[snafu(display("Failed to encode row index, err:{}", source))] - EncodeIndex { source: common_types::bytes::Error }, + EncodeIndex { source: bytes_ext::Error }, #[snafu(display("Failed to decode sequence, err:{}", source))] - DecodeSequence { source: common_types::bytes::Error }, + DecodeSequence { source: bytes_ext::Error }, #[snafu(display("Failed to decode row index, err:{}", source))] - DecodeIndex { source: common_types::bytes::Error }, + DecodeIndex { source: bytes_ext::Error }, #[snafu(display( - "Insufficent internal key length, len:{}.\nBacktrace:\n{}", + "Insufficient internal key length, len:{}.\nBacktrace:\n{}", len, backtrace ))] diff --git a/analytic_engine/src/memtable/mod.rs b/analytic_engine/src/memtable/mod.rs index ed3173e704..b005501adf 100644 --- a/analytic_engine/src/memtable/mod.rs +++ b/analytic_engine/src/memtable/mod.rs @@ -8,8 +8,8 @@ pub mod skiplist; use std::{ops::Bound, sync::Arc, time::Instant}; +use bytes_ext::{ByteVec, Bytes}; use common_types::{ - bytes::{ByteVec, Bytes}, projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, row::Row, diff --git a/analytic_engine/src/memtable/skiplist/iter.rs b/analytic_engine/src/memtable/skiplist/iter.rs index 6bdb9b6419..1fbca5eb15 100644 --- a/analytic_engine/src/memtable/skiplist/iter.rs +++ b/analytic_engine/src/memtable/skiplist/iter.rs @@ -5,9 +5,9 @@ use std::{cmp::Ordering, iter::Rev, ops::Bound, time::Instant}; use arena::{Arena, BasicStats}; +use bytes_ext::{Bytes, BytesMut}; use codec::row; use common_types::{ - bytes::{Bytes, BytesMut}, projected_schema::{ProjectedSchema, RowProjector}, record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, row::contiguous::{ContiguousRowReader, ProjectedContiguousRow}, diff --git a/analytic_engine/src/memtable/skiplist/mod.rs b/analytic_engine/src/memtable/skiplist/mod.rs index d940bf42ea..bc9bcaae1b 100644 --- a/analytic_engine/src/memtable/skiplist/mod.rs +++ b/analytic_engine/src/memtable/skiplist/mod.rs @@ -12,9 +12,9 @@ use std::{ }; use arena::{Arena, BasicStats}; +use bytes_ext::Bytes; use codec::Encoder; use common_types::{ - bytes::Bytes, row::{contiguous::ContiguousRowWriter, Row}, schema::Schema, SequenceNumber, @@ -205,9 +205,9 @@ mod tests { use std::{ops::Bound, sync::Arc}; use arena::NoopCollector; + use bytes_ext::ByteVec; use codec::memcomparable::MemComparable; use common_types::{ - bytes::ByteVec, datum::Datum, projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, diff --git a/analytic_engine/src/payload.rs b/analytic_engine/src/payload.rs index e773e15545..eece0304b2 100644 --- a/analytic_engine/src/payload.rs +++ b/analytic_engine/src/payload.rs @@ -2,10 +2,10 @@ //! Payloads to write to wal +use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut}; use ceresdbproto::{manifest as manifest_pb, table_requests}; use codec::{row::WalRowDecoder, Decoder}; use common_types::{ - bytes::{Buf, BufMut, SafeBuf, SafeBufMut}, row::{RowGroup, RowGroupBuilder}, schema::Schema, }; @@ -19,7 +19,7 @@ use crate::{table_options, TableOptions}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to encode header, err:{}", source))] - EncodeHeader { source: common_types::bytes::Error }, + EncodeHeader { source: bytes_ext::Error }, #[snafu(display("Failed to encode body, err:{}.\nBacktrace:\n{}", source, backtrace))] EncodeBody { @@ -28,7 +28,7 @@ pub enum Error { }, #[snafu(display("Failed to decode header, err:{}", source))] - DecodeHeader { source: common_types::bytes::Error }, + DecodeHeader { source: bytes_ext::Error }, #[snafu(display( "Invalid wal entry header, value:{}.\nBacktrace:\n{}", diff --git a/analytic_engine/src/sst/header.rs b/analytic_engine/src/sst/header.rs index 1bc1aea3d5..84916bc04d 100644 --- a/analytic_engine/src/sst/header.rs +++ b/analytic_engine/src/sst/header.rs @@ -2,7 +2,7 @@ // The header parser for one sst. -use bytes::Bytes; +use bytes_ext::Bytes; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use parquet::data_type::AsBytes; diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 5e2bacdcbd..7083ca141b 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -135,7 +135,7 @@ mod tests { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; - use bytes::Bytes; + use bytes_ext::Bytes; use common_types::{ column_schema::Builder as ColumnSchemaBuilder, schema::Builder as CustomSchemaBuilder, diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 787f6b83a4..49400a0d85 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -12,7 +12,7 @@ use std::{ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch as ArrowRecordBatch}; use async_trait::async_trait; -use bytes::Bytes; +use bytes_ext::Bytes; use common_types::{ projected_schema::{ProjectedSchema, RowProjector}, record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey}, diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index bfc1087f4d..9836994359 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -10,9 +10,9 @@ use arrow::{ util::bit_util, }; use async_trait::async_trait; +use bytes_ext::{BytesMut, SafeBufMut}; use ceresdbproto::sst as sst_pb; use common_types::{ - bytes::{BytesMut, SafeBufMut}, datum::DatumKind, schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema}, }; @@ -776,8 +776,8 @@ mod tests { use std::{pin::Pin, sync::Mutex, task::Poll}; use arrow::array::{Int32Array, StringArray, TimestampMillisecondArray, UInt64Array}; + use bytes_ext::Bytes; use common_types::{ - bytes::Bytes, column_schema, schema::{Builder, Schema, TSID_COLUMN}, time::{TimeRange, Timestamp}, diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs index c1bf746a8d..621207d788 100644 --- a/analytic_engine/src/sst/parquet/meta_data.rs +++ b/analytic_engine/src/sst/parquet/meta_data.rs @@ -4,7 +4,7 @@ use std::{fmt, ops::Index, sync::Arc}; -use bytes::Bytes; +use bytes_ext::Bytes; use ceresdbproto::{schema as schema_pb, sst as sst_pb}; use common_types::{ datum::DatumKind, diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index d4f0ff6ac7..d091e243b9 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -330,8 +330,8 @@ mod tests { use std::{sync::Arc, task::Poll}; + use bytes_ext::Bytes; use common_types::{ - bytes::Bytes, projected_schema::ProjectedSchema, tests::{build_row, build_row_for_dictionary, build_schema, build_schema_with_dictionary}, time::{TimeRange, Timestamp}, diff --git a/analytic_engine/src/sst/writer.rs b/analytic_engine/src/sst/writer.rs index f4483c07ef..4d67fb5d21 100644 --- a/analytic_engine/src/sst/writer.rs +++ b/analytic_engine/src/sst/writer.rs @@ -5,7 +5,7 @@ use std::cmp; use async_trait::async_trait; -use bytes::Bytes; +use bytes_ext::Bytes; use common_types::{ record_batch::RecordBatchWithKey, request_id::RequestId, schema::Schema, time::TimeRange, SequenceNumber, diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 67ee82381c..e07fd23f59 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -15,6 +15,7 @@ analytic_engine = { workspace = true } arena = { workspace = true } arrow = { workspace = true } base64 = { workspace = true } +bytes_ext = { workspace = true } clap = { workspace = true } common_types = { workspace = true } env_logger = { workspace = true } diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index e8509537a7..d16816eac3 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -21,8 +21,8 @@ use analytic_engine::{ table::sst_util, table_options::StorageFormat, }; +use bytes_ext::{BufMut, SafeBufMut}; use common_types::{ - bytes::{BufMut, SafeBufMut}, projected_schema::ProjectedSchema, schema::{IndexInWriterSchema, Schema}, }; @@ -37,10 +37,10 @@ use wal::log_batch::Payload; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to writer header, err:{}.", source))] - WriteHeader { source: common_types::bytes::Error }, + WriteHeader { source: bytes_ext::Error }, #[snafu(display("Failed to writer body, err:{}.", source))] - WriteBody { source: common_types::bytes::Error }, + WriteBody { source: bytes_ext::Error }, } define_result!(Error); diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index b317ea7a35..1b33748b55 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -12,6 +12,7 @@ workspace = true [dependencies] async-trait = { workspace = true } +bytes_ext = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } diff --git a/cluster/src/shard_lock_manager.rs b/cluster/src/shard_lock_manager.rs index 809b49b3dd..8c9a0d8ae9 100644 --- a/cluster/src/shard_lock_manager.rs +++ b/cluster/src/shard_lock_manager.rs @@ -7,8 +7,9 @@ use std::{ time::{Duration, Instant}, }; +use bytes_ext::Bytes; use ceresdbproto::meta_event::ShardLockValue; -use common_types::{bytes::Bytes, table::ShardId}; +use common_types::table::ShardId; use etcd_client::{ Client, Compare, CompareOp, LeaseKeepAliveStream, LeaseKeeper, PutOptions, Txn, TxnOp, }; diff --git a/common_types/src/bytes.rs b/common_types/src/bytes.rs deleted file mode 100644 index 9b98bb16cf..0000000000 --- a/common_types/src/bytes.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Bytes type. - -pub use bytes_ext::*; diff --git a/common_types/src/column.rs b/common_types/src/column.rs index d1cf85fefc..9c6900502a 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Column use std::sync::Arc; @@ -18,6 +18,7 @@ use arrow::{ datatypes::{DataType, Int32Type, TimeUnit}, error::ArrowError, }; +use bytes_ext::Bytes; use datafusion::physical_plan::{ expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS}, ColumnarValue, @@ -26,7 +27,6 @@ use paste::paste; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ - bytes::Bytes, datum::{Datum, DatumKind, DatumView}, string::StringBytes, time::Timestamp, diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index ce0fc65472..826adc2f18 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -8,6 +8,7 @@ use arrow::{ datatypes::{DataType, TimeUnit}, temporal_conversions::{EPOCH_DAYS_FROM_CE, NANOSECONDS}, }; +use bytes_ext::Bytes; use ceresdbproto::schema::DataType as DataTypePb; use chrono::{Datelike, Local, NaiveDate, NaiveTime, TimeZone, Timelike}; use datafusion::scalar::ScalarValue; @@ -15,7 +16,7 @@ use serde::ser::{Serialize, Serializer}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{DataType as SqlDataType, Value}; -use crate::{bytes::Bytes, hash::hash64, hex, string::StringBytes, time::Timestamp}; +use crate::{hash::hash64, hex, string::StringBytes, time::Timestamp}; const DATE_FORMAT: &str = "%Y-%m-%d"; const TIME_FORMAT: &str = "%H:%M:%S%.3f"; diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index 38e445e763..73232f4ebf 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -3,7 +3,6 @@ //! Contains common types pub mod bitset; -pub mod bytes; pub mod column; pub mod column_schema; pub mod datum; diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index d94f10e52a..2cc07dbacb 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -1292,9 +1292,10 @@ impl SchemaEncoder { #[cfg(test)] mod tests { + use bytes_ext::Bytes; + use super::*; use crate::{ - bytes::Bytes, datum::Datum, row::{Row, RowWithMeta}, time::Timestamp, diff --git a/common_types/src/string.rs b/common_types/src/string.rs index 406d94b5e1..17320dcbce 100644 --- a/common_types/src/string.rs +++ b/common_types/src/string.rs @@ -1,13 +1,12 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Bytes that can safely cast to str/string. use std::{convert::TryFrom, fmt, ops, str}; +use bytes_ext::Bytes; use snafu::{Backtrace, ResultExt, Snafu}; -use crate::bytes::Bytes; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Bytes are not valid utf8, err:{}.\nBacktrace:\n{}", source, backtrace))] diff --git a/components/codec/Cargo.toml b/components/codec/Cargo.toml index 36b426b321..f5a2c29402 100644 --- a/components/codec/Cargo.toml +++ b/components/codec/Cargo.toml @@ -12,6 +12,8 @@ workspace = true [dependencies] # In alphabetical order +bytes_ext = { workspace = true } common_types = { workspace = true, features = ["test"] } macros = { workspace = true } snafu = { workspace = true } + diff --git a/components/codec/src/compact/bytes.rs b/components/codec/src/compact/bytes.rs index 09e68ba4f9..7e66bc62d5 100644 --- a/components/codec/src/compact/bytes.rs +++ b/components/codec/src/compact/bytes.rs @@ -4,7 +4,7 @@ use std::convert::TryFrom; -use common_types::bytes::{Buf, BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}; +use bytes_ext::{Buf, BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}; use snafu::{ensure, ResultExt}; use crate::{ diff --git a/components/codec/src/compact/datum.rs b/components/codec/src/compact/datum.rs index bde3f4c722..ce6f3b9443 100644 --- a/components/codec/src/compact/datum.rs +++ b/components/codec/src/compact/datum.rs @@ -2,12 +2,8 @@ //! Datum compact codec -use common_types::{ - bytes::{Buf, BufMut, BytesMut, SafeBufMut}, - datum::Datum, - string::StringBytes, - time::Timestamp, -}; +use bytes_ext::{Buf, BufMut, BytesMut, SafeBufMut}; +use common_types::{datum::Datum, string::StringBytes, time::Timestamp}; use snafu::ResultExt; use crate::{ @@ -220,7 +216,7 @@ impl DecodeTo for MemCompactDecoder { #[cfg(test)] mod tests { - use common_types::bytes::Bytes; + use bytes_ext::Bytes; use super::*; diff --git a/components/codec/src/compact/float.rs b/components/codec/src/compact/float.rs index 8e3652bc89..e9b6674ded 100644 --- a/components/codec/src/compact/float.rs +++ b/components/codec/src/compact/float.rs @@ -2,7 +2,7 @@ use std::mem; -use common_types::bytes::{SafeBuf, SafeBufMut}; +use bytes_ext::{SafeBuf, SafeBufMut}; use snafu::ResultExt; use crate::{ diff --git a/components/codec/src/compact/mod.rs b/components/codec/src/compact/mod.rs index 28f97c3aef..625adccefd 100644 --- a/components/codec/src/compact/mod.rs +++ b/components/codec/src/compact/mod.rs @@ -9,7 +9,7 @@ mod datum; mod float; mod number; -use common_types::bytes::SafeBuf; +use bytes_ext::SafeBuf; use macros::define_result; use snafu::{ensure, Backtrace, ResultExt, Snafu}; @@ -18,10 +18,10 @@ use crate::consts; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to encode flag, err:{}", source))] - EncodeKey { source: common_types::bytes::Error }, + EncodeKey { source: bytes_ext::Error }, #[snafu(display("Failed to encode value, err:{}", source))] - EncodeValue { source: common_types::bytes::Error }, + EncodeValue { source: bytes_ext::Error }, #[snafu(display("Failed to encode varint, err:{}", source))] EncodeVarint { source: crate::varint::Error }, @@ -30,7 +30,7 @@ pub enum Error { DecodeVarint { source: crate::varint::Error }, #[snafu(display("Failed to decode key, err:{}", source))] - DecodeKey { source: common_types::bytes::Error }, + DecodeKey { source: bytes_ext::Error }, #[snafu(display("Insufficient bytes to decode value.\nBacktrace:\n{}", backtrace))] DecodeEmptyValue { backtrace: Backtrace }, @@ -48,10 +48,10 @@ pub enum Error { }, #[snafu(display("Insufficient bytes to decode value, err:{}", source))] - DecodeValue { source: common_types::bytes::Error }, + DecodeValue { source: bytes_ext::Error }, #[snafu(display("Failed to skip decoded value, err:{}", source))] - SkipDecodedValue { source: common_types::bytes::Error }, + SkipDecodedValue { source: bytes_ext::Error }, #[snafu(display("Try into usize error:{}.\nBacktrace:\n{}", source, backtrace))] TryIntoUsize { diff --git a/components/codec/src/compact/number.rs b/components/codec/src/compact/number.rs index 929d792fba..884377db7f 100644 --- a/components/codec/src/compact/number.rs +++ b/components/codec/src/compact/number.rs @@ -2,7 +2,7 @@ //! Number format -use common_types::bytes::{Buf, SafeBufMut}; +use bytes_ext::{Buf, SafeBufMut}; use snafu::ResultExt; use crate::{ diff --git a/components/codec/src/lib.rs b/components/codec/src/lib.rs index 5281cbbc49..7f30215d50 100644 --- a/components/codec/src/lib.rs +++ b/components/codec/src/lib.rs @@ -11,7 +11,7 @@ pub mod memcomparable; pub mod row; mod varint; -use common_types::bytes::{Buf, BufMut}; +use bytes_ext::{Buf, BufMut}; // encoder/decoder /// Data encode abstraction diff --git a/components/codec/src/memcomparable/bytes.rs b/components/codec/src/memcomparable/bytes.rs index d60139c8bd..be99855e1e 100644 --- a/components/codec/src/memcomparable/bytes.rs +++ b/components/codec/src/memcomparable/bytes.rs @@ -2,7 +2,7 @@ //! Bytes format -use common_types::bytes::{Buf, BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}; +use bytes_ext::{Buf, BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}; use snafu::{ensure, ResultExt}; use crate::{ diff --git a/components/codec/src/memcomparable/datum.rs b/components/codec/src/memcomparable/datum.rs index 48a0210797..c13e7c938d 100644 --- a/components/codec/src/memcomparable/datum.rs +++ b/components/codec/src/memcomparable/datum.rs @@ -4,8 +4,8 @@ use std::i64; +use bytes_ext::{Buf, BufMut, BytesMut, SafeBufMut}; use common_types::{ - bytes::{Buf, BufMut, BytesMut, SafeBufMut}, datum::{Datum, DatumKind}, string::StringBytes, time::Timestamp, @@ -219,7 +219,7 @@ impl DecodeTo for MemComparable { mod tests { use core::cmp::Ordering; - use common_types::bytes::Bytes; + use bytes_ext::Bytes; use super::*; diff --git a/components/codec/src/memcomparable/mod.rs b/components/codec/src/memcomparable/mod.rs index 01d4cc49ae..3fb53a2be0 100644 --- a/components/codec/src/memcomparable/mod.rs +++ b/components/codec/src/memcomparable/mod.rs @@ -9,23 +9,21 @@ mod bytes; mod datum; mod number; -use common_types::{ - bytes::{BytesMut, SafeBuf}, - datum::DatumKind, -}; +use bytes_ext::{BytesMut, SafeBuf}; +use common_types::datum::DatumKind; use macros::define_result; use snafu::{ensure, Backtrace, ResultExt, Snafu}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to encode flag, err:{}", source))] - EncodeKey { source: common_types::bytes::Error }, + EncodeKey { source: bytes_ext::Error }, #[snafu(display("Failed to encode value, err:{}", source))] - EncodeValue { source: common_types::bytes::Error }, + EncodeValue { source: bytes_ext::Error }, #[snafu(display("Failed to decode key, err:{}", source))] - DecodeKey { source: common_types::bytes::Error }, + DecodeKey { source: bytes_ext::Error }, #[snafu(display( "Invalid flag, expect:{}, actual:{}.\nBacktrace:\n{}", @@ -50,7 +48,7 @@ pub enum Error { }, #[snafu(display("Insufficient bytes to decode value, err:{}", source))] - DecodeValue { source: common_types::bytes::Error }, + DecodeValue { source: bytes_ext::Error }, #[snafu(display("Insufficient bytes to decode value group.\nBacktrace:\n{}", backtrace))] DecodeValueGroup { backtrace: Backtrace }, @@ -76,7 +74,7 @@ pub enum Error { }, #[snafu(display("Failed to skip padding bytes, err:{}.", source))] - SkipPadding { source: common_types::bytes::Error }, + SkipPadding { source: bytes_ext::Error }, #[snafu(display("Failed to decode string, err:{}", source))] DecodeString { source: common_types::string::Error }, diff --git a/components/codec/src/memcomparable/number.rs b/components/codec/src/memcomparable/number.rs index 0ce412a432..bd04d4e807 100644 --- a/components/codec/src/memcomparable/number.rs +++ b/components/codec/src/memcomparable/number.rs @@ -2,7 +2,7 @@ //! Number format -use common_types::bytes::{SafeBuf, SafeBufMut}; +use bytes_ext::{SafeBuf, SafeBufMut}; use snafu::ResultExt; use crate::{ diff --git a/components/codec/src/row.rs b/components/codec/src/row.rs index 873f4a779a..1ab26087fc 100644 --- a/components/codec/src/row.rs +++ b/components/codec/src/row.rs @@ -7,8 +7,8 @@ use std::convert::TryFrom; +use bytes_ext::{Buf, BufMut, ByteVec, BytesMut}; use common_types::{ - bytes::{Buf, BufMut, ByteVec, BytesMut}, datum::Datum, row::{Row, RowGroup}, schema::{IndexInWriterSchema, Schema}, diff --git a/components/codec/src/varint.rs b/components/codec/src/varint.rs index df7cab672a..de0d5705c8 100644 --- a/components/codec/src/varint.rs +++ b/components/codec/src/varint.rs @@ -1,7 +1,7 @@ // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Varint for codec whose test is covered by compact/number.rs -use common_types::bytes::{Buf, SafeBuf, SafeBufMut}; +use bytes_ext::{Buf, SafeBuf, SafeBufMut}; use macros::define_result; use snafu::{Backtrace, ResultExt, Snafu}; @@ -9,13 +9,13 @@ use snafu::{Backtrace, ResultExt, Snafu}; #[snafu(visibility(pub(crate)))] pub enum Error { #[snafu(display("Failed to encode varint, err:{}", source))] - EncodeVarint { source: common_types::bytes::Error }, + EncodeVarint { source: bytes_ext::Error }, #[snafu(display("Insufficient bytes to decode value.\nBacktrace:\n{}", backtrace))] DecodeEmptyValue { backtrace: Backtrace }, #[snafu(display("Insufficient bytes to decode value, err:{}", source))] - DecodeValue { source: common_types::bytes::Error }, + DecodeValue { source: bytes_ext::Error }, #[snafu(display("Value larger than 64 bits (overflow).\nBacktrace:\n{}", backtrace))] UvarintOverflow { backtrace: Backtrace }, @@ -143,7 +143,7 @@ pub fn decode_uvarint(buf: &mut B) -> Result { #[cfg(test)] mod tests { - use common_types::bytes::BytesMut; + use bytes_ext::BytesMut; use super::*; diff --git a/server/Cargo.toml b/server/Cargo.toml index cc9939384b..369877dd08 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -15,7 +15,7 @@ analytic_engine = { workspace = true } arrow = { workspace = true } arrow_ext = { workspace = true } async-trait = { workspace = true } -bytes = { workspace = true } +bytes_ext = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } clru = { workspace = true } diff --git a/server/src/http.rs b/server/src/http.rs index 9c9b88e5f5..76227cc692 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -8,8 +8,8 @@ use std::{ }; use analytic_engine::setup::OpenedWals; +use bytes_ext::Bytes; use cluster::ClusterRef; -use common_types::bytes::Bytes; use generic_error::{BoxError, GenericError}; use log::{error, info}; use logger::RuntimeLevel; diff --git a/system_catalog/Cargo.toml b/system_catalog/Cargo.toml index 37807fb4e4..0b02bcbc5e 100644 --- a/system_catalog/Cargo.toml +++ b/system_catalog/Cargo.toml @@ -14,6 +14,7 @@ workspace = true # In alphabetical order arrow = { workspace = true } async-trait = { workspace = true } +bytes_ext = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } codec = { workspace = true } diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 8c3703a835..b8179e0ff3 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -5,11 +5,11 @@ use std::{collections::HashMap, mem}; use async_trait::async_trait; +use bytes_ext::{BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}; use catalog::consts; use ceresdbproto::sys_catalog::{CatalogEntry, SchemaEntry, TableEntry}; use codec::{memcomparable::MemComparable, Encoder}; use common_types::{ - bytes::{BufMut, Bytes, BytesMut, SafeBuf, SafeBufMut}, column_schema, datum::{Datum, DatumKind}, projected_schema::ProjectedSchema, @@ -107,19 +107,19 @@ pub enum Error { VisitorOpenTable { source: table_engine::engine::Error }, #[snafu(display("Failed to encode entry key header, err:{}", source))] - EncodeKeyHeader { source: common_types::bytes::Error }, + EncodeKeyHeader { source: bytes_ext::Error }, #[snafu(display("Failed to encode entry body, err:{}", source))] EncodeKeyBody { source: codec::memcomparable::Error }, #[snafu(display("Failed to encode table key type, err:{}", source))] - EncodeTableKeyType { source: common_types::bytes::Error }, + EncodeTableKeyType { source: bytes_ext::Error }, #[snafu(display("Failed to read entry key header, err:{}", source))] - ReadKeyHeader { source: common_types::bytes::Error }, + ReadKeyHeader { source: bytes_ext::Error }, #[snafu(display("Failed to read table key header, err:{}", source))] - ReadTableKeyHeader { source: common_types::bytes::Error }, + ReadTableKeyHeader { source: bytes_ext::Error }, #[snafu(display( "Invalid entry key header, value:{}.\nBacktrace:\n{}", diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml index 078d3ebbc5..06f2226ed4 100644 --- a/table_engine/Cargo.toml +++ b/table_engine/Cargo.toml @@ -15,6 +15,7 @@ workspace = true arrow = { workspace = true } arrow_ext = { workspace = true } async-trait = { workspace = true } +bytes_ext = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } datafusion = { workspace = true } diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs index 83bb91acc1..a67a04fa08 100644 --- a/table_engine/src/partition/mod.rs +++ b/table_engine/src/partition/mod.rs @@ -4,8 +4,8 @@ pub mod rule; +use bytes_ext::Bytes; use ceresdbproto::cluster::partition_info::Info; -use common_types::bytes::Bytes; use macros::define_result; use snafu::{Backtrace, Snafu}; diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs index 380b4f8878..a5e73c58c8 100644 --- a/table_engine/src/partition/rule/df_adapter/mod.rs +++ b/table_engine/src/partition/rule/df_adapter/mod.rs @@ -62,8 +62,8 @@ impl DfPartitionRuleAdapter { #[cfg(test)] mod tests { + use bytes_ext::BytesMut; use common_types::{ - bytes::BytesMut, column_schema, datum::{Datum, DatumKind}, row::RowGroupBuilder, diff --git a/table_engine/src/partition/rule/key.rs b/table_engine/src/partition/rule/key.rs index 1718f351c4..d6cf8669c1 100644 --- a/table_engine/src/partition/rule/key.rs +++ b/table_engine/src/partition/rule/key.rs @@ -1,11 +1,11 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Key partition rule use std::collections::{HashMap, HashSet}; +use bytes_ext::{BufMut, BytesMut}; use common_types::{ - bytes::{BufMut, BytesMut}, datum::Datum, hash::hash64, row::{Row, RowGroup}, diff --git a/wal/Cargo.toml b/wal/Cargo.toml index 852d61271a..7c67ed608b 100644 --- a/wal/Cargo.toml +++ b/wal/Cargo.toml @@ -19,6 +19,7 @@ features = ["portable"] [dependencies] async-trait = { workspace = true } +bytes_ext = { workspace = true } ceresdbproto = { workspace = true } chrono = { workspace = true } codec = { workspace = true } diff --git a/wal/src/kv_encoder.rs b/wal/src/kv_encoder.rs index f0ea2a86c3..e882e4e9e7 100644 --- a/wal/src/kv_encoder.rs +++ b/wal/src/kv_encoder.rs @@ -2,12 +2,9 @@ //! Common Encoding for Wal logs +use bytes_ext::{self, Buf, BufMut, BytesMut, SafeBuf, SafeBufMut}; use codec::{Decoder, Encoder}; -use common_types::{ - bytes::{self, Buf, BufMut, BytesMut, SafeBuf, SafeBufMut}, - table::TableId, - SequenceNumber, -}; +use common_types::{table::TableId, SequenceNumber}; use generic_error::{BoxError, GenericError}; use macros::define_result; use snafu::{ensure, Backtrace, ResultExt, Snafu}; @@ -33,39 +30,39 @@ pub const NEWEST_META_VALUE_ENCODING_VERSION: u8 = META_VALUE_ENCODING_V0; pub enum Error { #[snafu(display("Failed to encode log key, err:{}", source))] EncodeLogKey { - source: bytes::Error, + source: bytes_ext::Error, backtrace: Backtrace, }, #[snafu(display("Failed to encode log value header, err:{}", source))] - EncodeLogValueHeader { source: bytes::Error }, + EncodeLogValueHeader { source: bytes_ext::Error }, #[snafu(display("Failed to encode log value payload, err:{}", source))] EncodeLogValuePayload { source: GenericError }, #[snafu(display("Failed to decode log key, err:{}", source))] - DecodeLogKey { source: bytes::Error }, + DecodeLogKey { source: bytes_ext::Error }, #[snafu(display("Failed to decode log value header, err:{}", source))] - DecodeLogValueHeader { source: bytes::Error }, + DecodeLogValueHeader { source: bytes_ext::Error }, #[snafu(display("Failed to decode log value payload, err:{}", source))] DecodeLogValuePayload { source: GenericError }, #[snafu(display("Failed to encode meta key, err:{}", source))] EncodeMetaKey { - source: bytes::Error, + source: bytes_ext::Error, backtrace: Backtrace, }, #[snafu(display("Failed to encode meta value, err:{}", source))] - EncodeMetaValue { source: bytes::Error }, + EncodeMetaValue { source: bytes_ext::Error }, #[snafu(display("Failed to decode meta key, err:{}", source))] - DecodeMetaKey { source: bytes::Error }, + DecodeMetaKey { source: bytes_ext::Error }, #[snafu(display("Failed to decode meta value, err:{}", source))] - DecodeMetaValue { source: bytes::Error }, + DecodeMetaValue { source: bytes_ext::Error }, #[snafu(display( "Found invalid meta key type, expect:{:?}, given:{}.\nBacktrace:\n{}", @@ -736,7 +733,7 @@ impl CommonLogEncoding { #[cfg(test)] mod tests { - use common_types::bytes::BytesMut; + use bytes_ext::BytesMut; use super::*; use crate::{ diff --git a/wal/src/log_batch.rs b/wal/src/log_batch.rs index 881f951035..cfe2d31ec6 100644 --- a/wal/src/log_batch.rs +++ b/wal/src/log_batch.rs @@ -1,14 +1,11 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Log entries definition. use std::fmt::Debug; -use common_types::{ - bytes::{Buf, BufMut}, - table::TableId, - SequenceNumber, -}; +use bytes_ext::{Buf, BufMut}; +use common_types::{table::TableId, SequenceNumber}; use crate::manager::WalLocation; diff --git a/wal/src/message_queue_impl/encoding.rs b/wal/src/message_queue_impl/encoding.rs index 1e9ec14e8e..db01ba90d1 100644 --- a/wal/src/message_queue_impl/encoding.rs +++ b/wal/src/message_queue_impl/encoding.rs @@ -2,12 +2,12 @@ //! Meta encoding of wal's message queue implementation +use bytes_ext::{Buf, BufMut, BytesMut, SafeBuf, SafeBufMut}; use ceresdbproto::wal_on_mq::{ table_meta_data::SafeDeleteOffset, RegionMetaSnapshot as RegionMetaSnapshotPb, TableMetaData as TableMetaDataPb, }; use codec::{Decoder, Encoder}; -use common_types::bytes::{self, Buf, BufMut, BytesMut, SafeBuf, SafeBufMut}; use generic_error::{BoxError, GenericError}; use macros::define_result; use prost::Message; @@ -28,7 +28,7 @@ pub enum Error { "Failed to encode meta key of message queue implementation, source:{}", source ))] - EncodeMetaKey { source: bytes::Error }, + EncodeMetaKey { source: bytes_ext::Error }, #[snafu(display( "Failed to encode meta value of message queue implementation, err:{}", @@ -40,7 +40,7 @@ pub enum Error { "Failed to decode meta key of message queue implementation, err:{}", source ))] - DecodeMetaKey { source: bytes::Error }, + DecodeMetaKey { source: bytes_ext::Error }, #[snafu(display( "Failed to decode meta value of message queue implementation, err:{}", @@ -355,7 +355,7 @@ impl From for TableMetaData { #[cfg(test)] mod tests { - use common_types::bytes::BytesMut; + use bytes_ext::BytesMut; use super::*; use crate::message_queue_impl::region_context::{RegionMetaSnapshot, TableMetaData}; diff --git a/wal/src/message_queue_impl/region_context.rs b/wal/src/message_queue_impl/region_context.rs index ad369cc574..efccac2847 100644 --- a/wal/src/message_queue_impl/region_context.rs +++ b/wal/src/message_queue_impl/region_context.rs @@ -8,7 +8,8 @@ use std::{ sync::Arc, }; -use common_types::{bytes::BytesMut, table::TableId, SequenceNumber}; +use bytes_ext::BytesMut; +use common_types::{table::TableId, SequenceNumber}; use generic_error::{BoxError, GenericError}; use log::{debug, warn}; use macros::define_result; diff --git a/wal/src/message_queue_impl/snapshot_synchronizer.rs b/wal/src/message_queue_impl/snapshot_synchronizer.rs index 4d52412967..3b4ee222fa 100644 --- a/wal/src/message_queue_impl/snapshot_synchronizer.rs +++ b/wal/src/message_queue_impl/snapshot_synchronizer.rs @@ -4,7 +4,7 @@ use std::sync::Arc; -use common_types::bytes::BytesMut; +use bytes_ext::BytesMut; use generic_error::{BoxError, GenericError}; use log::debug; use macros::define_result; diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index 242dce017d..a47a09bbfb 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -14,9 +14,8 @@ use std::{ }; use async_trait::async_trait; -use common_types::{ - bytes::BytesMut, table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, -}; +use bytes_ext::BytesMut; +use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER}; use generic_error::BoxError; use log::{debug, info, warn}; use rocksdb::{ diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index c1280a64ef..922e6bb408 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -1467,7 +1467,8 @@ fn purge_buckets( mod tests { use std::sync::Arc; - use common_types::{bytes::BytesMut, table::DEFAULT_SHARD_ID}; + use bytes_ext::BytesMut; + use common_types::table::DEFAULT_SHARD_ID; use runtime::{Builder, Runtime}; use table_kv::{memory::MemoryImpl, KeyBoundary, ScanContext, ScanRequest}; diff --git a/wal/src/table_kv_impl/table_unit.rs b/wal/src/table_kv_impl/table_unit.rs index 74fd7aa741..e9f0dff307 100644 --- a/wal/src/table_kv_impl/table_unit.rs +++ b/wal/src/table_kv_impl/table_unit.rs @@ -13,7 +13,8 @@ use std::{ time::Duration, }; -use common_types::{bytes::BytesMut, table::TableId}; +use bytes_ext::BytesMut; +use common_types::table::TableId; use generic_error::{BoxError, GenericError}; use log::debug; use macros::define_result; diff --git a/wal/src/tests/util.rs b/wal/src/tests/util.rs index 431199de66..6b05204175 100644 --- a/wal/src/tests/util.rs +++ b/wal/src/tests/util.rs @@ -10,11 +10,8 @@ use std::{ }; use async_trait::async_trait; -use common_types::{ - bytes::{BufMut, SafeBuf, SafeBufMut}, - table::TableId, - SequenceNumber, -}; +use bytes_ext::{BufMut, SafeBuf, SafeBufMut}; +use common_types::{table::TableId, SequenceNumber}; use message_queue::kafka::{config::Config as KafkaConfig, kafka_impl::KafkaImpl}; use runtime::{self, Runtime}; use snafu::Snafu; From ea00c85b8cab8517f74c50bb897416c7a9cb63a3 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 19 Jul 2023 15:56:57 +0800 Subject: [PATCH 4/5] chore: separate hash module from common_types crate --- Cargo.lock | 27 ++++++++++++------- Cargo.toml | 26 +++++++++--------- common_types/Cargo.toml | 5 +--- common_types/src/datum.rs | 3 ++- common_types/src/hex.rs | 3 --- common_types/src/lib.rs | 1 - components/hash_ext/Cargo.toml | 17 ++++++++++++ .../hash.rs => components/hash_ext/src/lib.rs | 2 +- components/object_store/Cargo.toml | 2 +- components/object_store/src/disk_cache.rs | 2 +- components/object_store/src/mem_cache.rs | 2 +- components/partitioned_lock/Cargo.toml | 2 +- components/partitioned_lock/src/lib.rs | 2 +- interpreters/Cargo.toml | 1 + interpreters/src/insert.rs | 2 +- query_frontend/Cargo.toml | 2 +- query_frontend/src/promql/udf.rs | 2 +- table_engine/Cargo.toml | 1 + table_engine/src/partition/rule/key.rs | 2 +- 19 files changed, 64 insertions(+), 40 deletions(-) create mode 100644 components/hash_ext/Cargo.toml rename common_types/src/hash.rs => components/hash_ext/src/lib.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index 3f8629ca12..45aa776217 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1328,19 +1328,16 @@ dependencies = [ name = "common_types" version = "1.2.5-alpha" dependencies = [ - "ahash 0.8.3", "arrow", "arrow_ext", - "byteorder", "bytes_ext", "ceresdbproto", "chrono", "datafusion", + "hash_ext", "macros", - "murmur3", "paste 1.0.12", "prost", - "seahash", "serde", "serde_json", "snafu 0.6.10", @@ -2584,6 +2581,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "hash_ext" +version = "1.2.5-alpha" +dependencies = [ + "ahash 0.8.3", + "byteorder", + "murmur3", + "seahash", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2925,6 +2932,7 @@ dependencies = [ "datafusion-proto", "df_operator", "generic_error", + "hash_ext", "log", "macros", "meta_client", @@ -3968,10 +3976,10 @@ dependencies = [ "ceresdbproto", "chrono", "clru", - "common_types", "crc", "futures 0.3.28", "generic_error", + "hash_ext", "lazy_static", "log", "lru", @@ -4242,7 +4250,7 @@ dependencies = [ name = "partitioned_lock" version = "1.2.5-alpha" dependencies = [ - "common_types", + "hash_ext", "tokio", ] @@ -4919,7 +4927,7 @@ dependencies = [ "datafusion-proto", "df_operator", "generic_error", - "hashbrown 0.12.3", + "hash_ext", "influxdb_influxql_parser", "iox_query_influxql", "itertools", @@ -6301,6 +6309,7 @@ dependencies = [ "df_operator", "futures 0.3.28", "generic_error", + "hash_ext", "itertools", "log", "macros", @@ -6998,8 +7007,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", - "rand 0.3.23", + "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions 1.1.0", ] diff --git a/Cargo.toml b/Cargo.toml index e0f23d1f4f..166d7b16fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "components/bytes_ext", "components/codec", "components/future_cancel", + "components/hash_ext", "components/id_allocator", "components/logger", "components/macros", @@ -69,6 +70,7 @@ name = "ceresdb-server" path = "src/bin/ceresdb-server.rs" [workspace.dependencies] +alloc_tracker = { path = "components/alloc_tracker" } arrow = { version = "38.0.0", features = ["prettyprint"] } arrow_ipc = { version = "38.0.0" } arrow_ext = { path = "components/arrow_ext" } @@ -82,6 +84,7 @@ bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } ceresdbproto = "1.0" +codec = { path = "components/codec" } chrono = "0.4" clap = "3.0" clru = "0.6.1" @@ -91,14 +94,18 @@ common_types = { path = "common_types" } datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" } datafusion-proto = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" } df_operator = { path = "df_operator" } +future_cancel = { path = "components/future_cancel" } etcd-client = "0.10.3" env_logger = "0.6" futures = "0.3" -xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" } +generic_error = { path = "components/generic_error" } +hash_ext = { path = "components/hash_ext" } +hex = "0.4.3" lazy_static = "1.4.0" log = "0.4" logger = { path = "components/logger" } lru = "0.7.6" +id_allocator = { path = "components/id_allocator" } influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query_influxql" } influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "influxdb_influxql_parser" } influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query" } @@ -106,8 +113,12 @@ influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b interpreters = { path = "interpreters" } itertools = "0.10.5" macros = { path = "components/macros" } +message_queue = { path = "components/message_queue" } meta_client = { path = "meta_client" } +metric_ext = { path = "components/metric_ext" } object_store = { path = "components/object_store" } +panic_ext = { path = "components/panic_ext" } +partitioned_lock = { path = "components/partitioned_lock" } partition_table_engine = { path = "partition_table_engine" } parquet_ext = { path = "components/parquet_ext" } parquet = { version = "38.0.0" } @@ -143,8 +154,7 @@ test_util = { path = "components/test_util" } time_ext = { path = "components/time_ext" } toml = "0.7" toml_ext = { path = "components/toml_ext" } -generic_error = { path = "components/generic_error" } -partitioned_lock = { path = "components/partitioned_lock" } +timed_task = { path = "components/timed_task" } tracing_util = { path = "components/tracing_util" } trace_metric = { path = "components/trace_metric" } trace_metric_derive = { path = "components/trace_metric_derive" } @@ -152,16 +162,8 @@ trace_metric_derive_tests = { path = "components/trace_metric_derive_tests" } tonic = "0.8.1" tokio = { version = "1.25", features = ["full"] } wal = { path = "wal" } -message_queue = { path = "components/message_queue" } +xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" } zstd = { version = "0.12", default-features = false } -id_allocator = { path = "components/id_allocator" } -panic_ext = { path = "components/panic_ext" } -timed_task = { path = "components/timed_task" } -future_cancel = { path = "components/future_cancel" } -alloc_tracker = { path = "components/alloc_tracker" } -metric_ext = { path = "components/metric_ext" } -codec = { path = "components/codec" } -hex = "0.4.3" [dependencies] analytic_engine = { workspace = true } diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index 122bc613fa..f9f41d990b 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -15,19 +15,16 @@ test = [] [dependencies] # In alphabetical order -ahash = { version = "0.8.2", default-features = false, features = ["runtime-rng"] } arrow = { workspace = true } arrow_ext = { workspace = true } -byteorder = "1.2" bytes_ext = { workspace = true } ceresdbproto = { workspace = true } chrono = { workspace = true } datafusion = { workspace = true } +hash_ext = { workspace = true } macros = { workspace = true } -murmur3 = "0.4.1" paste = { workspace = true } prost = { workspace = true } -seahash = "4.1.0" serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 826adc2f18..106ace0aeb 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -12,11 +12,12 @@ use bytes_ext::Bytes; use ceresdbproto::schema::DataType as DataTypePb; use chrono::{Datelike, Local, NaiveDate, NaiveTime, TimeZone, Timelike}; use datafusion::scalar::ScalarValue; +use hash_ext::hash64; use serde::ser::{Serialize, Serializer}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{DataType as SqlDataType, Value}; -use crate::{hash::hash64, hex, string::StringBytes, time::Timestamp}; +use crate::{hex, string::StringBytes, time::Timestamp}; const DATE_FORMAT: &str = "%Y-%m-%d"; const TIME_FORMAT: &str = "%H:%M:%S%.3f"; diff --git a/common_types/src/hex.rs b/common_types/src/hex.rs index c8e8be26ac..8133c9b4d4 100644 --- a/common_types/src/hex.rs +++ b/common_types/src/hex.rs @@ -1,8 +1,5 @@ // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. -// TODO: move this module to test_util package after remove the common_types -// from the dependencies of the test_util. - /// Try to decode bytes from hex literal string. /// /// None will be returned if the input literal is hex-invalid. diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index 73232f4ebf..3e11c49c26 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -6,7 +6,6 @@ pub mod bitset; pub mod column; pub mod column_schema; pub mod datum; -pub mod hash; pub mod hex; pub mod projected_schema; pub mod record_batch; diff --git a/components/hash_ext/Cargo.toml b/components/hash_ext/Cargo.toml new file mode 100644 index 0000000000..4c978544f0 --- /dev/null +++ b/components/hash_ext/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "hash_ext" + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +ahash = { version = "0.8.2", default-features = false, features = ["runtime-rng"] } +byteorder = "1.2" +murmur3 = "0.4.1" +seahash = "4.1.0" diff --git a/common_types/src/hash.rs b/components/hash_ext/src/lib.rs similarity index 96% rename from common_types/src/hash.rs rename to components/hash_ext/src/lib.rs index a4fad360d7..9b4b22c762 100644 --- a/common_types/src/hash.rs +++ b/components/hash_ext/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. /// Which Hash to use: /// - Memory: aHash diff --git a/components/object_store/Cargo.toml b/components/object_store/Cargo.toml index f74349a185..fdd9774006 100644 --- a/components/object_store/Cargo.toml +++ b/components/object_store/Cargo.toml @@ -16,10 +16,10 @@ bytes = { workspace = true } ceresdbproto = { workspace = true } chrono = { workspace = true } clru = { workspace = true } -common_types = { workspace = true } crc = "3.0.0" futures = { workspace = true } generic_error = { workspace = true } +hash_ext = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } lru = { workspace = true } diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 6c0d8aa515..a88abba85e 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -11,9 +11,9 @@ use std::{collections::BTreeMap, fmt::Display, ops::Range, sync::Arc}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use common_types::hash::SeaHasherBuilder; use crc::{Crc, CRC_32_ISCSI}; use futures::stream::BoxStream; +use hash_ext::SeaHasherBuilder; use log::{debug, error, info}; use lru::LruCache; use partitioned_lock::PartitionedMutexAsync; diff --git a/components/object_store/src/mem_cache.rs b/components/object_store/src/mem_cache.rs index deba4c4004..daa7c43198 100644 --- a/components/object_store/src/mem_cache.rs +++ b/components/object_store/src/mem_cache.rs @@ -14,8 +14,8 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; use clru::{CLruCache, CLruCacheConfig, WeightScale}; -use common_types::hash::{ahash::RandomState, build_fixed_seed_ahasher_builder}; use futures::stream::BoxStream; +use hash_ext::{ahash::RandomState, build_fixed_seed_ahasher_builder}; use macros::define_result; use partitioned_lock::PartitionedMutex; use snafu::{OptionExt, Snafu}; diff --git a/components/partitioned_lock/Cargo.toml b/components/partitioned_lock/Cargo.toml index 045865a8ff..af37f9ce7a 100644 --- a/components/partitioned_lock/Cargo.toml +++ b/components/partitioned_lock/Cargo.toml @@ -14,4 +14,4 @@ workspace = true tokio = { workspace = true } [dev-dependencies] -common_types = { workspace = true } +hash_ext = { workspace = true } diff --git a/components/partitioned_lock/src/lib.rs b/components/partitioned_lock/src/lib.rs index e42c9f8c3a..63bc2f1cd3 100644 --- a/components/partitioned_lock/src/lib.rs +++ b/components/partitioned_lock/src/lib.rs @@ -170,7 +170,7 @@ mod tests { use std::collections::HashMap; // TODO: remove this importing. - use common_types::hash::{build_fixed_seed_ahasher_builder, SeaHasherBuilder}; + use hash_ext::{build_fixed_seed_ahasher_builder, SeaHasherBuilder}; use super::*; diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml index da80389852..1cb5abccf4 100644 --- a/interpreters/Cargo.toml +++ b/interpreters/Cargo.toml @@ -21,6 +21,7 @@ datafusion = { workspace = true } datafusion-proto = { workspace = true } df_operator = { workspace = true } generic_error = { workspace = true } +hash_ext = { workspace = true } log = { workspace = true } macros = { workspace = true } meta_client = { workspace = true } diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index 649aef5481..62c6e66fc4 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -15,7 +15,6 @@ use common_types::{ column::{ColumnBlock, ColumnBlockBuilder}, column_schema::ColumnId, datum::Datum, - hash::hash64, row::RowGroup, }; use datafusion::{ @@ -28,6 +27,7 @@ use datafusion::{ }, }; use df_operator::visitor::find_columns_by_expr; +use hash_ext::hash64; use macros::define_result; use query_frontend::plan::InsertPlan; use snafu::{OptionExt, ResultExt, Snafu}; diff --git a/query_frontend/Cargo.toml b/query_frontend/Cargo.toml index 4107d303db..6152caa043 100644 --- a/query_frontend/Cargo.toml +++ b/query_frontend/Cargo.toml @@ -26,7 +26,7 @@ datafusion = { workspace = true } datafusion-proto = { workspace = true } df_operator = { workspace = true } generic_error = { workspace = true } -hashbrown = { version = "0.12", features = ["raw"] } +hash_ext = { workspace = true } influxql-logical-planner = { workspace = true } influxql-parser = { workspace = true } influxql-schema = { workspace = true } diff --git a/query_frontend/src/promql/udf.rs b/query_frontend/src/promql/udf.rs index 71429cfa31..965f38e3af 100644 --- a/query_frontend/src/promql/udf.rs +++ b/query_frontend/src/promql/udf.rs @@ -10,12 +10,12 @@ use arrow::{ datatypes::DataType, }; use codec::{compact::MemCompactEncoder, Encoder}; -use common_types::hash::hash64; use datafusion::{ error::{DataFusionError, Result as DataFusionResult}, logical_expr::{create_udf, Expr, Volatility}, physical_plan::{functions::make_scalar_function, udf::ScalarUDF}, }; +use hash_ext::hash64; /// The name of the regex_match UDF given to DataFusion. pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch"; diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml index 06f2226ed4..06c7ff8b27 100644 --- a/table_engine/Cargo.toml +++ b/table_engine/Cargo.toml @@ -23,6 +23,7 @@ datafusion-proto = { workspace = true } df_operator = { workspace = true } futures = { workspace = true } generic_error = { workspace = true } +hash_ext = { workspace = true } itertools = { workspace = true } log = { workspace = true } macros = { workspace = true } diff --git a/table_engine/src/partition/rule/key.rs b/table_engine/src/partition/rule/key.rs index d6cf8669c1..70863fc4d2 100644 --- a/table_engine/src/partition/rule/key.rs +++ b/table_engine/src/partition/rule/key.rs @@ -7,9 +7,9 @@ use std::collections::{HashMap, HashSet}; use bytes_ext::{BufMut, BytesMut}; use common_types::{ datum::Datum, - hash::hash64, row::{Row, RowGroup}, }; +use hash_ext::hash64; use itertools::Itertools; use log::{debug, error}; use snafu::OptionExt; From f541924b4c2943672d85137e6de18ffef81ad2d4 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 19 Jul 2023 16:05:08 +0800 Subject: [PATCH 5/5] chore: make bitset in common_types private --- common_types/src/lib.rs | 3 +-- common_types/src/{ => row}/bitset.rs | 2 ++ common_types/src/row/contiguous.rs | 6 ++++-- common_types/src/row/mod.rs | 1 + 4 files changed, 8 insertions(+), 4 deletions(-) rename common_types/src/{ => row}/bitset.rs (99%) diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs index 3e11c49c26..2c57865a03 100644 --- a/common_types/src/lib.rs +++ b/common_types/src/lib.rs @@ -2,11 +2,10 @@ //! Contains common types -pub mod bitset; pub mod column; pub mod column_schema; pub mod datum; -pub mod hex; +pub(crate) mod hex; pub mod projected_schema; pub mod record_batch; pub mod request_id; diff --git a/common_types/src/bitset.rs b/common_types/src/row/bitset.rs similarity index 99% rename from common_types/src/bitset.rs rename to common_types/src/row/bitset.rs index a2c5f50dd8..fab92feb72 100644 --- a/common_types/src/bitset.rs +++ b/common_types/src/row/bitset.rs @@ -2,6 +2,8 @@ //! Simple BitSet implementation. +#![allow(dead_code)] + const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; const UNSET_BIT_MASK: [u8; 8] = [ 255 - 1, diff --git a/common_types/src/row/contiguous.rs b/common_types/src/row/contiguous.rs index 50cd99c7d3..5e66fc8b02 100644 --- a/common_types/src/row/contiguous.rs +++ b/common_types/src/row/contiguous.rs @@ -13,10 +13,12 @@ use prost::encoding::{decode_varint, encode_varint, encoded_len_varint}; use snafu::{ensure, Backtrace, Snafu}; use crate::{ - bitset::{BitSet, RoBitSet}, datum::{Datum, DatumKind, DatumView}, projected_schema::RowProjector, - row::Row, + row::{ + bitset::{BitSet, RoBitSet}, + Row, + }, schema::{IndexInWriterSchema, Schema}, time::Timestamp, }; diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index 46ee38e654..8507493312 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -17,6 +17,7 @@ use crate::{ time::Timestamp, }; +pub(crate) mod bitset; pub mod contiguous; #[derive(Debug, Snafu)]