Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement route interface in http protocol #803

Merged
merged 13 commits into from
Apr 6, 2023
14 changes: 14 additions & 0 deletions server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{sync::Arc, time::Duration};

use common_util::runtime::Runtime;
use router::{Router, RouterRef};
use snafu::{ensure, Backtrace, OptionExt, Snafu};

#[allow(clippy::enum_variant_names)]
Expand All @@ -18,6 +19,9 @@ pub enum Error {

#[snafu(display("Missing runtime.\nBacktrace:\n{}", backtrace))]
MissingRuntime { backtrace: Backtrace },

#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },
}

define_result!(Error);
Expand All @@ -38,6 +42,8 @@ pub struct RequestContext {
pub enable_partition_table_access: bool,
/// Request timeout
pub timeout: Option<Duration>,
/// router
pub router: Arc<dyn Router + Send + Sync>,
}

impl RequestContext {
Expand All @@ -53,6 +59,7 @@ pub struct Builder {
runtime: Option<Arc<Runtime>>,
enable_partition_table_access: bool,
timeout: Option<Duration>,
router: Option<Arc<dyn Router + Send + Sync>>,
}

impl Builder {
Expand Down Expand Up @@ -81,18 +88,25 @@ impl Builder {
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}

pub fn build(self) -> Result<RequestContext> {
ensure!(!self.catalog.is_empty(), MissingCatalog);
ensure!(!self.schema.is_empty(), MissingSchema);

let runtime = self.runtime.context(MissingRuntime)?;
let router = self.router.context(MissingRouter)?;

Ok(RequestContext {
catalog: self.catalog,
schema: self.schema,
runtime,
enable_partition_table_access: self.enable_partition_table_access,
timeout: self.timeout,
router,
})
}
}
6 changes: 6 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub enum Error {

#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
source: router::Error,
},
}

define_result!(Error);
Expand Down
1 change: 1 addition & 0 deletions server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod error;
pub mod influxdb;
pub mod prom;
pub mod query;
pub mod route;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
Expand Down
51 changes: 51 additions & 0 deletions server/src/handlers/route.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! route request handler
use ceresdbproto::storage::RouteRequest;
use router::endpoint::Endpoint;

use crate::handlers::{error::RouteHandler, prelude::*};

#[derive(Serialize)]
pub struct RouteResponse {
routes: Vec<RouteItem>,
}

#[derive(Serialize)]
pub struct RouteItem {
pub table: String,
pub endpoint: Option<Endpoint>,
}

pub async fn handle_route<Q: QueryExecutor + 'static>(
ctx: &RequestContext,
_: InstanceRef<Q>,
table: &str,
) -> Result<RouteResponse> {
if table.is_empty() {
return Ok(RouteResponse { routes: vec![] });
}

let route_req = RouteRequest {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.clone(),
}),
tables: vec![table.to_string()],
};

let routes = ctx.router.route(route_req).await.context(RouteHandler {
table: table.to_string(),
})?;

let mut route_items = Vec::with_capacity(1);
for route in routes {
route_items.push(RouteItem {
table: route.table,
endpoint: route.endpoint.map(|endpoint| endpoint.into()),
});
}

Ok(RouteResponse {
routes: route_items,
})
}
44 changes: 43 additions & 1 deletion server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use logger::RuntimeLevel;
use profile::Profiler;
use prom_remote_api::{types::RemoteStorageRef, web};
use query_engine::executor::Executor as QueryExecutor;
use router::endpoint::Endpoint;
use router::{endpoint::Endpoint, Router, RouterRef};
use serde::Serialize;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
Expand Down Expand Up @@ -105,6 +105,9 @@ pub enum Error {

#[snafu(display("Server already started.\nBacktrace:\n{}", backtrace))]
AlreadyStarted { backtrace: Backtrace },

#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },
}

define_result!(Error);
Expand All @@ -128,6 +131,7 @@ pub struct Service<Q> {
rx: Option<Receiver<()>>,
config: HttpConfig,
config_content: String,
router: Arc<dyn Router + Send + Sync>,
}

impl<Q: QueryExecutor + 'static> Service<Q> {
Expand Down Expand Up @@ -178,6 +182,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.sql())
.or(self.influxdb_api())
.or(self.prom_api())
.or(self.route())
// admin APIs
.or(self.admin_block())
// debug APIs
Expand Down Expand Up @@ -253,6 +258,30 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
})
}

// GET /route
fn route(&self) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("route" / String)
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
.and(warp::get())
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
.and(self.with_context())
.and(self.with_instance())
.and_then(|table: String, ctx, instance| async move {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
let result = handlers::route::handle_route(&ctx, instance, &table)
.await
.map_err(|e| {
error!(
"Http service Failed to find route of table:{}, err:{:?}",
table, e
);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
}

/// for write api:
/// POST `/influxdb/v1/write`
///
Expand Down Expand Up @@ -446,6 +475,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
//TODO(boyan) use read/write runtime by sql type.
let runtime = self.engine_runtimes.bg_runtime.clone();
let timeout = self.config.timeout;
let router = self.router.clone();

header::optional::<String>(consts::CATALOG_HEADER)
.and(header::optional::<String>(consts::SCHEMA_HEADER))
Expand All @@ -456,13 +486,15 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
let default_catalog = default_catalog.clone();
let runtime = runtime.clone();
let schema = schema.unwrap_or_else(|| default_schema.clone());
let router = router.clone();
async move {
RequestContext::builder()
.catalog(catalog.unwrap_or(default_catalog))
.schema(schema)
.runtime(runtime)
.timeout(timeout)
.enable_partition_table_access(true)
.router(router)
.build()
.context(CreateContext)
.map_err(reject::custom)
Expand Down Expand Up @@ -506,6 +538,7 @@ pub struct Builder<Q> {
instance: Option<InstanceRef<Q>>,
schema_config_provider: Option<SchemaConfigProviderRef>,
config_content: Option<String>,
router: Option<RouterRef>,
}

impl<Q> Builder<Q> {
Expand All @@ -517,6 +550,7 @@ impl<Q> Builder<Q> {
instance: None,
schema_config_provider: None,
config_content: None,
router: None,
}
}

Expand Down Expand Up @@ -544,6 +578,11 @@ impl<Q> Builder<Q> {
self.config_content = Some(content);
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
Expand All @@ -556,6 +595,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let schema_config_provider = self
.schema_config_provider
.context(MissingSchemaConfigProvider)?;
let router = self.router.context(MissingRouter)?;
let prom_remote_storage = Arc::new(CeresDBStorage::new(
instance.clone(),
schema_config_provider.clone(),
Expand All @@ -574,6 +614,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
rx: Some(rx),
config: self.config.clone(),
config_content,
router,
};

Ok(service)
Expand Down Expand Up @@ -610,6 +651,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::JoinAsyncTask { .. }
| Error::AlreadyStarted { .. }
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
Error::MissingRouter { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
let config_content = self.config_content.expect("Missing config content");
let router = self.router.clone().context(MissingRouter)?;

let provider = self
.schema_config_provider
.context(MissingSchemaConfigProvider)?;
Expand All @@ -337,6 +339,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.instance(instance.clone())
.schema_config_provider(provider.clone())
.config_content(config_content)
.router(router.clone())
.build()
.context(HttpService {
msg: "build failed",
Expand Down