From 6cf4a0b296fb556ccfd35b13f4a3c408a97a88e1 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Fri, 17 Mar 2023 10:33:37 +0800 Subject: [PATCH 01/11] http route --- server/src/context.rs | 10 ++++++ server/src/handlers/error.rs | 3 ++ server/src/handlers/mod.rs | 1 + server/src/handlers/route.rs | 68 ++++++++++++++++++++++++++++++++++++ server/src/http.rs | 43 +++++++++++++++++++++-- server/src/server.rs | 4 ++- 6 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 server/src/handlers/route.rs diff --git a/server/src/context.rs b/server/src/context.rs index 8a3fa25f7e..83a7fa0f78 100644 --- a/server/src/context.rs +++ b/server/src/context.rs @@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration}; use common_util::runtime::Runtime; +use router::Router; use snafu::{ensure, Backtrace, OptionExt, Snafu}; #[allow(clippy::enum_variant_names)] @@ -38,6 +39,8 @@ pub struct RequestContext { pub enable_partition_table_access: bool, /// Request timeout pub timeout: Option, + /// router + pub router: Arc, } impl RequestContext { @@ -53,6 +56,7 @@ pub struct Builder { runtime: Option>, enable_partition_table_access: bool, timeout: Option, + router: Arc, } impl Builder { @@ -81,6 +85,11 @@ impl Builder { self } + pub fn router(mut self, router: Arc) -> Self { + self.router = router; + self + } + pub fn build(self) -> Result { ensure!(!self.catalog.is_empty(), MissingCatalog); ensure!(!self.schema.is_empty(), MissingSchema); @@ -93,6 +102,7 @@ impl Builder { runtime, enable_partition_table_access: self.enable_partition_table_access, timeout: self.timeout, + router: self.router, }) } } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 16c2355a53..0d326ddfbf 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -75,6 +75,9 @@ pub enum Error { #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] InfluxDbHandler { msg: String, source: GenericError }, + + #[snafu(display("Route handler failed, table:{}, source:{}", table, source))] + RouteHandler { table: String, source: GenericError }, } define_result!(Error); diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index cf0f264c67..6f7fe190a9 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -7,6 +7,7 @@ pub mod error; pub mod influxdb; pub mod prom; pub mod query; +pub mod route; mod prelude { pub use catalog::manager::Manager as CatalogManager; diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs new file mode 100644 index 0000000000..c0035d0376 --- /dev/null +++ b/server/src/handlers/route.rs @@ -0,0 +1,68 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! route request handler + +use std::{ time::Instant}; + +use ceresdbproto::storage::{Route, RouteRequest}; +use common_util::time::InstantExt; +use log::info; +use serde::{Serialize, Serializer}; +use serde::ser::{SerializeMap, SerializeSeq}; +use snafu::{ensure}; + +use crate::handlers::{ + error::{ + RouteHandler + }, + prelude::*, +}; +use crate::handlers::error::Error::RouteHandler; + +#[serde(rename_all = "snake_case")] +pub struct Response { + pub routes: Vec, +} + +impl Serialize for Response { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { let mut seq = serializer.serialize_seq(Some(self.routes.len()))?; + for route in self.routes{ + let endpoint = route.endpoint.unwrap(); + let tup = (route.table.clone(), format!("{}:{}",endpoint.ip,endpoint.port)); + seq.serialize_element(&tup)?; + } + seq.end() + } +} + +pub async fn handle_route( + ctx: &RequestContext, + instance: InstanceRef, + table: String, +) -> Result { + let begin_instant = Instant::now(); + let deadline = ctx.timeout.map(|t| begin_instant + t); + + info!("Route handler try to find route of table:{table}"); + + let req = RouteRequest { + context: Some(ceresdbproto::storage::RequestContext{ + database: ctx.schema.clone() + }), + tables: vec![table], + }; + let routes = ctx.router.route(req).await.context(RouteHandler{ + table: table.clone() + })?; + + info!( + "Route handler finished, cost:{}ms, table:{}", + begin_instant.saturating_elapsed().as_millis(), + table + ); + + Ok(Response{routes} ) +} diff --git a/server/src/http.rs b/server/src/http.rs index f54db35dc6..e6874e9a74 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -3,18 +3,19 @@ //! Http service use std::{ - collections::HashMap, convert::Infallible, error::Error as StdError, net::IpAddr, sync::Arc, - time::Duration, + collections::HashMap, convert::Infallible, error::Error as StdError, fmt::Display, net::IpAddr, + sync::Arc, time::Duration, }; use common_util::error::BoxError; +use datafusion::parquet::arrow::async_reader::AsyncFileReader; use handlers::query::QueryRequest; use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; use prom_remote_api::{types::RemoteStorageRef, web}; use query_engine::executor::Executor as QueryExecutor; -use router::endpoint::Endpoint; +use router::{endpoint::Endpoint, Router, RouterRef}; use serde::Serialize; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; @@ -96,6 +97,9 @@ pub enum Error { Internal { source: Box, }, + + #[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))] + MissingRouter { backtrace: Backtrace }, } define_result!(Error); @@ -117,6 +121,7 @@ pub struct Service { influxdb: Arc>, tx: Sender<()>, config: HttpConfig, + router: Arc, } impl Service { @@ -136,6 +141,7 @@ impl Service { .or(self.sql()) .or(self.influxdb_api()) .or(self.prom_api()) + .or(self.route()) // admin APIs .or(self.admin_block()) // debug APIs @@ -210,6 +216,27 @@ impl Service { }) } + // GET /route/{table} + fn route(&self) -> impl Filter + Clone { + warp::path!("route" / String) + .and(warp::get()) + .and(self.with_context()) + .and(self.with_instance()) + .and_then(|table, ctx, instance| async move { + let result = handlers::route::handle_route(&ctx, instance, table) + .await + .map_err(|e| { + error!("Http service Failed to find route of table:{table}, err:{e:?}"); + Box::new(e) + }) + .context(HandleRequest); + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } + }) + } + /// POST `/influxdb/v1/query` and `/influxdb/v1/write` fn influxdb_api( &self, @@ -388,6 +415,7 @@ impl Service { .runtime(runtime) .timeout(timeout) .enable_partition_table_access(true) + .router(self.router.clone()) .build() .context(CreateContext) .map_err(reject::custom) @@ -430,6 +458,7 @@ pub struct Builder { log_runtime: Option>, instance: Option>, schema_config_provider: Option, + router: Option, } impl Builder { @@ -440,6 +469,7 @@ impl Builder { log_runtime: None, instance: None, schema_config_provider: None, + router: None, } } @@ -462,6 +492,11 @@ impl Builder { self.schema_config_provider = Some(provider); self } + + pub fn router(mut self, router: RouterRef) -> Self { + self.router = Some(router); + self + } } impl Builder { @@ -473,6 +508,7 @@ impl Builder { let schema_config_provider = self .schema_config_provider .context(MissingSchemaConfigProvider)?; + let router = self.router.context(MissingRouter)?; let prom_remote_storage = Arc::new(CeresDBStorage::new( instance.clone(), schema_config_provider.clone(), @@ -489,6 +525,7 @@ impl Builder { profiler: Arc::new(Profiler::default()), tx, config: self.config.clone(), + router, }; info!( diff --git a/server/src/server.rs b/server/src/server.rs index f38441371d..f636d450ab 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -315,6 +315,8 @@ impl Builder { // Start http service let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; + let router = self.router.context(MissingRouter)?; + let provider = self .schema_config_provider .context(MissingSchemaConfigProvider)?; @@ -323,6 +325,7 @@ impl Builder { .log_runtime(log_runtime) .instance(instance.clone()) .schema_config_provider(provider.clone()) + .router(router) .build() .context(StartHttpService)?; @@ -338,7 +341,6 @@ impl Builder { .build() .context(BuildMysqlService)?; - let router = self.router.context(MissingRouter)?; let rpc_services = grpc::Builder::new() .endpoint(Endpoint::new(self.config.bind_addr, self.config.grpc_port).to_string()) .local_endpoint(Endpoint::new(self.node_addr, self.config.grpc_port).to_string()) From 1c15b5b0b0661fa8d62ee5bc4f5902cc3c266762 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Fri, 17 Mar 2023 14:18:13 +0800 Subject: [PATCH 02/11] debug --- server/src/handlers/route.rs | 2 +- server/src/http.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index c0035d0376..80085c2ffe 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -17,7 +17,6 @@ use crate::handlers::{ }, prelude::*, }; -use crate::handlers::error::Error::RouteHandler; #[serde(rename_all = "snake_case")] pub struct Response { @@ -43,6 +42,7 @@ pub async fn handle_route( instance: InstanceRef, table: String, ) -> Result { + ensure!(!table.is_empty()); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); diff --git a/server/src/http.rs b/server/src/http.rs index e6874e9a74..a4355aaf1f 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -222,11 +222,11 @@ impl Service { .and(warp::get()) .and(self.with_context()) .and(self.with_instance()) - .and_then(|table, ctx, instance| async move { - let result = handlers::route::handle_route(&ctx, instance, table) + .and_then(|table:String, ctx, instance| async move { + let result = handlers::route::handle_route(&ctx, instance, table.clone()) .await .map_err(|e| { - error!("Http service Failed to find route of table:{table}, err:{e:?}"); + error!("Http service Failed to find route of table:{}, err:{:?}",table,e); Box::new(e) }) .context(HandleRequest); @@ -398,6 +398,7 @@ impl Service { //TODO(boyan) use read/write runtime by sql type. let runtime = self.engine_runtimes.bg_runtime.clone(); let timeout = self.config.timeout; + let router = self.router.clone(); header::optional::(consts::CATALOG_HEADER) .and(header::optional::(consts::SCHEMA_HEADER)) @@ -415,7 +416,7 @@ impl Service { .runtime(runtime) .timeout(timeout) .enable_partition_table_access(true) - .router(self.router.clone()) + .router(router) .build() .context(CreateContext) .map_err(reject::custom) From 4f62fb1814a7f6438c0a64e69a5a372b18ee0050 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Fri, 17 Mar 2023 19:05:20 +0800 Subject: [PATCH 03/11] add http route interface --- server/src/context.rs | 10 ++++-- server/src/handlers/error.rs | 5 ++- server/src/handlers/route.rs | 67 ++++++++++++++---------------------- server/src/http.rs | 14 +++++--- server/src/server.rs | 4 +-- 5 files changed, 47 insertions(+), 53 deletions(-) diff --git a/server/src/context.rs b/server/src/context.rs index 83a7fa0f78..810d64f945 100644 --- a/server/src/context.rs +++ b/server/src/context.rs @@ -19,6 +19,9 @@ pub enum Error { #[snafu(display("Missing runtime.\nBacktrace:\n{}", backtrace))] MissingRuntime { backtrace: Backtrace }, + + #[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))] + MissingRouter { backtrace: Backtrace }, } define_result!(Error); @@ -56,7 +59,7 @@ pub struct Builder { runtime: Option>, enable_partition_table_access: bool, timeout: Option, - router: Arc, + router: Option>, } impl Builder { @@ -86,7 +89,7 @@ impl Builder { } pub fn router(mut self, router: Arc) -> Self { - self.router = router; + self.router = Some(router); self } @@ -95,6 +98,7 @@ impl Builder { ensure!(!self.schema.is_empty(), MissingSchema); let runtime = self.runtime.context(MissingRuntime)?; + let router = self.router.context(MissingRouter)?; Ok(RequestContext { catalog: self.catalog, @@ -102,7 +106,7 @@ impl Builder { runtime, enable_partition_table_access: self.enable_partition_table_access, timeout: self.timeout, - router: self.router, + router, }) } } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 0d326ddfbf..6936c58132 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -77,7 +77,10 @@ pub enum Error { InfluxDbHandler { msg: String, source: GenericError }, #[snafu(display("Route handler failed, table:{}, source:{}", table, source))] - RouteHandler { table: String, source: GenericError }, + RouteHandler { + table: String, + source: router::Error, + }, } define_result!(Error); diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index 80085c2ffe..df8be82938 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -2,67 +2,50 @@ //! route request handler -use std::{ time::Instant}; +use std::{collections::HashMap, time::Instant}; -use ceresdbproto::storage::{Route, RouteRequest}; +use ceresdbproto::storage::{RouteRequest}; use common_util::time::InstantExt; use log::info; -use serde::{Serialize, Serializer}; -use serde::ser::{SerializeMap, SerializeSeq}; -use snafu::{ensure}; -use crate::handlers::{ - error::{ - RouteHandler - }, - prelude::*, -}; +use crate::handlers::{error::RouteHandler, prelude::*}; -#[serde(rename_all = "snake_case")] -pub struct Response { - pub routes: Vec, -} - -impl Serialize for Response { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { let mut seq = serializer.serialize_seq(Some(self.routes.len()))?; - for route in self.routes{ - let endpoint = route.endpoint.unwrap(); - let tup = (route.table.clone(), format!("{}:{}",endpoint.ip,endpoint.port)); - seq.serialize_element(&tup)?; - } - seq.end() - } +#[derive(Serialize)] +pub struct RouteResponse { + routes: HashMap, } pub async fn handle_route( ctx: &RequestContext, - instance: InstanceRef, + _: InstanceRef, table: String, -) -> Result { - ensure!(!table.is_empty()); - let begin_instant = Instant::now(); - let deadline = ctx.timeout.map(|t| begin_instant + t); - - info!("Route handler try to find route of table:{table}"); +) -> Result { + let mut route_map = HashMap::new(); + if table.is_empty() { + return Ok(RouteResponse { routes: route_map }); + } + let begin_instant = Instant::now(); + info!("Route handler try to find route for table:{table}"); let req = RouteRequest { - context: Some(ceresdbproto::storage::RequestContext{ - database: ctx.schema.clone() + context: Some(ceresdbproto::storage::RequestContext { + database: ctx.schema.clone(), }), - tables: vec![table], + tables: vec![table.clone()], }; - let routes = ctx.router.route(req).await.context(RouteHandler{ - table: table.clone() + let routes = ctx.router.route(req).await.context(RouteHandler { + table: table.clone(), })?; - + for route in routes { + if let Some(endpoint) = route.endpoint { + route_map.insert(route.table, format!("{}:{}", endpoint.ip, endpoint.port)); + } + } info!( "Route handler finished, cost:{}ms, table:{}", begin_instant.saturating_elapsed().as_millis(), table ); - Ok(Response{routes} ) + Ok(RouteResponse { routes: route_map }) } diff --git a/server/src/http.rs b/server/src/http.rs index a4355aaf1f..8da31d55c7 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -3,12 +3,11 @@ //! Http service use std::{ - collections::HashMap, convert::Infallible, error::Error as StdError, fmt::Display, net::IpAddr, - sync::Arc, time::Duration, + collections::HashMap, convert::Infallible, error::Error as StdError, net::IpAddr, sync::Arc, + time::Duration, }; use common_util::error::BoxError; -use datafusion::parquet::arrow::async_reader::AsyncFileReader; use handlers::query::QueryRequest; use log::{error, info}; use logger::RuntimeLevel; @@ -222,11 +221,14 @@ impl Service { .and(warp::get()) .and(self.with_context()) .and(self.with_instance()) - .and_then(|table:String, ctx, instance| async move { + .and_then(|table: String, ctx, instance| async move { let result = handlers::route::handle_route(&ctx, instance, table.clone()) .await .map_err(|e| { - error!("Http service Failed to find route of table:{}, err:{:?}",table,e); + error!( + "Http service Failed to find route of table:{}, err:{:?}", + table, e + ); Box::new(e) }) .context(HandleRequest); @@ -409,6 +411,7 @@ impl Service { let default_catalog = default_catalog.clone(); let runtime = runtime.clone(); let schema = schema.unwrap_or_else(|| default_schema.clone()); + let router = router.clone(); async move { RequestContext::builder() .catalog(catalog.unwrap_or(default_catalog)) @@ -581,6 +584,7 @@ fn error_to_status_code(err: &Error) -> StatusCode { | Error::Internal { .. } | Error::JoinAsyncTask { .. } | Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::MissingRouter { .. } => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/server.rs b/server/src/server.rs index f636d450ab..2fa434d713 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -325,7 +325,7 @@ impl Builder { .log_runtime(log_runtime) .instance(instance.clone()) .schema_config_provider(provider.clone()) - .router(router) + .router(router.clone()) .build() .context(StartHttpService)?; @@ -347,7 +347,7 @@ impl Builder { .resp_compress_min_length(self.config.resp_compress_min_length.as_bytes() as usize) .runtimes(engine_runtimes) .instance(instance.clone()) - .router(router) + .router(router.clone()) .cluster(self.cluster.clone()) .opened_wals(opened_wals) .schema_config_provider(provider) From 2fb365bdc29a63d4c35021968581fb674ae1c4e9 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Fri, 17 Mar 2023 19:06:58 +0800 Subject: [PATCH 04/11] fmt --- server/src/handlers/route.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index df8be82938..dd49befbea 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, time::Instant}; -use ceresdbproto::storage::{RouteRequest}; +use ceresdbproto::storage::RouteRequest; use common_util::time::InstantExt; use log::info; From fbaa7a3599813d95b783f764c8cf8e6dd8ae78de Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Mon, 3 Apr 2023 16:55:14 +0800 Subject: [PATCH 05/11] http router --- server/src/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/server.rs b/server/src/server.rs index 9ff2d79f86..53571433d4 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -328,7 +328,7 @@ impl Builder { let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; let config_content = self.config_content.expect("Missing config content"); - let router = self.router.context(MissingRouter)?; + let router = self.router.clone().context(MissingRouter)?; let provider = self .schema_config_provider @@ -357,7 +357,7 @@ impl Builder { .build() .context(BuildMysqlService)?; - let router = self.router.context(MissingRouter)?; + let router = self.router.clone().context(MissingRouter)?; let rpc_services = grpc::Builder::new() .endpoint( From e7b36beb1fbdc35371d4c15aced293b0b653ad79 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Mon, 3 Apr 2023 17:22:17 +0800 Subject: [PATCH 06/11] fmt --- server/src/http.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/http.rs b/server/src/http.rs index 41ab8b7cca..a19358a90f 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -132,7 +132,6 @@ pub struct Service { config: HttpConfig, config_content: String, router: Arc, - } impl Service { From 19614eef8605b9b37522eee3cef2e2d680ad74fb Mon Sep 17 00:00:00 2001 From: lee <690585471@qq.com> Date: Mon, 3 Apr 2023 18:09:40 +0800 Subject: [PATCH 07/11] Update server/src/context.rs Co-authored-by: WEI Xikai --- server/src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/context.rs b/server/src/context.rs index 810d64f945..74b44268a8 100644 --- a/server/src/context.rs +++ b/server/src/context.rs @@ -88,7 +88,7 @@ impl Builder { self } - pub fn router(mut self, router: Arc) -> Self { + pub fn router(mut self, router: RouterRef) -> Self { self.router = Some(router); self } From b2433fb6a291168b609cc97f8dfec8ea31dd2b5f Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Mon, 3 Apr 2023 19:41:54 +0800 Subject: [PATCH 08/11] optimize input/output --- server/src/context.rs | 4 +-- server/src/handlers/error.rs | 4 +-- server/src/handlers/route.rs | 55 +++++++++++++++++++++++------------- server/src/http.rs | 14 +++++---- server/src/server.rs | 4 +-- 5 files changed, 50 insertions(+), 31 deletions(-) diff --git a/server/src/context.rs b/server/src/context.rs index 810d64f945..0d22ce4bc8 100644 --- a/server/src/context.rs +++ b/server/src/context.rs @@ -5,7 +5,7 @@ use std::{sync::Arc, time::Duration}; use common_util::runtime::Runtime; -use router::Router; +use router::{Router, RouterRef}; use snafu::{ensure, Backtrace, OptionExt, Snafu}; #[allow(clippy::enum_variant_names)] @@ -88,7 +88,7 @@ impl Builder { self } - pub fn router(mut self, router: Arc) -> Self { + pub fn router(mut self, router: RouterRef) -> Self { self.router = Some(router); self } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 2c95102e45..5faec10d97 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -79,9 +79,9 @@ pub enum Error { #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))] InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace }, - #[snafu(display("Route handler failed, table:{}, source:{}", table, source))] + #[snafu(display("Route handler failed, table:{:?}, source:{}", tables, source))] RouteHandler { - table: String, + tables: Vec, source: router::Error, }, } diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index dd49befbea..4057cde3a3 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -2,50 +2,67 @@ //! route request handler -use std::{collections::HashMap, time::Instant}; +use std::time::Instant; use ceresdbproto::storage::RouteRequest; use common_util::time::InstantExt; -use log::info; +use log::debug; +use router::endpoint::Endpoint; use crate::handlers::{error::RouteHandler, prelude::*}; +#[derive(Debug, Deserialize)] +pub struct RouteHttpRequest { + pub tables: Vec, +} + #[derive(Serialize)] pub struct RouteResponse { - routes: HashMap, + routes: Vec, +} + +#[derive(Serialize)] +pub struct RouteItem { + pub table: String, + pub endpoint: Option, } pub async fn handle_route( ctx: &RequestContext, _: InstanceRef, - table: String, + req: &RouteHttpRequest, ) -> Result { - let mut route_map = HashMap::new(); - if table.is_empty() { - return Ok(RouteResponse { routes: route_map }); + if req.tables.is_empty() { + return Ok(RouteResponse { routes: vec![] }); } let begin_instant = Instant::now(); - info!("Route handler try to find route for table:{table}"); - let req = RouteRequest { + let route_req = RouteRequest { context: Some(ceresdbproto::storage::RequestContext { database: ctx.schema.clone(), }), - tables: vec![table.clone()], + tables: req.tables.clone(), }; - let routes = ctx.router.route(req).await.context(RouteHandler { - table: table.clone(), + + let routes = ctx.router.route(route_req).await.context(RouteHandler { + tables: req.tables.clone(), })?; + + let mut route_items = Vec::with_capacity(req.tables.len()); for route in routes { - if let Some(endpoint) = route.endpoint { - route_map.insert(route.table, format!("{}:{}", endpoint.ip, endpoint.port)); - } + route_items.push(RouteItem { + table: route.table, + endpoint: route.endpoint.map(|endpoint| endpoint.into()), + }); } - info!( - "Route handler finished, cost:{}ms, table:{}", + + debug!( + "Route handler finished, tables:{:?}, cost:{}ms", + req.tables, begin_instant.saturating_elapsed().as_millis(), - table ); - Ok(RouteResponse { routes: route_map }) + Ok(RouteResponse { + routes: route_items, + }) } diff --git a/server/src/http.rs b/server/src/http.rs index a19358a90f..b536b64f25 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -258,19 +258,21 @@ impl Service { }) } - // GET /route/{table} + // GET /route fn route(&self) -> impl Filter + Clone { - warp::path!("route" / String) + warp::path!("route") .and(warp::get()) + .and(warp::body::content_length_limit(self.config.max_body_size)) + .and(warp::body::json()) .and(self.with_context()) .and(self.with_instance()) - .and_then(|table: String, ctx, instance| async move { - let result = handlers::route::handle_route(&ctx, instance, table.clone()) + .and_then(|req, ctx, instance| async move { + let result = handlers::route::handle_route(&ctx, instance, &req) .await .map_err(|e| { error!( - "Http service Failed to find route of table:{}, err:{:?}", - table, e + "Http service Failed to find route of tables:{:?}, err:{:?}", + &req.tables, e ); Box::new(e) }) diff --git a/server/src/server.rs b/server/src/server.rs index 53571433d4..1830b054df 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -357,7 +357,7 @@ impl Builder { .build() .context(BuildMysqlService)?; - let router = self.router.clone().context(MissingRouter)?; + let router = self.router.context(MissingRouter)?; let rpc_services = grpc::Builder::new() .endpoint( @@ -372,7 +372,7 @@ impl Builder { ) .runtimes(engine_runtimes) .instance(instance.clone()) - .router(router.clone()) + .router(router) .cluster(self.cluster.clone()) .opened_wals(opened_wals) .schema_config_provider(provider) From e803992f514c9313f66fa266e4f22b1f8e9899ee Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Mon, 3 Apr 2023 21:16:23 +0800 Subject: [PATCH 09/11] chore --- server/src/handlers/error.rs | 4 ++-- server/src/handlers/route.rs | 19 +++++++------------ server/src/http.rs | 12 +++++------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 5faec10d97..de6d68b339 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -79,9 +79,9 @@ pub enum Error { #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))] InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace }, - #[snafu(display("Route handler failed, table:{:?}, source:{}", tables, source))] + #[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))] RouteHandler { - tables: Vec, + table: String, source: router::Error, }, } diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index 4057cde3a3..179be54e04 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -11,11 +11,6 @@ use router::endpoint::Endpoint; use crate::handlers::{error::RouteHandler, prelude::*}; -#[derive(Debug, Deserialize)] -pub struct RouteHttpRequest { - pub tables: Vec, -} - #[derive(Serialize)] pub struct RouteResponse { routes: Vec, @@ -30,9 +25,9 @@ pub struct RouteItem { pub async fn handle_route( ctx: &RequestContext, _: InstanceRef, - req: &RouteHttpRequest, + table: &str, ) -> Result { - if req.tables.is_empty() { + if table.is_empty() { return Ok(RouteResponse { routes: vec![] }); } @@ -41,14 +36,14 @@ pub async fn handle_route( context: Some(ceresdbproto::storage::RequestContext { database: ctx.schema.clone(), }), - tables: req.tables.clone(), + tables: vec![table.to_string()], }; let routes = ctx.router.route(route_req).await.context(RouteHandler { - tables: req.tables.clone(), + table: table.to_string(), })?; - let mut route_items = Vec::with_capacity(req.tables.len()); + let mut route_items = Vec::with_capacity(1); for route in routes { route_items.push(RouteItem { table: route.table, @@ -57,8 +52,8 @@ pub async fn handle_route( } debug!( - "Route handler finished, tables:{:?}, cost:{}ms", - req.tables, + "Route handler finished, table:{}, cost:{}ms", + table, begin_instant.saturating_elapsed().as_millis(), ); diff --git a/server/src/http.rs b/server/src/http.rs index b536b64f25..8528ed761f 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -260,19 +260,17 @@ impl Service { // GET /route fn route(&self) -> impl Filter + Clone { - warp::path!("route") + warp::path!("route" / String) .and(warp::get()) - .and(warp::body::content_length_limit(self.config.max_body_size)) - .and(warp::body::json()) .and(self.with_context()) .and(self.with_instance()) - .and_then(|req, ctx, instance| async move { - let result = handlers::route::handle_route(&ctx, instance, &req) + .and_then(|table: String, ctx, instance| async move { + let result = handlers::route::handle_route(&ctx, instance, &table) .await .map_err(|e| { error!( - "Http service Failed to find route of tables:{:?}, err:{:?}", - &req.tables, e + "Http service Failed to find route of table:{}, err:{:?}", + table, e ); Box::new(e) }) From 933ccbb14650395298311a2560c8ad242ca22dd7 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 4 Apr 2023 12:10:45 +0800 Subject: [PATCH 10/11] remove log --- server/src/handlers/route.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index 179be54e04..ef12180059 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -51,12 +51,6 @@ pub async fn handle_route( }); } - debug!( - "Route handler finished, table:{}, cost:{}ms", - table, - begin_instant.saturating_elapsed().as_millis(), - ); - Ok(RouteResponse { routes: route_items, }) From 472d54a2b7ef2d12bea3de14e8d036dee76e609b Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 4 Apr 2023 12:16:07 +0800 Subject: [PATCH 11/11] clippy --- server/src/handlers/route.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/handlers/route.rs b/server/src/handlers/route.rs index ef12180059..a3a179b53e 100644 --- a/server/src/handlers/route.rs +++ b/server/src/handlers/route.rs @@ -1,12 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. //! route request handler - -use std::time::Instant; - use ceresdbproto::storage::RouteRequest; -use common_util::time::InstantExt; -use log::debug; use router::endpoint::Endpoint; use crate::handlers::{error::RouteHandler, prelude::*}; @@ -31,7 +26,6 @@ pub async fn handle_route( return Ok(RouteResponse { routes: vec![] }); } - let begin_instant = Instant::now(); let route_req = RouteRequest { context: Some(ceresdbproto::storage::RequestContext { database: ctx.schema.clone(),