Skip to content

Commit

Permalink
refactor: separate common_util to multiple components (#1077)
Browse files Browse the repository at this point in the history
## Rationale
`common_util` is a bad design for containing too many separate modules.

## Detailed Changes
Separate it into multiple components.

## Test Plan
Existing unit tests and integration tests.
  • Loading branch information
ShiKaiWi authored Jul 18, 2023
1 parent 5b3b757 commit a4c4e28
Show file tree
Hide file tree
Showing 275 changed files with 1,497 additions and 1,056 deletions.
298 changes: 236 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 34 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,30 @@ members = [
"catalog_impls",
"cluster",
"common_types",
"common_util",
"components/alloc_tracker",
"components/arena",
"components/arrow_ext",
"components/bytes_ext",
"components/codec",
"components/future_cancel",
"components/id_allocator",
"components/logger",
"components/macros",
"components/message_queue",
"components/metric_ext",
"components/object_store",
"components/panic_ext",
"components/parquet_ext",
"components/partitioned_lock",
"components/profile",
"components/runtime",
"components/size_ext",
"components/skiplist",
"components/table_kv",
"components/test_util",
"components/time_ext",
"components/timed_task",
"components/toml_ext",
"components/trace_metric",
"components/trace_metric_derive",
"components/trace_metric_derive_tests",
Expand Down Expand Up @@ -75,7 +88,6 @@ clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
common_util = { path = "common_util" }
datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion-proto = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
df_operator = { path = "df_operator" }
Expand All @@ -93,6 +105,7 @@ influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9
influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
macros = { path = "components/macros" }
meta_client = { path = "meta_client" }
object_store = { path = "components/object_store" }
partition_table_engine = { path = "partition_table_engine" }
Expand All @@ -107,24 +120,31 @@ prometheus-static-metric = "0.5"
prost = "0.11"
proxy = { path = "proxy" }
query_engine = { path = "query_engine" }
query_frontend = { path = "query_frontend" }
rand = "0.7"
remote_engine_client = { path = "remote_engine_client" }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"] }
router = { path = "router" }
runtime = { path = "components/runtime" }
snafu = { version = "0.6.10", features = ["backtraces"] }
serde = "1.0"
serde_json = "1.0.60"
server = { path = "server" }
size_ext = { path = "components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
query_frontend = { path = "query_frontend" }
sqlparser = { version = "0.33", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
tempfile = "3.1.0"
test_util = { path = "components/test_util", features = ["test"] }
time_ext = { path = "components/time_ext" }
toml = "0.7"
toml_ext = { path = "components/toml_ext" }
generic_error = { path = "components/generic_error" }
partitioned_lock = { path = "components/partitioned_lock" }
tracing_util = { path = "components/tracing_util" }
trace_metric = { path = "components/trace_metric" }
trace_metric_derive = { path = "components/trace_metric_derive" }
Expand All @@ -134,29 +154,39 @@ tokio = { version = "1.25", features = ["full"] }
wal = { path = "wal" }
message_queue = { path = "components/message_queue" }
zstd = { version = "0.12", default-features = false }
id_allocator = { path = "components/id_allocator" }
panic_ext = { path = "components/panic_ext" }
timed_task = { path = "components/timed_task" }
future_cancel = { path = "components/future_cancel" }
alloc_tracker = { path = "components/alloc_tracker" }
metric_ext = { path = "components/metric_ext" }
codec = { path = "components/codec" }
hex = "0.4.3"

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
common_util = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
log = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }

[build-dependencies]
Expand Down
13 changes: 11 additions & 2 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,47 @@ async-trait = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
ceresdbproto = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
future_cancel = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
hex = { workspace = true }
id_allocator = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
macros = { workspace = true }
message_queue = { workspace = true }
metric_ext = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
remote_engine_client = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
size_ext = { workspace = true }
skiplist = { path = "../components/skiplist" }
smallvec = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
table_kv = { workspace = true }
tempfile = { workspace = true, optional = true }
time_ext = { workspace = true }
tokio = { workspace = true }
trace_metric = { workspace = true }
wal = { workspace = true }
xorfilter-rs = { workspace = true }

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
common_util = { workspace = true, features = ["test"] }
env_logger = { workspace = true }
pin-project-lite = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
test_util = { workspace = true }
wal = { workspace = true, features = ["test"] }
3 changes: 2 additions & 1 deletion analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

use common_types::COMPACTION_STRATEGY;
use common_util::config::{ReadableSize, TimeUnit};
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
use time_ext::TimeUnit;
use tokio::sync::oneshot;

use crate::{
Expand Down
5 changes: 3 additions & 2 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use std::{
};

use common_types::time::Timestamp;
use common_util::{config::TimeUnit, define_result};
use log::{debug, info};
use macros::define_result;
use snafu::Snafu;
use time_ext::TimeUnit;

use crate::{
compaction::{
Expand Down Expand Up @@ -688,7 +689,7 @@ mod tests {
tests::build_schema,
time::{TimeRange, Timestamp},
};
use common_util::hash_map;
use macros::hash_map;
use tokio::sync::mpsc;

use super::*;
Expand Down
14 changes: 6 additions & 8 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ use std::{

use async_trait::async_trait;
use common_types::request_id::RequestId;
use common_util::{
config::{ReadableDuration, ReadableSize},
define_result,
runtime::{JoinHandle, Runtime},
time::DurationExt,
};
use log::{debug, error, info, warn};
use macros::define_result;
use runtime::{JoinHandle, Runtime};
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
use time_ext::{DurationExt, ReadableDuration};
use tokio::{
sync::{
mpsc::{self, error::TrySendError, Receiver, Sender},
Expand All @@ -49,7 +47,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to join compaction schedule worker, err:{}", source))]
JoinWorker { source: common_util::runtime::Error },
JoinWorker { source: runtime::Error },
}

define_result!(Error);
Expand Down Expand Up @@ -647,7 +645,7 @@ impl ScheduleWorker {
for table_data in &tables_buf {
let last_flush_time = table_data.last_flush_time();
let flush_deadline_ms = last_flush_time + self.max_unflushed_duration.as_millis_u64();
let now_ms = common_util::time::current_time_millis();
let now_ms = time_ext::current_time_millis();
if now_ms > flush_deadline_ms {
info!(
"Scheduled flush is triggered, table:{}, last_flush_time:{last_flush_time}ms, max_unflushed_duration:{:?}",
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use generic_error::BoxError;
use log::{error, info};
use snafu::{OptionExt, ResultExt};
use table_engine::{
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashMap;

use common_util::error::BoxError;
use generic_error::BoxError;
use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! Create table logic of instance
use common_util::error::BoxError;
use generic_error::BoxError;
use log::info;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::CreateTableRequest;
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
use std::sync::Arc;

use common_types::schema::Version;
use common_util::{define_result, error::GenericError};
use generic_error::GenericError;
use macros::define_result;
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenShardRequest},
Expand Down
15 changes: 6 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@ use common_types::{
time::TimeRange,
SequenceNumber,
};
use common_util::{
config::ReadableDuration,
define_result,
error::{BoxError, GenericError},
runtime::{Runtime, RuntimeRef},
time,
};
use futures::{
channel::{mpsc, mpsc::channel},
stream, SinkExt, TryStreamExt,
};
use generic_error::{BoxError, GenericError};
use log::{debug, error, info};
use macros::define_result;
use runtime::{Runtime, RuntimeRef};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::predicate::Predicate;
use time_ext::{self, ReadableDuration};
use tokio::{sync::oneshot, time::Instant};
use wal::manager::WalLocation;

Expand Down Expand Up @@ -126,7 +123,7 @@ pub enum Error {
ChannelSend { source: mpsc::SendError },

#[snafu(display("Runtime join error, source:{}", source))]
RuntimeJoin { source: common_util::runtime::Error },
RuntimeJoin { source: runtime::Error },

#[snafu(display("Other failure, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))]
Other { msg: String, backtrace: Backtrace },
Expand Down Expand Up @@ -278,7 +275,7 @@ impl FlushTask {
})?;

self.table_data
.set_last_flush_time(time::current_time_millis());
.set_last_flush_time(time_ext::current_time_millis());

info!(
"Instance flush memtables done, table:{}, table_id:{}, request_id:{}, cost:{}ms",
Expand Down
8 changes: 3 additions & 5 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ pub(crate) mod write;
use std::sync::Arc;

use common_types::table::TableId;
use common_util::{
define_result,
error::{BoxError, GenericError},
runtime::Runtime,
};
use generic_error::{BoxError, GenericError};
use log::{error, info};
use macros::define_result;
use mem_collector::MemUsageCollector;
use runtime::Runtime;
use snafu::{ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
use tokio::sync::oneshot::{self, error::RecvError};
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use common_types::{
schema::RecordSchema,
time::TimeRange,
};
use common_util::{define_result, error::BoxError};
use futures::stream::Stream;
use generic_error::BoxError;
use log::debug;
use macros::define_result;
use snafu::{ResultExt, Snafu};
use table_engine::{
stream::{
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/serial_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use std::{
time::Instant,
};

use common_util::{runtime::Runtime, time::InstantExt};
use futures::Future;
use log::{error, warn};
use runtime::Runtime;
use table_engine::table::TableId;
use time_ext::InstantExt;
use tokio::sync::{
oneshot,
watch::{self, Receiver, Sender},
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use async_trait::async_trait;
use common_types::{schema::IndexInWriterSchema, table::ShardId};
use common_util::error::BoxError;
use generic_error::BoxError;
use lazy_static::lazy_static;
use log::{debug, error, info, trace};
use prometheus::{exponential_buckets, register_histogram, Histogram};
Expand Down
Loading

0 comments on commit a4c4e28

Please sign in to comment.