Skip to content

Commit

Permalink
Merge 472d54a into db8ad4b
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelLeeHZ authored Apr 4, 2023
2 parents db8ad4b + 472d54a commit 33d0a7b
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 1 deletion.
14 changes: 14 additions & 0 deletions server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{sync::Arc, time::Duration};

use common_util::runtime::Runtime;
use router::{Router, RouterRef};
use snafu::{ensure, Backtrace, OptionExt, Snafu};

#[allow(clippy::enum_variant_names)]
Expand All @@ -18,6 +19,9 @@ pub enum Error {

#[snafu(display("Missing runtime.\nBacktrace:\n{}", backtrace))]
MissingRuntime { backtrace: Backtrace },

#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },
}

define_result!(Error);
Expand All @@ -38,6 +42,8 @@ pub struct RequestContext {
pub enable_partition_table_access: bool,
/// Request timeout
pub timeout: Option<Duration>,
/// router
pub router: Arc<dyn Router + Send + Sync>,
}

impl RequestContext {
Expand All @@ -53,6 +59,7 @@ pub struct Builder {
runtime: Option<Arc<Runtime>>,
enable_partition_table_access: bool,
timeout: Option<Duration>,
router: Option<Arc<dyn Router + Send + Sync>>,
}

impl Builder {
Expand Down Expand Up @@ -81,18 +88,25 @@ impl Builder {
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}

pub fn build(self) -> Result<RequestContext> {
ensure!(!self.catalog.is_empty(), MissingCatalog);
ensure!(!self.schema.is_empty(), MissingSchema);

let runtime = self.runtime.context(MissingRuntime)?;
let router = self.router.context(MissingRouter)?;

Ok(RequestContext {
catalog: self.catalog,
schema: self.schema,
runtime,
enable_partition_table_access: self.enable_partition_table_access,
timeout: self.timeout,
router,
})
}
}
6 changes: 6 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub enum Error {

#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
source: router::Error,
},
}

define_result!(Error);
Expand Down
1 change: 1 addition & 0 deletions server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod error;
pub mod influxdb;
pub mod prom;
pub mod query;
pub mod route;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
Expand Down
51 changes: 51 additions & 0 deletions server/src/handlers/route.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! route request handler
use ceresdbproto::storage::RouteRequest;
use router::endpoint::Endpoint;

use crate::handlers::{error::RouteHandler, prelude::*};

#[derive(Serialize)]
pub struct RouteResponse {
routes: Vec<RouteItem>,
}

#[derive(Serialize)]
pub struct RouteItem {
pub table: String,
pub endpoint: Option<Endpoint>,
}

pub async fn handle_route<Q: QueryExecutor + 'static>(
ctx: &RequestContext,
_: InstanceRef<Q>,
table: &str,
) -> Result<RouteResponse> {
if table.is_empty() {
return Ok(RouteResponse { routes: vec![] });
}

let route_req = RouteRequest {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.clone(),
}),
tables: vec![table.to_string()],
};

let routes = ctx.router.route(route_req).await.context(RouteHandler {
table: table.to_string(),
})?;

let mut route_items = Vec::with_capacity(1);
for route in routes {
route_items.push(RouteItem {
table: route.table,
endpoint: route.endpoint.map(|endpoint| endpoint.into()),
});
}

Ok(RouteResponse {
routes: route_items,
})
}
44 changes: 43 additions & 1 deletion server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use logger::RuntimeLevel;
use profile::Profiler;
use prom_remote_api::{types::RemoteStorageRef, web};
use query_engine::executor::Executor as QueryExecutor;
use router::endpoint::Endpoint;
use router::{endpoint::Endpoint, Router, RouterRef};
use serde::Serialize;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
Expand Down Expand Up @@ -105,6 +105,9 @@ pub enum Error {

#[snafu(display("Server already started.\nBacktrace:\n{}", backtrace))]
AlreadyStarted { backtrace: Backtrace },

#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },
}

define_result!(Error);
Expand All @@ -128,6 +131,7 @@ pub struct Service<Q> {
rx: Option<Receiver<()>>,
config: HttpConfig,
config_content: String,
router: Arc<dyn Router + Send + Sync>,
}

impl<Q: QueryExecutor + 'static> Service<Q> {
Expand Down Expand Up @@ -178,6 +182,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.sql())
.or(self.influxdb_api())
.or(self.prom_api())
.or(self.route())
// admin APIs
.or(self.admin_block())
// debug APIs
Expand Down Expand Up @@ -253,6 +258,30 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
})
}

// GET /route
fn route(&self) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("route" / String)
.and(warp::get())
.and(self.with_context())
.and(self.with_instance())
.and_then(|table: String, ctx, instance| async move {
let result = handlers::route::handle_route(&ctx, instance, &table)
.await
.map_err(|e| {
error!(
"Http service Failed to find route of table:{}, err:{:?}",
table, e
);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
}

/// for write api:
/// POST `/influxdb/v1/write`
///
Expand Down Expand Up @@ -446,6 +475,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
//TODO(boyan) use read/write runtime by sql type.
let runtime = self.engine_runtimes.bg_runtime.clone();
let timeout = self.config.timeout;
let router = self.router.clone();

header::optional::<String>(consts::CATALOG_HEADER)
.and(header::optional::<String>(consts::SCHEMA_HEADER))
Expand All @@ -456,13 +486,15 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
let default_catalog = default_catalog.clone();
let runtime = runtime.clone();
let schema = schema.unwrap_or_else(|| default_schema.clone());
let router = router.clone();
async move {
RequestContext::builder()
.catalog(catalog.unwrap_or(default_catalog))
.schema(schema)
.runtime(runtime)
.timeout(timeout)
.enable_partition_table_access(true)
.router(router)
.build()
.context(CreateContext)
.map_err(reject::custom)
Expand Down Expand Up @@ -506,6 +538,7 @@ pub struct Builder<Q> {
instance: Option<InstanceRef<Q>>,
schema_config_provider: Option<SchemaConfigProviderRef>,
config_content: Option<String>,
router: Option<RouterRef>,
}

impl<Q> Builder<Q> {
Expand All @@ -517,6 +550,7 @@ impl<Q> Builder<Q> {
instance: None,
schema_config_provider: None,
config_content: None,
router: None,
}
}

Expand Down Expand Up @@ -544,6 +578,11 @@ impl<Q> Builder<Q> {
self.config_content = Some(content);
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
Expand All @@ -556,6 +595,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let schema_config_provider = self
.schema_config_provider
.context(MissingSchemaConfigProvider)?;
let router = self.router.context(MissingRouter)?;
let prom_remote_storage = Arc::new(CeresDBStorage::new(
instance.clone(),
schema_config_provider.clone(),
Expand All @@ -574,6 +614,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
rx: Some(rx),
config: self.config.clone(),
config_content,
router,
};

Ok(service)
Expand Down Expand Up @@ -610,6 +651,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::JoinAsyncTask { .. }
| Error::AlreadyStarted { .. }
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
Error::MissingRouter { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
let config_content = self.config_content.expect("Missing config content");
let router = self.router.clone().context(MissingRouter)?;

let provider = self
.schema_config_provider
.context(MissingSchemaConfigProvider)?;
Expand All @@ -337,6 +339,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.instance(instance.clone())
.schema_config_provider(provider.clone())
.config_content(config_content)
.router(router.clone())
.build()
.context(HttpService {
msg: "build failed",
Expand Down

0 comments on commit 33d0a7b

Please sign in to comment.