From b1a868aadd9cce1e7fea951a6e6be5ad73ed130e Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 27 Dec 2022 22:07:08 +0800 Subject: [PATCH] move router from server to a dependent crate. --- cluster/Cargo.toml | 1 - router/Cargo.toml | 24 +++ .../src/route => router/src}/cluster_based.rs | 3 +- router/src/endpoint.rs | 67 +++++++ {server/src/route => router/src}/hash.rs | 0 server/src/route/mod.rs => router/src/lib.rs | 11 +- .../src/route => router/src}/rule_based.rs | 5 +- server/Cargo.toml | 3 +- server/src/avro_util.rs | 168 ------------------ server/src/config.rs | 63 +------ server/src/grpc/mod.rs | 2 +- server/src/grpc/storage_service/error.rs | 14 +- server/src/grpc/storage_service/mod.rs | 2 +- server/src/grpc/storage_service/query.rs | 11 +- server/src/http.rs | 2 +- server/src/lib.rs | 2 - server/src/server.rs | 4 +- src/setup.rs | 8 +- 18 files changed, 126 insertions(+), 264 deletions(-) create mode 100644 router/Cargo.toml rename {server/src/route => router/src}/cluster_based.rs (98%) create mode 100644 router/src/endpoint.rs rename {server/src/route => router/src}/hash.rs (100%) rename server/src/route/mod.rs => router/src/lib.rs (97%) rename {server/src/route => router/src}/rule_based.rs (98%) delete mode 100644 server/src/avro_util.rs diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index e68c392a44..9c5b3e0cdc 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -11,7 +11,6 @@ workspace = true workspace = true [dependencies] -analytic_engine = { workspace = true } async-trait = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } diff --git a/router/Cargo.toml b/router/Cargo.toml new file mode 100644 index 0000000000..1ca818ed68 --- /dev/null +++ b/router/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "router" + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +async-trait = { workspace = true } +ceresdbproto = { workspace = true } +cluster = { workspace = true } +common_types = { workspace = true } +common_util = { workspace = true } +log = { workspace = true } +meta_client = { workspace = true } +serde = { workspace = true } +serde_derive = { workspace = true } +snafu = { workspace = true } +twox-hash = "1.6" diff --git a/server/src/route/cluster_based.rs b/router/src/cluster_based.rs similarity index 98% rename from server/src/route/cluster_based.rs rename to router/src/cluster_based.rs index 40d43bd23e..4d68c2c436 100644 --- a/server/src/route/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -11,8 +11,7 @@ use meta_client::types::{NodeShard, RouteTablesRequest, RouteTablesResponse}; use snafu::{OptionExt, ResultExt}; use crate::{ - config::Endpoint, - route::{hash, OtherNoCause, OtherWithCause, ParseEndpoint, Result, Router}, + endpoint::Endpoint, hash, OtherNoCause, OtherWithCause, ParseEndpoint, Result, Router, }; pub struct ClusterBasedRouter { diff --git a/router/src/endpoint.rs b/router/src/endpoint.rs new file mode 100644 index 0000000000..5c333601cc --- /dev/null +++ b/router/src/endpoint.rs @@ -0,0 +1,67 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Endpoint definition + +use std::str::FromStr; + +use ceresdbproto::storage; +use serde_derive::Deserialize; + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +pub struct Endpoint { + pub addr: String, + pub port: u16, +} + +impl Endpoint { + pub fn new(addr: String, port: u16) -> Self { + Self { addr, port } + } +} + +impl ToString for Endpoint { + fn to_string(&self) -> String { + format!("{}:{}", self.addr, self.port) + } +} + +impl FromStr for Endpoint { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + let (addr, raw_port) = match s.rsplit_once(':') { + Some(v) => v, + None => { + let err_msg = "Can't find ':' in the source string".to_string(); + return Err(Self::Err::from(err_msg)); + } + }; + let port = raw_port.parse().map_err(|e| { + let err_msg = format!("Fail to parse port:{}, err:{}", raw_port, e); + Self::Err::from(err_msg) + })?; + + Ok(Endpoint { + addr: addr.to_string(), + port, + }) + } +} + +impl From for storage::Endpoint { + fn from(endpoint: Endpoint) -> Self { + storage::Endpoint { + ip: endpoint.addr, + port: endpoint.port as u32, + } + } +} + +impl From for Endpoint { + fn from(endpoint_pb: storage::Endpoint) -> Self { + Endpoint { + addr: endpoint_pb.ip, + port: endpoint_pb.port as u16, + } + } +} diff --git a/server/src/route/hash.rs b/router/src/hash.rs similarity index 100% rename from server/src/route/hash.rs rename to router/src/hash.rs diff --git a/server/src/route/mod.rs b/router/src/lib.rs similarity index 97% rename from server/src/route/mod.rs rename to router/src/lib.rs index 4da8b1bb2d..cf47990e22 100644 --- a/server/src/route/mod.rs +++ b/router/src/lib.rs @@ -1,15 +1,16 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; - -use async_trait::async_trait; -use ceresdbproto::storage::{Route, RouteRequest}; - pub mod cluster_based; +pub mod endpoint; pub(crate) mod hash; pub mod rule_based; +use std::sync::Arc; + +use async_trait::async_trait; +use ceresdbproto::storage::{Route, RouteRequest}; pub use cluster_based::ClusterBasedRouter; +use common_util::define_result; pub use rule_based::{RuleBasedRouter, RuleList}; use snafu::{Backtrace, Snafu}; diff --git a/server/src/route/rule_based.rs b/router/src/rule_based.rs similarity index 98% rename from server/src/route/rule_based.rs rename to router/src/rule_based.rs index b67f98417b..8adce22cd5 100644 --- a/server/src/route/rule_based.rs +++ b/router/src/rule_based.rs @@ -12,10 +12,7 @@ use meta_client::types::ShardId; use serde_derive::Deserialize; use snafu::{ensure, OptionExt}; -use crate::{ - config::Endpoint, - route::{hash, Result, RouteNotFound, Router, ShardNotFound}, -}; +use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound}; pub type ShardNodes = HashMap; diff --git a/server/Cargo.toml b/server/Cargo.toml index eb05913bf8..3bfeb19625 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -37,6 +37,7 @@ prometheus = { workspace = true } prometheus-static-metric = { workspace = true } prost = { workspace = true } query_engine = { workspace = true } +router = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } @@ -47,8 +48,6 @@ table_engine = { workspace = true } tokio = { workspace = true } tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } -twox-hash = "1.6" warp = "0.3" - [dev-dependencies] sql = { workspace = true, features = ["test"] } diff --git a/server/src/avro_util.rs b/server/src/avro_util.rs deleted file mode 100644 index 726ddd2b90..0000000000 --- a/server/src/avro_util.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Avro utility - -use std::collections::HashMap; - -use avro_rs::{ - schema::{Name, RecordField, RecordFieldOrder}, - types::{Record, Value}, -}; -use common_types::{ - bytes::ByteVec, - column::ColumnBlock, - datum::{Datum, DatumKind}, - record_batch::RecordBatch, - schema::RecordSchema, -}; -use common_util::define_result; -use snafu::{Backtrace, ResultExt, Snafu}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display( - "Failed to write avro record, err:{}.\nBacktrace:\n{}", - source, - backtrace - ))] - WriteAvroRecord { - source: avro_rs::Error, - backtrace: Backtrace, - }, -} - -define_result!(Error); - -/// Create [avro_rs::Schema] with given `name` from [RecordSchema] -pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema { - let columns = schema.columns(); - let mut lookup = HashMap::with_capacity(columns.len()); - let mut avro_fields = Vec::with_capacity(columns.len()); - - for (pos, column) in columns.iter().enumerate() { - // Create avro record field - let default = if column.is_nullable { - Some(serde_json::value::Value::Null) - } else { - None - }; - - let field_schema = if column.is_nullable { - // We want to declare a schema which may be either a null or non-null value, - // for example: ["null", "string"]. - // - // However, `avro_rs` does not provide an accessible API to build a `Union`. - // We can't find a better way to do this than using JSON. - let field_schema_str = format!( - r#"["null", {}]"#, - data_type_to_schema(&column.data_type).canonical_form() - ); - avro_rs::Schema::parse_str(&field_schema_str).unwrap() - } else { - data_type_to_schema(&column.data_type) - }; - - // In dummy select like select "xxx", column name will be "Utf8("xxx")", which - // is not a valid json string. So, escaped name is used here. - let record_field = RecordField { - name: column.escaped_name.clone(), - doc: None, - default, - schema: field_schema, - order: RecordFieldOrder::Ignore, - position: pos, - }; - - avro_fields.push(record_field); - lookup.insert(column.escaped_name.clone(), pos); - } - - avro_rs::Schema::Record { - name: Name::new(name), - doc: None, - fields: avro_fields, - lookup, - } -} - -fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema { - match data_type { - DatumKind::Null => avro_rs::Schema::Null, - DatumKind::Timestamp => avro_rs::Schema::TimestampMillis, - DatumKind::Double => avro_rs::Schema::Double, - DatumKind::Float => avro_rs::Schema::Float, - DatumKind::Varbinary => avro_rs::Schema::Bytes, - DatumKind::String => avro_rs::Schema::String, - DatumKind::UInt32 | DatumKind::Int64 | DatumKind::UInt64 => avro_rs::Schema::Long, - DatumKind::UInt16 - | DatumKind::UInt8 - | DatumKind::Int32 - | DatumKind::Int16 - | DatumKind::Int8 => avro_rs::Schema::Int, - DatumKind::Boolean => avro_rs::Schema::Boolean, - } -} - -/// Convert record batch to avro format -pub fn record_batch_to_avro( - record_batch: &RecordBatch, - schema: &avro_rs::Schema, - rows: &mut Vec, -) -> Result<()> { - let record_batch_schema = record_batch.schema(); - assert_eq!( - record_batch_schema.num_columns(), - record_batch.num_columns() - ); - - rows.reserve(record_batch.num_rows()); - - let column_schemas = record_batch_schema.columns(); - for row_idx in 0..record_batch.num_rows() { - let mut record = Record::new(schema).unwrap(); - for (col_idx, column_schema) in column_schemas.iter().enumerate() { - let column = record_batch.column(col_idx); - let value = column_to_value(column, row_idx, column_schema.is_nullable); - - record.put(&column_schema.escaped_name, value); - } - - let row_bytes = avro_rs::to_avro_datum(schema, record).context(WriteAvroRecord)?; - - rows.push(row_bytes); - } - - Ok(()) -} - -/// Panic if row_idx is out of bound. -fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Value { - let datum = array.datum(row_idx); - match datum { - Datum::Null => may_union(Value::Null, is_nullable), - Datum::Timestamp(v) => may_union(Value::TimestampMillis(v.as_i64()), is_nullable), - Datum::Double(v) => may_union(Value::Double(v), is_nullable), - Datum::Float(v) => may_union(Value::Float(v), is_nullable), - Datum::Varbinary(v) => may_union(Value::Bytes(v.to_vec()), is_nullable), - Datum::String(v) => may_union(Value::String(v.to_string()), is_nullable), - // TODO(yingwen): Should we return error if overflow? Avro does not support uint64. - Datum::UInt64(v) => may_union(Value::Long(v as i64), is_nullable), - Datum::Int64(v) => may_union(Value::Long(v), is_nullable), - Datum::UInt32(v) => may_union(Value::Long(i64::from(v)), is_nullable), - Datum::UInt16(v) => may_union(Value::Int(i32::from(v)), is_nullable), - Datum::UInt8(v) => may_union(Value::Int(i32::from(v)), is_nullable), - Datum::Int32(v) => may_union(Value::Int(v), is_nullable), - Datum::Int16(v) => may_union(Value::Int(i32::from(v)), is_nullable), - Datum::Int8(v) => may_union(Value::Int(i32::from(v)), is_nullable), - Datum::Boolean(v) => may_union(Value::Boolean(v), is_nullable), - } -} - -#[inline] -fn may_union(val: Value, is_nullable: bool) -> Value { - if is_nullable { - Value::Union(Box::new(val)) - } else { - val - } -} diff --git a/server/src/config.rs b/server/src/config.rs index e1ac3a3c26..d3470bb35f 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -2,21 +2,20 @@ //! Server configs -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; use analytic_engine; -use ceresdbproto::storage; use cluster::config::{ClusterConfig, SchemaConfig}; use common_types::schema::TIMESTAMP_COLUMN; use meta_client::types::ShardId; +use router::{ + endpoint::Endpoint, + rule_based::{ClusterView, RuleList}, +}; use serde_derive::Deserialize; use table_engine::ANALYTIC_ENGINE_TYPE; -use crate::{ - http::DEFAULT_MAX_BODY_SIZE, - limiter::LimiterConfig, - route::rule_based::{ClusterView, RuleList}, -}; +use crate::{http::DEFAULT_MAX_BODY_SIZE, limiter::LimiterConfig}; /// The deployment mode decides how to start the CeresDB. /// @@ -51,56 +50,6 @@ pub struct StaticRouteConfig { pub topology: StaticTopologyConfig, } -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -pub struct Endpoint { - pub addr: String, - pub port: u16, -} - -impl Endpoint { - pub fn new(addr: String, port: u16) -> Self { - Self { addr, port } - } -} - -impl ToString for Endpoint { - fn to_string(&self) -> String { - format!("{}:{}", self.addr, self.port) - } -} - -impl FromStr for Endpoint { - type Err = Box; - - fn from_str(s: &str) -> std::result::Result { - let (addr, raw_port) = match s.rsplit_once(':') { - Some(v) => v, - None => { - let err_msg = "Can't find ':' in the source string".to_string(); - return Err(Self::Err::from(err_msg)); - } - }; - let port = raw_port.parse().map_err(|e| { - let err_msg = format!("Fail to parse port:{}, err:{}", raw_port, e); - Self::Err::from(err_msg) - })?; - - Ok(Endpoint { - addr: addr.to_string(), - port, - }) - } -} - -impl From for storage::Endpoint { - fn from(endpoint: Endpoint) -> Self { - storage::Endpoint { - ip: endpoint.addr, - port: endpoint.port as u32, - } - } -} - #[derive(Debug, Clone, Deserialize)] pub struct ShardView { pub shard_id: ShardId, diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index ed839c4f74..fd4589b128 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -24,6 +24,7 @@ use common_util::{ use futures::FutureExt; use log::{info, warn}; use query_engine::executor::Executor as QueryExecutor; +use router::RouterRef; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Sender}; @@ -32,7 +33,6 @@ use tonic::transport::Server; use crate::{ grpc::{meta_event_service::MetaServiceImpl, storage_service::StorageServiceImpl}, instance::InstanceRef, - route::RouterRef, schema_config_provider::{self, SchemaConfigProviderRef}, }; diff --git a/server/src/grpc/storage_service/error.rs b/server/src/grpc/storage_service/error.rs index b3f97895fd..a8531275f8 100644 --- a/server/src/grpc/storage_service/error.rs +++ b/server/src/grpc/storage_service/error.rs @@ -7,7 +7,7 @@ use common_util::define_result; use http::StatusCode; use snafu::Snafu; -use crate::{error_util, route}; +use crate::error_util; define_result!(Error); @@ -61,18 +61,18 @@ pub fn build_ok_header() -> ResponseHeader { } } -impl From for Error { - fn from(route_err: route::Error) -> Self { +impl From for Error { + fn from(route_err: router::Error) -> Self { match &route_err { - route::Error::RouteNotFound { .. } | route::Error::ShardNotFound { .. } => { + router::Error::RouteNotFound { .. } | router::Error::ShardNotFound { .. } => { Error::ErrNoCause { code: StatusCode::NOT_FOUND, msg: route_err.to_string(), } } - route::Error::ParseEndpoint { .. } - | route::Error::OtherWithCause { .. } - | route::Error::OtherNoCause { .. } => Error::ErrNoCause { + router::Error::ParseEndpoint { .. } + | router::Error::OtherWithCause { .. } + | router::Error::OtherNoCause { .. } => Error::ErrNoCause { code: StatusCode::INTERNAL_SERVER_ERROR, msg: route_err.to_string(), }, diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 4813d102b2..b06578dde1 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -29,6 +29,7 @@ use http::StatusCode; use log::{error, warn}; use paste::paste; use query_engine::executor::Executor as QueryExecutor; +use router::{Router, RouterRef}; use snafu::{ensure, OptionExt, ResultExt}; use sql::plan::CreateTablePlan; use table_engine::engine::EngineRuntimes; @@ -43,7 +44,6 @@ use crate::{ storage_service::error::{ErrNoCause, ErrWithCause, Result}, }, instance::InstanceRef, - route::{Router, RouterRef}, schema_config_provider::SchemaConfigProviderRef, }; diff --git a/server/src/grpc/storage_service/query.rs b/server/src/grpc/storage_service/query.rs index 687fbe07c8..a51b89f916 100644 --- a/server/src/grpc/storage_service/query.rs +++ b/server/src/grpc/storage_service/query.rs @@ -9,7 +9,7 @@ use ceresdbproto::{ storage::{query_response, QueryRequest, QueryResponse}, }; use common_types::{record_batch::RecordBatch, request_id::RequestId}; -use common_util::time::InstantExt; +use common_util::{avro_util, time::InstantExt}; use http::StatusCode; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; use log::info; @@ -20,12 +20,9 @@ use sql::{ provider::CatalogMetaProvider, }; -use crate::{ - avro_util, - grpc::storage_service::{ - error::{ErrNoCause, ErrWithCause, Result}, - HandlerContext, - }, +use crate::grpc::storage_service::{ + error::{ErrNoCause, ErrWithCause, Result}, + HandlerContext, }; /// Schema name of the record diff --git a/server/src/http.rs b/server/src/http.rs index 2c283a2ef9..3aec942ac6 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -10,6 +10,7 @@ use log::error; use logger::RuntimeLevel; use profile::Profiler; use query_engine::executor::Executor as QueryExecutor; +use router::endpoint::Endpoint; use serde_derive::Serialize; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; @@ -23,7 +24,6 @@ use warp::{ }; use crate::{ - config::Endpoint, consts, context::RequestContext, error_util, diff --git a/server/src/lib.rs b/server/src/lib.rs index dc5ef663dc..266ad813e9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,7 +8,6 @@ #[macro_use] extern crate common_util; -mod avro_util; pub mod config; mod consts; mod context; @@ -22,7 +21,6 @@ pub mod local_tables; pub mod logger; mod metrics; mod mysql; -pub mod route; pub mod schema_config_provider; pub mod server; pub mod table_engine; diff --git a/server/src/server.rs b/server/src/server.rs index 473ca53c43..39e10f2c9f 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -11,11 +11,12 @@ use interpreters::table_manipulator::TableManipulatorRef; use log::{info, warn}; use logger::RuntimeLevel; use query_engine::executor::Executor as QueryExecutor; +use router::{endpoint::Endpoint, RouterRef}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::{EngineRuntimes, TableEngineRef}; use crate::{ - config::{Config, Endpoint}, + config::Config, grpc::{self, RpcServices}, http::{self, HttpConfig, Service}, instance::{Instance, InstanceRef}, @@ -23,7 +24,6 @@ use crate::{ local_tables::{self, LocalTablesRecoverer}, mysql, mysql::error::Error as MysqlError, - route::RouterRef, schema_config_provider::SchemaConfigProviderRef, }; diff --git a/src/setup.rs b/src/setup.rs index 7f4788849d..234e66f319 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -19,14 +19,14 @@ use log::info; use logger::RuntimeLevel; use meta_client::meta_impl; use query_engine::executor::{Executor, ExecutorImpl}; +use router::{ + cluster_based::ClusterBasedRouter, + rule_based::{ClusterView, RuleBasedRouter}, +}; use server::{ config::{Config, DeployMode, RuntimeConfig, StaticTopologyConfig}, limiter::Limiter, local_tables::LocalTablesRecoverer, - route::{ - cluster_based::ClusterBasedRouter, - rule_based::{ClusterView, RuleBasedRouter}, - }, schema_config_provider::{ cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, },