From 67516b03808ff7a54ab979f4c9e4957fd783da87 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Mon, 8 May 2023 17:40:01 +0800 Subject: [PATCH 1/4] feat: impl influxdb api with proxy --- proxy/src/handlers/error.rs | 13 +- proxy/src/handlers/mod.rs | 1 - proxy/src/handlers/query.rs | 34 +--- proxy/src/http/query.rs | 12 +- .../{handlers/influxdb.rs => influxdb/mod.rs} | 191 +++++++++++------- proxy/src/lib.rs | 12 ++ server/src/http.rs | 78 ++++--- server/src/server.rs | 2 - 8 files changed, 167 insertions(+), 176 deletions(-) rename proxy/src/{handlers/influxdb.rs => influxdb/mod.rs} (88%) diff --git a/proxy/src/handlers/error.rs b/proxy/src/handlers/error.rs index 67ce26095c..7ec6ffb51a 100644 --- a/proxy/src/handlers/error.rs +++ b/proxy/src/handlers/error.rs @@ -2,7 +2,7 @@ //! Error of handlers -use common_util::{define_result, error::GenericError}; +use common_util::define_result; use snafu::{Backtrace, Snafu}; use warp::reject::Reject; @@ -18,11 +18,6 @@ pub enum Error { source: query_frontend::frontend::Error, }, - #[snafu(display("Failed to parse influxql, err:{}", source))] - ParseInfluxql { - source: query_frontend::frontend::Error, - }, - #[snafu(display("Failed to create plan, query:{}, err:{}", query, source))] CreatePlan { query: String, @@ -77,12 +72,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] - InfluxDbHandlerWithCause { msg: String, source: GenericError }, - - #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))] - InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace }, - #[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))] RouteHandler { table: String, diff --git a/proxy/src/handlers/mod.rs b/proxy/src/handlers/mod.rs index 6c59404de0..da1c6e9996 100644 --- a/proxy/src/handlers/mod.rs +++ b/proxy/src/handlers/mod.rs @@ -4,7 +4,6 @@ pub mod admin; pub(crate) mod error; -pub mod influxdb; pub mod query; pub mod route; diff --git a/proxy/src/handlers/query.rs b/proxy/src/handlers/query.rs index 7e3abed572..4832f1111a 100644 --- a/proxy/src/handlers/query.rs +++ b/proxy/src/handlers/query.rs @@ -23,10 +23,8 @@ use serde::{ use snafu::{ensure, ResultExt}; use crate::handlers::{ - error::{ - CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt, - }, - influxdb::InfluxqlRequest, + error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt}, + // influxdb::InfluxqlRequest, prelude::*, }; @@ -102,13 +100,13 @@ impl From for Request { pub enum QueryRequest { Sql(Request), // TODO: influxql include more parameters, we should add it in later. - Influxql(InfluxqlRequest), + // Influxql(InfluxqlRequest), } impl QueryRequest { pub fn query(&self) -> &str { match self { QueryRequest::Sql(request) => request.query.as_str(), - QueryRequest::Influxql(request) => request.query.as_str(), + // QueryRequest::Influxql(request) => request.query.as_str(), } } } @@ -168,30 +166,6 @@ pub async fn handle_query( query: &request.query, })? } - - QueryRequest::Influxql(request) => { - let mut stmts = frontend - .parse_influxql(&mut sql_ctx, &request.query) - .context(ParseInfluxql)?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - ensure!( - stmts.len() == 1, - TooMuchStmt { - len: stmts.len(), - query: &request.query, - } - ); - - frontend - .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) - .context(CreatePlan { - query: &request.query, - })? 
- } }; instance.limiter.try_limit(&plan).context(QueryBlock { diff --git a/proxy/src/http/query.rs b/proxy/src/http/query.rs index bc93be6963..f021d57c89 100644 --- a/proxy/src/http/query.rs +++ b/proxy/src/http/query.rs @@ -33,8 +33,7 @@ use crate::{ error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, execute_plan, forward::ForwardResult, - handlers::influxdb::InfluxqlRequest, - Proxy, + Proxy, QueryRequest, }; impl Proxy { @@ -259,15 +258,6 @@ impl Serialize for ResponseRows { } } -#[derive(Debug)] -pub enum QueryRequest { - Sql(Request), - // TODO: influxql include more parameters, we should add it in later. - // TODO: remove dead_code after implement influxql with proxy - #[allow(dead_code)] - Influxql(InfluxqlRequest), -} - // Convert output to json pub fn convert_output(output: Output) -> Response { match output { diff --git a/proxy/src/handlers/influxdb.rs b/proxy/src/influxdb/mod.rs similarity index 88% rename from proxy/src/handlers/influxdb.rs rename to proxy/src/influxdb/mod.rs index 7924c3e0f6..ab9cf7ea6c 100644 --- a/proxy/src/handlers/influxdb.rs +++ b/proxy/src/influxdb/mod.rs @@ -6,7 +6,6 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::Arc, time::Instant, }; @@ -18,34 +17,28 @@ use common_types::{ column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, request_id::RequestId, schema::RecordSchema, time::Timestamp, }; -use common_util::error::BoxError; -use handlers::{ - error::{InfluxDbHandlerNoCause, InfluxDbHandlerWithCause, Result}, - query::QueryRequest, -}; -use http::Method; +use common_util::{error::BoxError, time::InstantExt}; +use http::{Method, StatusCode}; use influxdb_line_protocol::FieldValue; use interpreters::interpreter::Output; -use log::debug; +use log::{debug, info}; use query_engine::executor::Executor as QueryExecutor; -use query_frontend::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME; +use query_frontend::{ + frontend::{Context as SqlContext, Frontend}, + influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME, + provider::CatalogMetaProvider, +}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use warp::{reject, reply, Rejection, Reply}; use crate::{ context::RequestContext, + error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, + execute_plan, grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, - handlers, - instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, + Proxy, }; -pub struct InfluxDb { - instance: InstanceRef, - schema_config_provider: SchemaConfigProviderRef, -} - /// Influxql write request compatible with influxdb 1.8 /// /// It's derived from 1.x write api described in doc of influxdb 1.8: @@ -134,21 +127,21 @@ impl InfluxqlRequest { // - q: required(in body when POST and parameters when GET) // - chunked,db,epoch,pretty: in parameters if body.contains_key("params") { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: "`params` is not supported now", } .fail(); } let query = match method { - Method::GET => params.q.context(InfluxDbHandlerNoCause { + Method::GET => params.q.context(InternalNoCause { msg: "query not found when query by GET", })?, - Method::POST => body.remove("q").context(InfluxDbHandlerNoCause { + Method::POST => body.remove("q").context(InternalNoCause { msg: "query not found when query by POST", })?, other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("method not allowed in query, method:{other}"), } .fail() @@ -306,7 +299,7 @@ impl 
InfluxqlResultBuilder { let column_schemas = record_schema.columns().to_owned(); ensure!( !column_schemas.is_empty(), - InfluxDbHandlerNoCause { + InternalNoCause { msg: "empty schema", } ); @@ -321,7 +314,7 @@ impl InfluxqlResultBuilder { // described when introducing `column_schemas`. let mut col_iter = column_schemas.iter().enumerate(); // The first column may be measurement column in normal. - ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InfluxDbHandlerNoCause { + ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InternalNoCause { msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"), }); @@ -353,7 +346,7 @@ impl InfluxqlResultBuilder { // Check schema's compatibility. ensure!( record_batch.schema().columns() == self.column_schemas, - InfluxDbHandlerNoCause { + InternalNoCause { msg: format!( "conflict schema, origin:{:?}, new:{:?}", self.column_schemas, @@ -444,7 +437,7 @@ impl InfluxqlResultBuilder { match measurement { Datum::String(m) => m.to_string(), other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("invalid measurement column, column:{other:?}"), } .fail() @@ -459,7 +452,7 @@ impl InfluxqlResultBuilder { Datum::Null => "".to_string(), Datum::String(tag) => tag.to_string(), other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("invalid tag column, column:{other:?}"), } .fail() @@ -497,26 +490,97 @@ struct GroupKey { group_by_tag_values: Vec, } -impl InfluxDb { - pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { - Self { - instance, - schema_config_provider, +impl Proxy { + pub async fn handle_influxdb_query( + &self, + ctx: RequestContext, + req: InfluxqlRequest, + ) -> Result { + let request_id = RequestId::next_id(); + let begin_instant = Instant::now(); + let deadline = ctx.timeout.map(|t| begin_instant + t); + + info!( + "Influxdb query handler try to process request, request_id:{}, request:{:?}", + request_id, req + ); + + // TODO(yingwen): Privilege check, cannot access data of other tenant + // TODO(yingwen): Maybe move MetaProvider to instance + let provider = CatalogMetaProvider { + manager: self.instance.catalog_manager.clone(), + default_catalog: &ctx.catalog, + default_schema: &ctx.schema, + function_registry: &*self.instance.function_registry, + }; + let frontend = Frontend::new(provider); + let mut sql_ctx = SqlContext::new(request_id, deadline); + + let mut stmts = frontend + .parse_influxql(&mut sql_ctx, &req.query) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to parse influxql, query:{}", req.query), + })?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); } - } - async fn query(&self, ctx: RequestContext, req: QueryRequest) -> Result { - let output = handlers::query::handle_query(&ctx, self.instance.clone(), req) - .await + ensure!( + stmts.len() == 1, + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!( + "Only support execute one statement now, current num:{}, query:{}.", + stmts.len(), + req.query + ), + } + ); + + let plan = frontend + .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to build plan, query:{}", req.query), + })?; + + self.instance + .limiter + .try_limit(&plan) .box_err() - .context(InfluxDbHandlerWithCause { - msg: "failed to query by 
influxql", + .context(ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Query is blocked", })?; + let output = execute_plan( + request_id, + &ctx.catalog, + &ctx.schema, + self.instance.clone(), + plan, + deadline, + ) + .await?; + + info!( + "Influxdb query handler finished, request_id:{}, cost:{}ms, request:{:?}", + request_id, + begin_instant.saturating_elapsed().as_millis(), + req + ); - convert_influxql_output(output) + Ok(output) } - async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result { + pub async fn handle_influxdb_write( + &self, + ctx: RequestContext, + req: WriteRequest, + ) -> Result { let request_id = RequestId::next_id(); let deadline = ctx.timeout.map(|t| Instant::now() + t); let catalog = &ctx.catalog; @@ -526,7 +590,7 @@ impl InfluxDb { .schema_config_provider .schema_config(schema) .box_err() - .with_context(|| InfluxDbHandlerWithCause { + .with_context(|| Internal { msg: format!("get schema config failed, schema:{schema}"), })?; @@ -541,7 +605,7 @@ impl InfluxDb { ) .await .box_err() - .with_context(|| InfluxDbHandlerWithCause { + .with_context(|| Internal { msg: "write request to insert plan", })?; @@ -557,7 +621,7 @@ impl InfluxDb { ) .await .box_err() - .with_context(|| InfluxDbHandlerWithCause { + .with_context(|| Internal { msg: "execute plan", })?; } @@ -573,17 +637,14 @@ impl InfluxDb { fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_measurement = HashMap::new(); for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = line.box_err().with_context(|| InfluxDbHandlerWithCause { + let mut line = line.box_err().with_context(|| Internal { msg: "invalid line", })?; let timestamp = match line.timestamp { - Some(ts) => req - .precision - .try_normalize(ts) - .context(InfluxDbHandlerNoCause { - msg: "time outside range -9223372036854775806 - 9223372036854775806", - })?, + Some(ts) => req.precision.try_normalize(ts).context(InternalNoCause { + msg: "time outside range -9223372036854775806 - 9223372036854775806", + })?, None => Timestamp::now().as_i64(), }; let mut tag_set = line.series.tag_set.unwrap_or_default(); @@ -661,12 +722,12 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } -fn convert_influxql_output(output: Output) -> Result { +pub fn convert_influxql_output(output: Output) -> Result { // TODO: now, we just support one influxql in each query. let records = match output { Output::Records(records) => records, Output::AffectedRows(_) => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: "output in influxql should not be affected rows", } .fail() @@ -694,32 +755,10 @@ fn convert_influxql_output(output: Output) -> Result { }) } -// TODO: Request and response type don't match influxdb's API now. -pub async fn query( - ctx: RequestContext, - db: Arc>, - req: QueryRequest, -) -> std::result::Result { - db.query(ctx, req) - .await - .map_err(reject::custom) - .map(|v| reply::json(&v)) -} - -// TODO: Request and response type don't match influxdb's API now. 
-pub async fn write( - ctx: RequestContext, - db: Arc>, - req: WriteRequest, -) -> std::result::Result { - db.write(ctx, req) - .await - .map_err(reject::custom) - .map(|_| warp::http::StatusCode::NO_CONTENT) -} - #[cfg(test)] mod tests { + use std::sync::Arc; + use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; use common_types::{ column::{ColumnBlock, ColumnBlockBuilder}, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 49de4950c3..7b014d7e2a 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -15,6 +15,7 @@ pub mod handlers; pub mod hotspot; mod hotspot_lru; pub mod http; +pub mod influxdb; pub mod instance; pub mod limiter; pub mod schema_config_provider; @@ -73,6 +74,8 @@ use crate::{ forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, grpc::write::WriteContext, hotspot::HotspotRecorder, + http::query::Request, + influxdb::InfluxqlRequest, instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, }; @@ -474,6 +477,15 @@ impl Proxy { } } +#[derive(Debug)] +pub enum QueryRequest { + Sql(Request), + // TODO: influxql include more parameters, we should add it in later. + // TODO: remove dead_code after implement influxql with proxy + #[allow(dead_code)] + Influxql(InfluxqlRequest), +} + #[derive(Clone)] pub struct Context { pub timeout: Option, diff --git a/server/src/http.rs b/server/src/http.rs index 5c12b78f73..4f70c4f532 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -13,21 +13,19 @@ use common_util::{ error::{BoxError, GenericError}, runtime::Runtime, }; -use handlers::query::QueryRequest as HandlerQueryRequest; use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; use prom_remote_api::web; use proxy::{ context::RequestContext, - handlers::{ - self, - influxdb::{self, InfluxDb, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest}, + handlers::{self}, + http::query::{convert_output, Request}, + influxdb::{ + convert_influxql_output, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest, }, - http::query::{convert_output, QueryRequest, Request}, instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, - Proxy, + Proxy, QueryRequest, }; use query_engine::executor::Executor as QueryExecutor; use router::{endpoint::Endpoint, RouterRef}; @@ -126,7 +124,6 @@ pub struct Service { engine_runtimes: Arc, log_runtime: Arc, profiler: Arc, - influxdb: Arc>, tx: Sender<()>, rx: Option>, config: HttpConfig, @@ -291,12 +288,16 @@ impl Service { .and(warp::post()) .and(body_limit) .and(self.with_context()) - .and(self.with_influxdb()) .and(warp::query::()) .and(warp::body::bytes()) - .and_then(|ctx, db, params, lines| async move { + .and(self.with_proxy()) + .and_then(|ctx, params, lines, proxy: Arc>| async move { let request = WriteRequest::new(lines, params); - influxdb::write(ctx, db, request).await + let result = proxy.handle_influxdb_write(ctx, request).await; + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } }); // Query support both get and post method, so we can't add `body_limit` here. 
@@ -305,14 +306,30 @@ impl Service { let query_api = warp::path!("query") .and(warp::method()) .and(self.with_context()) - .and(self.with_influxdb()) .and(warp::query::()) .and(warp::body::form::>()) - .and_then(|method, ctx, db, params, body| async move { - let request = - InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; - influxdb::query(ctx, db, HandlerQueryRequest::Influxql(request)).await - }); + .and(self.with_proxy()) + .and_then( + |method, ctx, params, body, proxy: Arc>| async move { + let request = + InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; + let result = proxy + .handle_influxdb_query(ctx, request) + .await + .and_then(convert_influxql_output) + .box_err() + .context(HandleRequest); + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } + }, + ); + // .and_then(|| async move { + // let request = + // InfluxqlRequest::try_new(method, body, + // params).map_err(reject::custom)?; influxdb::query(ctx, db, + // HandlerQueryRequest::Influxql(request)).await }); warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api)) } @@ -558,13 +575,6 @@ impl Service { warp::any().map(move || runtime.clone()) } - fn with_influxdb( - &self, - ) -> impl Filter>,), Error = Infallible> + Clone { - let influxdb = self.influxdb.clone(); - warp::any().map(move || influxdb.clone()) - } - fn with_instance( &self, ) -> impl Filter,), Error = Infallible> + Clone { @@ -590,8 +600,6 @@ pub struct Builder { config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, - instance: Option>, - schema_config_provider: Option, config_content: Option, proxy: Option>>, router: Option, @@ -604,8 +612,6 @@ impl Builder { config, engine_runtimes: None, log_runtime: None, - instance: None, - schema_config_provider: None, config_content: None, proxy: None, router: None, @@ -623,16 +629,6 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { - self.instance = Some(instance); - self - } - - pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self { - self.schema_config_provider = Some(provider); - self - } - pub fn config_content(mut self, content: String) -> Self { self.config_content = Some(content); self @@ -659,23 +655,17 @@ impl Builder { pub fn build(self) -> Result> { let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; - let instance = self.instance.context(MissingInstance)?; let config_content = self.config_content.context(MissingInstance)?; let proxy = self.proxy.context(MissingProxy)?; - let schema_config_provider = self - .schema_config_provider - .context(MissingSchemaConfigProvider)?; let router = self.router.context(MissingRouter)?; let opened_wals = self.opened_wals.context(MissingWal)?; - let influxdb = Arc::new(InfluxDb::new(instance, schema_config_provider)); let (tx, rx) = oneshot::channel(); let service = Service { proxy, engine_runtimes, log_runtime, - influxdb, profiler: Arc::new(Profiler::default()), tx, rx: Some(rx), diff --git a/server/src/server.rs b/server/src/server.rs index ade03c3628..34744a9d47 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -369,8 +369,6 @@ impl Builder { let http_service = http::Builder::new(http_config) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime) - .instance(instance.clone()) - .schema_config_provider(provider.clone()) .config_content(config_content) .proxy(proxy.clone()) 
.router(router.clone()) From 2c791809387394526a2a115dc3ae643d7d7b425c Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Mon, 8 May 2023 19:46:09 +0800 Subject: [PATCH 2/4] fix: segmentation fault --- catalog/src/lib.rs | 6 ++++-- catalog/src/table_operator.rs | 2 +- proxy/src/handlers/query.rs | 4 ---- proxy/src/lib.rs | 1 - 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index 000e2a9225..81133e2573 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -53,8 +53,10 @@ pub enum Error { #[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))] TableOperatorWithCause { msg: String, source: GenericError }, - #[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] - TableOperatorNoCause { msg: String, backtrace: Backtrace }, + // Fixme: Temporarily remove the stack information, otherwise you will encounter a + // segmentation fault. + #[snafu(display("Failed to operate table, msg:{}.\n", msg))] + TableOperatorNoCause { msg: String }, } define_result!(Error); diff --git a/catalog/src/table_operator.rs b/catalog/src/table_operator.rs index c621c9c8cd..35a6058c43 100644 --- a/catalog/src/table_operator.rs +++ b/catalog/src/table_operator.rs @@ -101,7 +101,7 @@ impl TableOperator { } else { TableOperatorNoCause { msg: format!( - "Failed to open shard, some tables open failed, no table is shard id:{shard_id}, opened count:{no_table_count}, open error count:{open_err_count}"), + "Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"), }.fail() } } diff --git a/proxy/src/handlers/query.rs b/proxy/src/handlers/query.rs index 4832f1111a..b3140621f8 100644 --- a/proxy/src/handlers/query.rs +++ b/proxy/src/handlers/query.rs @@ -24,7 +24,6 @@ use snafu::{ensure, ResultExt}; use crate::handlers::{ error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt}, - // influxdb::InfluxqlRequest, prelude::*, }; @@ -99,14 +98,11 @@ impl From for Request { #[derive(Debug)] pub enum QueryRequest { Sql(Request), - // TODO: influxql include more parameters, we should add it in later. - // Influxql(InfluxqlRequest), } impl QueryRequest { pub fn query(&self) -> &str { match self { QueryRequest::Sql(request) => request.query.as_str(), - // QueryRequest::Influxql(request) => request.query.as_str(), } } } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 7b014d7e2a..2f35a8945c 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -482,7 +482,6 @@ pub enum QueryRequest { Sql(Request), // TODO: influxql include more parameters, we should add it in later. 
// TODO: remove dead_code after implement influxql with proxy - #[allow(dead_code)] Influxql(InfluxqlRequest), } From 1b86835f592d4ab3f47e9c3698a59e9045d67f3d Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Mon, 8 May 2023 20:02:44 +0800 Subject: [PATCH 3/4] refactor code --- proxy/src/http/query.rs | 168 +++---- proxy/src/influxdb/mod.rs | 887 +----------------------------------- proxy/src/influxdb/types.rs | 880 +++++++++++++++++++++++++++++++++++ proxy/src/lib.rs | 10 - server/src/http.rs | 14 +- 5 files changed, 953 insertions(+), 1006 deletions(-) create mode 100644 proxy/src/influxdb/types.rs diff --git a/proxy/src/http/query.rs b/proxy/src/http/query.rs index f021d57c89..44b95b8a50 100644 --- a/proxy/src/http/query.rs +++ b/proxy/src/http/query.rs @@ -33,22 +33,18 @@ use crate::{ error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, execute_plan, forward::ForwardResult, - Proxy, QueryRequest, + Proxy, }; impl Proxy { - pub async fn handle_query( - &self, - ctx: &RequestContext, - query_request: QueryRequest, - ) -> Result { + pub async fn handle_query(&self, ctx: &RequestContext, req: SqlRequest) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); info!( "Query handler try to process request, request_id:{}, request:{:?}", - request_id, query_request + request_id, req ); // TODO(yingwen): Privilege check, cannot access data of other tenant @@ -62,110 +58,66 @@ impl Proxy { let frontend = Frontend::new(provider); let mut sql_ctx = SqlContext::new(request_id, deadline); - let plan = match &query_request { - QueryRequest::Sql(request) => { - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &request.query) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to parse sql, query:{}", request.query), - })?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, query:{}.", - stmts.len(), - request.query - ), - } - ); - - let sql_query_request = SqlQueryRequest { - context: Some(GrpcRequestContext { - database: ctx.schema.clone(), - }), - tables: vec![], - sql: request.query.clone(), - }; - - if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? { - match resp { - ForwardResult::Forwarded(resp) => { - return convert_sql_response_to_output(resp?) - } - ForwardResult::Local => (), - } - }; - - // Open partition table if needed. - let table_name = frontend::parse_table_name(&stmts); - if let Some(table_name) = table_name { - self.maybe_open_partition_table_if_not_exist( - &ctx.catalog, - &ctx.schema, - &table_name, - ) - .await?; - } - - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - frontend - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to build plan, query:{}", request.query), - })? 
+ // Parse sql, frontend error of invalid sql already contains sql + // TODO(yingwen): Maybe move sql from frontend error to outer error + let mut stmts = frontend + .parse_sql(&mut sql_ctx, &req.query) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to parse sql, query:{}", req.query), + })?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + // TODO(yingwen): For simplicity, we only support executing one statement now + // TODO(yingwen): INSERT/UPDATE/DELETE can be batched + ensure!( + stmts.len() == 1, + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!( + "Only support execute one statement now, current num:{}, query:{}.", + stmts.len(), + req.query + ), } + ); - QueryRequest::Influxql(request) => { - let mut stmts = frontend - .parse_influxql(&mut sql_ctx, &request.query) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to parse influxql, query:{}", request.query), - })?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, query:{}.", - stmts.len(), - request.query - ), - } - ); - - frontend - .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to build plan, query:{}", request.query), - })? + let sql_query_request = SqlQueryRequest { + context: Some(GrpcRequestContext { + database: ctx.schema.clone(), + }), + tables: vec![], + sql: req.query.clone(), + }; + + if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? { + match resp { + ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?), + ForwardResult::Local => (), } }; + // Open partition table if needed. + let table_name = frontend::parse_table_name(&stmts); + if let Some(table_name) = table_name { + self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name) + .await?; + } + + // Create logical plan + // Note: Remember to store sql in error when creating logical plan + let plan = frontend + .statement_to_plan(&mut sql_ctx, stmts.remove(0)) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to build plan, query:{}", req.query), + })?; + self.instance .limiter .try_limit(&plan) @@ -188,14 +140,14 @@ impl Proxy { "Query handler finished, request_id:{}, cost:{}ms, request:{:?}", request_id, begin_instant.saturating_elapsed().as_millis(), - query_request + req ); Ok(output) } } #[derive(Debug, Deserialize)] -pub struct Request { +pub struct SqlRequest { pub query: String, } diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index ab9cf7ea6c..98e1c21623 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -1,495 +1,34 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. //! This module implements [write][1] and [query][2] for InfluxDB. //! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint //! 
[2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint -use std::{ - collections::{BTreeMap, HashMap}, - time::Instant, -}; +pub mod types; -use bytes::Bytes; -use ceresdbproto::storage::{ - value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, -}; -use common_types::{ - column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, request_id::RequestId, - schema::RecordSchema, time::Timestamp, -}; +use std::time::Instant; + +use common_types::request_id::RequestId; use common_util::{error::BoxError, time::InstantExt}; -use http::{Method, StatusCode}; -use influxdb_line_protocol::FieldValue; +use http::StatusCode; use interpreters::interpreter::Output; use log::{debug, info}; use query_engine::executor::Executor as QueryExecutor; use query_frontend::{ frontend::{Context as SqlContext, Frontend}, - influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME, provider::CatalogMetaProvider, }; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use crate::{ context::RequestContext, - error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, + error::{ErrNoCause, ErrWithCause, Internal, Result}, execute_plan, grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, + influxdb::types::{convert_write_request, InfluxqlRequest, WriteRequest, WriteResponse}, Proxy, }; -/// Influxql write request compatible with influxdb 1.8 -/// -/// It's derived from 1.x write api described in doc of influxdb 1.8: -/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint -#[derive(Debug)] -pub struct WriteRequest { - /// Data formatted in line protocol - pub lines: String, - - /// Details about `db`, `precision` can be saw in [WriteParams] - // TODO: `db` should be made use of in later. - pub db: String, - pub precision: Precision, -} - -impl WriteRequest { - pub fn new(lines: Bytes, params: WriteParams) -> Self { - let lines = String::from_utf8_lossy(&lines).to_string(); - - let precision = params.precision.as_str().into(); - - WriteRequest { - lines, - db: params.db, - precision, - } - } -} - -pub type WriteResponse = (); - -/// Query string parameters for write api -/// -/// It's derived from query string parameters of write described in -/// doc of influxdb 1.8: -/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-2 -/// -/// NOTE: -/// - `db` is not required and default to `public` in CeresDB. -/// - `precision`'s default value is `ms` but not `ns` in CeresDB. -#[derive(Debug, Deserialize)] -#[serde(default)] -pub struct WriteParams { - pub db: String, - pub precision: String, -} - -impl Default for WriteParams { - fn default() -> Self { - Self { - db: "public".to_string(), - precision: "ms".to_string(), - } - } -} - -/// Influxql query request compatible with influxdb 1.8 -/// -/// It's derived from 1.x query api described in doc of influxdb 1.8: -/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint -/// -/// NOTE: -/// - when query by POST method, query(q) should be placed after -/// `--data-urlencode` like what shown in link above. -/// - when query by GET method, query should be placed in url -/// parameters(where `db`, `epoch`, etc are placed in). -#[derive(Debug)] -pub struct InfluxqlRequest { - /// Query described by influxql - pub query: String, - - /// Details about `db`, `epoch`, `pretty` can be saw in [InfluxqlParams] - // TODO: `db`, `epoch`, `pretty` should be made use of in later. 
- pub db: String, - pub epoch: Precision, - pub pretty: bool, -} - -impl InfluxqlRequest { - pub fn try_new( - method: Method, - mut body: HashMap, - params: InfluxqlParams, - ) -> Result { - // Extract and check body & parameters. - // - q: required(in body when POST and parameters when GET) - // - chunked,db,epoch,pretty: in parameters - if body.contains_key("params") { - return InternalNoCause { - msg: "`params` is not supported now", - } - .fail(); - } - - let query = match method { - Method::GET => params.q.context(InternalNoCause { - msg: "query not found when query by GET", - })?, - Method::POST => body.remove("q").context(InternalNoCause { - msg: "query not found when query by POST", - })?, - other => { - return InternalNoCause { - msg: format!("method not allowed in query, method:{other}"), - } - .fail() - } - }; - - let epoch = params.epoch.as_str().into(); - - Ok(InfluxqlRequest { - query, - db: params.db, - epoch, - pretty: params.pretty, - }) - } -} - -#[derive(Debug, Default)] -pub enum Precision { - #[default] - Millisecond, - Nanosecond, - Microsecond, - Second, - Minute, - Hour, -} - -impl Precision { - fn try_normalize(&self, ts: i64) -> Option { - match self { - Self::Millisecond => Some(ts), - Self::Nanosecond => ts.checked_div(1000 * 1000), - Self::Microsecond => ts.checked_div(1000), - Self::Second => ts.checked_mul(1000), - Self::Minute => ts.checked_mul(1000 * 60), - Self::Hour => ts.checked_mul(1000 * 60 * 60), - } - } -} - -impl From<&str> for Precision { - fn from(value: &str) -> Self { - match value { - "ns" | "n" => Precision::Nanosecond, - "u" | "µ" => Precision::Microsecond, - "ms" => Precision::Millisecond, - "s" => Precision::Second, - "m" => Precision::Minute, - "h" => Precision::Hour, - // Return the default precision. - _ => Precision::Millisecond, - } - } -} - -/// Query string parameters for query api(by influxql) -/// -/// It's derived from query string parameters of query described in -/// doc of influxdb 1.8: -/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-1 -/// -/// NOTE: -/// - `db` is not required and default to `public` in CeresDB. -/// - `chunked` is not supported in CeresDB. -/// - `epoch`'s default value is `ms` but not `ns` in CeresDB. -#[derive(Debug, Deserialize)] -#[serde(default)] -pub struct InfluxqlParams { - pub q: Option, - pub db: String, - pub epoch: String, - pub pretty: bool, -} - -impl Default for InfluxqlParams { - fn default() -> Self { - Self { - q: None, - db: "public".to_string(), - epoch: "ms".to_string(), - pretty: false, - } - } -} - -/// Influxql response organized in the same way with influxdb. 
-/// -/// The basic example: -/// ```json -/// {"results":[{"statement_id":0,"series":[{"name":"mymeas", -/// "columns":["time","myfield","mytag1","mytag2"], -/// "values":[["2017-03-01T00:16:18Z",33.1,null,null], -/// ["2017-03-01T00:17:18Z",12.4,"12","14"]]}]}]} -/// ``` -/// More details refer to: -/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-data-with-a-select-statement -#[derive(Debug, Serialize)] -pub struct InfluxqlResponse { - pub results: Vec, -} - -#[derive(Debug, Serialize)] -pub struct OneInfluxqlResult { - statement_id: u32, - #[serde(skip_serializing_if = "Option::is_none")] - series: Option>, -} - -#[derive(Debug, Serialize)] -struct OneSeries { - name: String, - #[serde(skip_serializing_if = "Option::is_none")] - tags: Option>, - columns: Vec, - values: Vec>, -} - -/// [InfluxqlResult] builder -#[derive(Default)] -pub struct InfluxqlResultBuilder { - /// Query id for [multiple queries](https://docs.influxdata.com/influxdb/v1.8/tools/api/#request-multiple-queries) - statement_id: u32, - - /// Schema of influxql query result - /// - /// Its format is like: - /// measurement | - /// tag_1..tag_n(columns in `group by`) | - /// time | - /// column_1..column_n(column in `projection` but not in `group by`) - column_schemas: Vec, - - /// Tags part in schema - group_by_tag_col_idxs: Vec, - - /// Columns part in schema(include `time`) - value_col_idxs: Vec, - - /// Mapping group key(`measurement` + `tag values`) to column values, - /// - /// NOTE: because tag keys in `group by` clause are same in each sub result, - /// we just use the `measurement` + `tag values` to distinguish them. - group_key_to_idx: HashMap, - - /// Column values grouped by [GroupKey] - value_groups: Vec, -} - -type Row = Vec; -type RowGroup = Vec; - -impl InfluxqlResultBuilder { - pub fn new(record_schema: &RecordSchema, statement_id: u32) -> Result { - let column_schemas = record_schema.columns().to_owned(); - ensure!( - !column_schemas.is_empty(), - InternalNoCause { - msg: "empty schema", - } - ); - - // Query like `show measurements`, there will be not timestamp. - let has_timestamp = column_schemas.iter().any(|c| c.data_type.is_timestamp()); - // Find the tags part and columns part from schema. - let mut group_by_col_idxs = Vec::new(); - let mut value_col_idxs = Vec::new(); - - // The following index searching logic is derived from the fixed format - // described when introducing `column_schemas`. - let mut col_iter = column_schemas.iter().enumerate(); - // The first column may be measurement column in normal. - ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InternalNoCause { - msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"), - }); - - // The group by tags will be placed after measurement and before time column. - let mut searching_group_by_tags = has_timestamp; - for (idx, col) in col_iter { - if col.data_type.is_timestamp() { - searching_group_by_tags = false; - } - - if searching_group_by_tags { - group_by_col_idxs.push(idx); - } else { - value_col_idxs.push(idx); - } - } - - Ok(Self { - statement_id, - column_schemas, - group_by_tag_col_idxs: group_by_col_idxs, - value_col_idxs, - group_key_to_idx: HashMap::new(), - value_groups: Vec::new(), - }) - } - - pub fn add_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> { - // Check schema's compatibility. 
- ensure!( - record_batch.schema().columns() == self.column_schemas, - InternalNoCause { - msg: format!( - "conflict schema, origin:{:?}, new:{:?}", - self.column_schemas, - record_batch.schema().columns() - ), - } - ); - - let row_num = record_batch.num_rows(); - for row_idx in 0..row_num { - // Get measurement + group by tags. - let group_key = self.extract_group_key(&record_batch, row_idx)?; - let value_group = self.extract_value_group(&record_batch, row_idx)?; - - let value_groups = if let Some(idx) = self.group_key_to_idx.get(&group_key) { - self.value_groups.get_mut(*idx).unwrap() - } else { - self.value_groups.push(Vec::new()); - self.group_key_to_idx - .insert(group_key, self.value_groups.len() - 1); - self.value_groups.last_mut().unwrap() - }; - - value_groups.push(value_group); - } - - Ok(()) - } - - pub fn build(self) -> OneInfluxqlResult { - let ordered_group_keys = { - let mut ordered_pairs = self.group_key_to_idx.into_iter().collect::>(); - ordered_pairs.sort_by(|a, b| a.1.cmp(&b.1)); - ordered_pairs - .into_iter() - .map(|(key, _)| key) - .collect::>() - }; - - let series = ordered_group_keys - .into_iter() - .zip(self.value_groups.into_iter()) - .map(|(group_key, value_group)| { - let name = group_key.measurement; - let tags = if group_key.group_by_tag_values.is_empty() { - None - } else { - let tags = group_key - .group_by_tag_values - .into_iter() - .enumerate() - .map(|(tagk_idx, tagv)| { - let tagk_col_idx = self.group_by_tag_col_idxs[tagk_idx]; - let tagk = self.column_schemas[tagk_col_idx].name.clone(); - - (tagk, tagv) - }) - .collect::>(); - - Some(tags) - }; - - let columns = self - .value_col_idxs - .iter() - .map(|idx| self.column_schemas[*idx].name.clone()) - .collect::>(); - - OneSeries { - name, - tags, - columns, - values: value_group, - } - }) - .collect(); - - OneInfluxqlResult { - series: Some(series), - statement_id: self.statement_id, - } - } - - fn extract_group_key(&self, record_batch: &RecordBatch, row_idx: usize) -> Result { - let mut group_by_tag_values = Vec::with_capacity(self.group_by_tag_col_idxs.len()); - let measurement = { - let measurement = record_batch.column(0).datum(row_idx); - match measurement { - Datum::String(m) => m.to_string(), - other => { - return InternalNoCause { - msg: format!("invalid measurement column, column:{other:?}"), - } - .fail() - } - } - }; - - for col_idx in &self.group_by_tag_col_idxs { - let tag = { - let tag_datum = record_batch.column(*col_idx).datum(row_idx); - match tag_datum { - Datum::Null => "".to_string(), - Datum::String(tag) => tag.to_string(), - other => { - return InternalNoCause { - msg: format!("invalid tag column, column:{other:?}"), - } - .fail() - } - } - }; - group_by_tag_values.push(tag); - } - - Ok(GroupKey { - measurement, - group_by_tag_values, - }) - } - - fn extract_value_group( - &self, - record_batch: &RecordBatch, - row_idx: usize, - ) -> Result> { - let mut value_group = Vec::with_capacity(self.value_col_idxs.len()); - for col_idx in &self.value_col_idxs { - let value = record_batch.column(*col_idx).datum(row_idx); - - value_group.push(value); - } - - Ok(value_group) - } -} - -#[derive(Hash, PartialEq, Eq, Clone)] -struct GroupKey { - measurement: String, - group_by_tag_values: Vec, -} - impl Proxy { pub async fn handle_influxdb_query( &self, @@ -633,411 +172,3 @@ impl Proxy { Ok(()) } } - -fn convert_write_request(req: WriteRequest) -> Result> { - let mut req_by_measurement = HashMap::new(); - for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = 
line.box_err().with_context(|| Internal { - msg: "invalid line", - })?; - - let timestamp = match line.timestamp { - Some(ts) => req.precision.try_normalize(ts).context(InternalNoCause { - msg: "time outside range -9223372036854775806 - 9223372036854775806", - })?, - None => Timestamp::now().as_i64(), - }; - let mut tag_set = line.series.tag_set.unwrap_or_default(); - // sort by tag key - tag_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - // sort by field key - line.field_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - - let req_for_one_measurement = req_by_measurement - .entry(line.series.measurement.to_string()) - .or_insert_with(|| WriteTableRequest { - table: line.series.measurement.to_string(), - tag_names: tag_set.iter().map(|(tagk, _)| tagk.to_string()).collect(), - field_names: line - .field_set - .iter() - .map(|(tagk, _)| tagk.to_string()) - .collect(), - entries: Vec::new(), - }); - - let tags: Vec<_> = tag_set - .iter() - .enumerate() - .map(|(idx, (_, tagv))| Tag { - name_index: idx as u32, - value: Some(Value { - value: Some(value::Value::StringValue(tagv.to_string())), - }), - }) - .collect(); - let field_group = FieldGroup { - timestamp, - fields: line - .field_set - .iter() - .cloned() - .enumerate() - .map(|(idx, (_, fieldv))| Field { - name_index: idx as u32, - value: Some(convert_influx_value(fieldv)), - }) - .collect(), - }; - let mut found = false; - for entry in &mut req_for_one_measurement.entries { - if entry.tags == tags { - // TODO: remove clone? - entry.field_groups.push(field_group.clone()); - found = true; - break; - } - } - if !found { - req_for_one_measurement.entries.push(WriteSeriesEntry { - tags, - field_groups: vec![field_group], - }) - } - } - - Ok(req_by_measurement.into_values().collect()) -} - -/// Convert influxdb's FieldValue to ceresdbproto's Value -fn convert_influx_value(field_value: FieldValue) -> Value { - let v = match field_value { - FieldValue::I64(v) => value::Value::Int64Value(v), - FieldValue::U64(v) => value::Value::Uint64Value(v), - FieldValue::F64(v) => value::Value::Float64Value(v), - FieldValue::String(v) => value::Value::StringValue(v.to_string()), - FieldValue::Boolean(v) => value::Value::BoolValue(v), - }; - - Value { value: Some(v) } -} - -pub fn convert_influxql_output(output: Output) -> Result { - // TODO: now, we just support one influxql in each query. - let records = match output { - Output::Records(records) => records, - Output::AffectedRows(_) => { - return InternalNoCause { - msg: "output in influxql should not be affected rows", - } - .fail() - } - }; - - let influxql_result = if records.is_empty() { - OneInfluxqlResult { - statement_id: 0, - series: None, - } - } else { - // All record schemas in one query result should be same. 
- let record_schema = records.first().unwrap().schema(); - let mut builder = InfluxqlResultBuilder::new(record_schema, 0)?; - for record in records { - builder.add_record_batch(record)?; - } - - builder.build() - }; - - Ok(InfluxqlResponse { - results: vec![influxql_result], - }) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; - use common_types::{ - column::{ColumnBlock, ColumnBlockBuilder}, - column_schema, - datum::DatumKind, - schema, - string::StringBytes, - }; - use json_pretty::PrettyFormatter; - - use super::*; - - #[test] - fn test_convert_influxdb_write_req() { - let lines = r#" - demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000 - demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000 - demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000 - demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 - "# - .to_string(); - let req = WriteRequest { - lines, - db: "public".to_string(), - precision: Precision::Millisecond, - }; - - let pb_req = convert_write_request(req).unwrap(); - assert_eq!(1, pb_req.len()); - assert_eq!( - pb_req[0], - WriteTableRequest { - table: "demo".to_string(), - tag_names: vec!["tag1".to_string(), "tag2".to_string()], - field_names: vec!["field1".to_string(), "field2".to_string()], - entries: vec![ - // First series - WriteSeriesEntry { - tags: vec![ - Tag { - name_index: 0, - value: Some(convert_influx_value(FieldValue::String("t1".into()))), - }, - Tag { - name_index: 1, - value: Some(convert_influx_value(FieldValue::String("t2".into()))), - }, - ], - field_groups: vec![ - FieldGroup { - timestamp: 1678675992000, - fields: vec![ - Field { - name_index: 0, - value: Some(convert_influx_value(FieldValue::F64(90.0))), - }, - Field { - name_index: 1, - value: Some(convert_influx_value(FieldValue::F64(100.0))), - } - ] - }, - FieldGroup { - timestamp: 1678675993000, - fields: vec![ - Field { - name_index: 0, - value: Some(convert_influx_value(FieldValue::F64(91.0))), - }, - Field { - name_index: 1, - value: Some(convert_influx_value(FieldValue::F64(101.0))), - } - ] - }, - ] - }, - // Second series - WriteSeriesEntry { - tags: vec![ - Tag { - name_index: 0, - value: Some(convert_influx_value(FieldValue::String("t11".into()))), - }, - Tag { - name_index: 1, - value: Some(convert_influx_value(FieldValue::String("t22".into()))), - }, - ], - field_groups: vec![ - FieldGroup { - timestamp: 1678675992000, - fields: vec![ - Field { - name_index: 0, - value: Some(convert_influx_value(FieldValue::F64(900.0))), - }, - Field { - name_index: 1, - value: Some(convert_influx_value(FieldValue::F64(1000.0))), - } - ] - }, - FieldGroup { - timestamp: 1678675993000, - fields: vec![ - Field { - name_index: 0, - value: Some(convert_influx_value(FieldValue::F64(901.0))), - }, - Field { - name_index: 1, - value: Some(convert_influx_value(FieldValue::F64(1001.0))), - } - ] - }, - ] - } - ] - } - ); - } - - #[test] - fn test_influxql_result() { - let record_schema = build_test_record_schema(); - let column_blocks = build_test_column_blocks(); - let record_batch = RecordBatch::new(record_schema, column_blocks).unwrap(); - - let mut builder = InfluxqlResultBuilder::new(record_batch.schema(), 0).unwrap(); - builder.add_record_batch(record_batch).unwrap(); - let iql_results = vec![builder.build()]; - let iql_response = InfluxqlResponse { - results: iql_results, - }; - let iql_result_json = - PrettyFormatter::from_str(&serde_json::to_string(&iql_response).unwrap()).pretty(); - let expected = 
PrettyFormatter::from_str(r#"{"results":[{"statement_id":0,"series":[{"name":"m1","tags":{"tag":"tv1"}, - "columns":["time","field1","field2"],"values":[[10001,"fv1",1]]}, - {"name":"m1","tags":{"tag":"tv2"},"columns":["time","field1","field2"],"values":[[100002,"fv2",2]]}, - {"name":"m1","tags":{"tag":"tv3"},"columns":["time","field1","field2"],"values":[[10003,"fv3",3]]}, - {"name":"m1","tags":{"tag":""},"columns":["time","field1","field2"],"values":[[10007,null,null]]}, - {"name":"m2","tags":{"tag":"tv4"},"columns":["time","field1","field2"],"values":[[10004,"fv4",4]]}, - {"name":"m2","tags":{"tag":"tv5"},"columns":["time","field1","field2"],"values":[[100005,"fv5",5]]}, - {"name":"m2","tags":{"tag":"tv6"},"columns":["time","field1","field2"],"values":[[10006,"fv6",6]]}]}]}"#).pretty(); - assert_eq!(expected, iql_result_json); - } - - fn build_test_record_schema() -> RecordSchema { - let schema = schema::Builder::new() - .auto_increment_column_id(true) - .add_key_column( - column_schema::Builder::new("time".to_string(), DatumKind::Timestamp) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("tag".to_string(), DatumKind::String) - .is_tag(true) - .is_nullable(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("field1".to_string(), DatumKind::String) - .is_nullable(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - // The data type of column is `UInt32`, and the type of default value expr is - // `Int64`. So we use this column to cover the test, which has - // different type. - column_schema::Builder::new("field2".to_string(), DatumKind::UInt64) - .is_nullable(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .build() - .unwrap(); - - // Record schema - let arrow_schema = schema.to_arrow_schema_ref(); - let fields = arrow_schema.fields.to_owned(); - let measurement_field = ArrowField::new( - CERESDB_MEASUREMENT_COLUMN_NAME.to_string(), - schema::DataType::Utf8, - false, - ); - let project_fields = vec![ - measurement_field, - fields[1].clone(), - fields[0].clone(), - fields[2].clone(), - fields[3].clone(), - ]; - let project_arrow_schema = Arc::new(ArrowSchema::new_with_metadata( - project_fields, - arrow_schema.metadata().clone(), - )); - - RecordSchema::try_from(project_arrow_schema).unwrap() - } - - fn build_test_column_blocks() -> Vec { - let mut measurement_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3); - let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3); - - // Data in measurement1 - let measurement1 = Datum::String(StringBytes::copy_from_str("m1")); - let tags1 = vec!["tv1".to_string(), "tv2".to_string(), "tv3".to_string()] - .into_iter() - .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) - .collect::>(); - let times1 = vec![10001_i64, 100002, 10003] - .into_iter() - .map(|v| Datum::Timestamp(v.into())) - .collect::>(); - let fields1 = vec!["fv1".to_string(), "fv2".to_string(), "fv3".to_string()] - .into_iter() - .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) - .collect::>(); - let fields2 = vec![1_u64, 
2, 3] - .into_iter() - .map(Datum::UInt64) - .collect::>(); - - let measurement2 = Datum::String(StringBytes::copy_from_str("m2")); - let tags2 = vec!["tv4".to_string(), "tv5".to_string(), "tv6".to_string()] - .into_iter() - .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) - .collect::>(); - let times2 = vec![10004_i64, 100005, 10006] - .into_iter() - .map(|v| Datum::Timestamp(v.into())) - .collect::>(); - let fields3 = vec!["fv4".to_string(), "fv5".to_string(), "fv6".to_string()] - .into_iter() - .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) - .collect::>(); - let fields4 = vec![4_u64, 5, 6] - .into_iter() - .map(Datum::UInt64) - .collect::>(); - - for idx in 0..3 { - measurement_builder.append(measurement1.clone()).unwrap(); - tag_builder.append(tags1[idx].clone()).unwrap(); - time_builder.append(times1[idx].clone()).unwrap(); - field_builder1.append(fields1[idx].clone()).unwrap(); - field_builder2.append(fields2[idx].clone()).unwrap(); - } - measurement_builder.append(measurement1).unwrap(); - tag_builder.append(Datum::Null).unwrap(); - time_builder.append(Datum::Timestamp(10007.into())).unwrap(); - field_builder1.append(Datum::Null).unwrap(); - field_builder2.append(Datum::Null).unwrap(); - - for idx in 0..3 { - measurement_builder.append(measurement2.clone()).unwrap(); - tag_builder.append(tags2[idx].clone()).unwrap(); - time_builder.append(times2[idx].clone()).unwrap(); - field_builder1.append(fields3[idx].clone()).unwrap(); - field_builder2.append(fields4[idx].clone()).unwrap(); - } - - vec![ - measurement_builder.build(), - tag_builder.build(), - time_builder.build(), - field_builder1.build(), - field_builder2.build(), - ] - } -} diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs new file mode 100644 index 0000000000..6a61939b68 --- /dev/null +++ b/proxy/src/influxdb/types.rs @@ -0,0 +1,880 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::collections::{BTreeMap, HashMap}; + +use bytes::Bytes; +use ceresdbproto::storage::{ + value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, +}; +use common_types::{ + column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, schema::RecordSchema, + time::Timestamp, +}; +use common_util::error::BoxError; +use http::Method; +use influxdb_line_protocol::FieldValue; +use interpreters::interpreter::Output; +use query_frontend::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{Internal, InternalNoCause, Result}; + +/// Influxql write request compatible with influxdb 1.8 +/// +/// It's derived from 1.x write api described in doc of influxdb 1.8: +/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint +#[derive(Debug)] +pub struct WriteRequest { + /// Data formatted in line protocol + pub lines: String, + + /// Details about `db`, `precision` can be saw in [WriteParams] + // TODO: `db` should be made use of in later. 
+    pub db: String,
+    pub precision: Precision,
+}
+
+impl WriteRequest {
+    pub fn new(lines: Bytes, params: WriteParams) -> Self {
+        let lines = String::from_utf8_lossy(&lines).to_string();
+
+        let precision = params.precision.as_str().into();
+
+        WriteRequest {
+            lines,
+            db: params.db,
+            precision,
+        }
+    }
+}
+
+pub type WriteResponse = ();
+
+/// Query string parameters for write api
+///
+/// It's derived from query string parameters of write described in
+/// doc of influxdb 1.8:
+/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-2
+///
+/// NOTE:
+/// - `db` is not required and defaults to `public` in CeresDB.
+/// - `precision` defaults to `ms` rather than `ns` in CeresDB.
+#[derive(Debug, Deserialize)]
+#[serde(default)]
+pub struct WriteParams {
+    pub db: String,
+    pub precision: String,
+}
+
+impl Default for WriteParams {
+    fn default() -> Self {
+        Self {
+            db: "public".to_string(),
+            precision: "ms".to_string(),
+        }
+    }
+}
+
+/// Influxql query request compatible with influxdb 1.8
+///
+/// It's derived from 1.x query api described in doc of influxdb 1.8:
+/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
+///
+/// NOTE:
+/// - when querying by the POST method, the query(q) should be placed after
+///   `--data-urlencode`, as shown in the link above.
+/// - when querying by the GET method, the query should be placed in the url
+///   parameters (together with `db`, `epoch`, etc.).
+#[derive(Debug)]
+pub struct InfluxqlRequest {
+    /// Query described by influxql
+    pub query: String,
+
+    /// Details about `db`, `epoch`, `pretty` can be seen in [InfluxqlParams]
+    // TODO: `db`, `epoch`, `pretty` should be made use of later.
+    pub db: String,
+    pub epoch: Precision,
+    pub pretty: bool,
+}
+
+impl InfluxqlRequest {
+    pub fn try_new(
+        method: Method,
+        mut body: HashMap<String, String>,
+        params: InfluxqlParams,
+    ) -> Result<Self> {
+        // Extract and check body & parameters.
+        // - q: required (in body when POST, in parameters when GET)
+        // - chunked, db, epoch, pretty: in parameters
+        if body.contains_key("params") {
+            return InternalNoCause {
+                msg: "`params` is not supported now",
+            }
+            .fail();
+        }
+
+        let query = match method {
+            Method::GET => params.q.context(InternalNoCause {
+                msg: "query not found when query by GET",
+            })?,
+            Method::POST => body.remove("q").context(InternalNoCause {
+                msg: "query not found when query by POST",
+            })?,
+            other => {
+                return InternalNoCause {
+                    msg: format!("method not allowed in query, method:{other}"),
+                }
+                .fail()
+            }
+        };
+
+        let epoch = params.epoch.as_str().into();
+
+        Ok(InfluxqlRequest {
+            query,
+            db: params.db,
+            epoch,
+            pretty: params.pretty,
+        })
+    }
+}
+
+#[derive(Debug, Default)]
+pub enum Precision {
+    #[default]
+    Millisecond,
+    Nanosecond,
+    Microsecond,
+    Second,
+    Minute,
+    Hour,
+}
+
+impl Precision {
+    fn try_normalize(&self, ts: i64) -> Option<i64> {
+        match self {
+            Self::Millisecond => Some(ts),
+            Self::Nanosecond => ts.checked_div(1000 * 1000),
+            Self::Microsecond => ts.checked_div(1000),
+            Self::Second => ts.checked_mul(1000),
+            Self::Minute => ts.checked_mul(1000 * 60),
+            Self::Hour => ts.checked_mul(1000 * 60 * 60),
+        }
+    }
+}
+
+impl From<&str> for Precision {
+    fn from(value: &str) -> Self {
+        match value {
+            "ns" | "n" => Precision::Nanosecond,
+            "u" | "µ" => Precision::Microsecond,
+            "ms" => Precision::Millisecond,
+            "s" => Precision::Second,
+            "m" => Precision::Minute,
+            "h" => Precision::Hour,
+            // Return the default precision.
+            _ => Precision::Millisecond,
+        }
+    }
+}
+
+/// Query string parameters for query api (by influxql)
+///
+/// It's derived from query string parameters of query described in
+/// doc of influxdb 1.8:
+/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-1
+///
+/// NOTE:
+/// - `db` is not required and defaults to `public` in CeresDB.
+/// - `chunked` is not supported in CeresDB.
+/// - `epoch` defaults to `ms` rather than `ns` in CeresDB.
+#[derive(Debug, Deserialize)]
+#[serde(default)]
+pub struct InfluxqlParams {
+    pub q: Option<String>,
+    pub db: String,
+    pub epoch: String,
+    pub pretty: bool,
+}
+
+impl Default for InfluxqlParams {
+    fn default() -> Self {
+        Self {
+            q: None,
+            db: "public".to_string(),
+            epoch: "ms".to_string(),
+            pretty: false,
+        }
+    }
+}
+
+/// Influxql response organized in the same way as influxdb.
+///
+/// The basic example:
+/// ```json
+/// {"results":[{"statement_id":0,"series":[{"name":"mymeas",
+/// "columns":["time","myfield","mytag1","mytag2"],
+/// "values":[["2017-03-01T00:16:18Z",33.1,null,null],
+/// ["2017-03-01T00:17:18Z",12.4,"12","14"]]}]}]}
+/// ```
+/// More details refer to:
+/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-data-with-a-select-statement
+#[derive(Debug, Serialize)]
+pub struct InfluxqlResponse {
+    pub results: Vec<OneInfluxqlResult>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct OneInfluxqlResult {
+    statement_id: u32,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    series: Option<Vec<OneSeries>>,
+}
+
+#[derive(Debug, Serialize)]
+struct OneSeries {
+    name: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    tags: Option<BTreeMap<String, String>>,
+    columns: Vec<String>,
+    values: Vec<Vec<Datum>>,
+}
+
+/// [OneInfluxqlResult] builder
+#[derive(Default)]
+pub struct InfluxqlResultBuilder {
+    /// Query id for [multiple queries](https://docs.influxdata.com/influxdb/v1.8/tools/api/#request-multiple-queries)
+    statement_id: u32,
+
+    /// Schema of influxql query result
+    ///
+    /// Its format is like:
+    /// measurement |
+    /// tag_1..tag_n(columns in `group by`) |
+    /// time |
+    /// column_1..column_n(column in `projection` but not in `group by`)
+    column_schemas: Vec<ColumnSchema>,
+
+    /// Tags part in schema
+    group_by_tag_col_idxs: Vec<usize>,
+
+    /// Columns part in schema(include `time`)
+    value_col_idxs: Vec<usize>,
+
+    /// Mapping group key(`measurement` + `tag values`) to column values,
+    ///
+    /// NOTE: because tag keys in the `group by` clause are the same in each sub
+    /// result, we just use the `measurement` + `tag values` to distinguish them.
+    group_key_to_idx: HashMap<GroupKey, usize>,
+
+    /// Column values grouped by [GroupKey]
+    value_groups: Vec<RowGroup>,
+}
+
+type Row = Vec<Datum>;
+type RowGroup = Vec<Row>;
+
+impl InfluxqlResultBuilder {
+    pub fn new(record_schema: &RecordSchema, statement_id: u32) -> Result<Self> {
+        let column_schemas = record_schema.columns().to_owned();
+        ensure!(
+            !column_schemas.is_empty(),
+            InternalNoCause {
+                msg: "empty schema",
+            }
+        );
+
+        // For queries like `show measurements`, there will be no timestamp column.
+        let has_timestamp = column_schemas.iter().any(|c| c.data_type.is_timestamp());
+        // Find the tags part and columns part from schema.
+        let mut group_by_col_idxs = Vec::new();
+        let mut value_col_idxs = Vec::new();
+
+        // The following index searching logic is derived from the fixed format
+        // described when introducing `column_schemas`.
+        let mut col_iter = column_schemas.iter().enumerate();
+        // Normally, the first column is the measurement column.
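+        // e.g. a query like "SELECT field1, field2 FROM m1 GROUP BY tag" is expected
+        // to produce the columns [measurement, tag, time, field1, field2] (see the
+        // unit test below for such a layout).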
+        ensure!(
+            col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME,
+            InternalNoCause {
+                msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"),
+            }
+        );
+
+        // The group by tags will be placed after measurement and before time column.
+        let mut searching_group_by_tags = has_timestamp;
+        for (idx, col) in col_iter {
+            if col.data_type.is_timestamp() {
+                searching_group_by_tags = false;
+            }
+
+            if searching_group_by_tags {
+                group_by_col_idxs.push(idx);
+            } else {
+                value_col_idxs.push(idx);
+            }
+        }
+
+        Ok(Self {
+            statement_id,
+            column_schemas,
+            group_by_tag_col_idxs: group_by_col_idxs,
+            value_col_idxs,
+            group_key_to_idx: HashMap::new(),
+            value_groups: Vec::new(),
+        })
+    }
+
+    pub fn add_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
+        // Check schema's compatibility.
+        ensure!(
+            record_batch.schema().columns() == self.column_schemas,
+            InternalNoCause {
+                msg: format!(
+                    "conflict schema, origin:{:?}, new:{:?}",
+                    self.column_schemas,
+                    record_batch.schema().columns()
+                ),
+            }
+        );
+
+        let row_num = record_batch.num_rows();
+        for row_idx in 0..row_num {
+            // Get measurement + group by tags.
+            let group_key = self.extract_group_key(&record_batch, row_idx)?;
+            let value_group = self.extract_value_group(&record_batch, row_idx)?;
+
+            let value_groups = if let Some(idx) = self.group_key_to_idx.get(&group_key) {
+                self.value_groups.get_mut(*idx).unwrap()
+            } else {
+                self.value_groups.push(Vec::new());
+                self.group_key_to_idx
+                    .insert(group_key, self.value_groups.len() - 1);
+                self.value_groups.last_mut().unwrap()
+            };
+
+            value_groups.push(value_group);
+        }
+
+        Ok(())
+    }
+
+    pub fn build(self) -> OneInfluxqlResult {
+        let ordered_group_keys = {
+            let mut ordered_pairs = self.group_key_to_idx.into_iter().collect::<Vec<_>>();
+            ordered_pairs.sort_by(|a, b| a.1.cmp(&b.1));
+            ordered_pairs
+                .into_iter()
+                .map(|(key, _)| key)
+                .collect::<Vec<_>>()
+        };
+
+        let series = ordered_group_keys
+            .into_iter()
+            .zip(self.value_groups.into_iter())
+            .map(|(group_key, value_group)| {
+                let name = group_key.measurement;
+                let tags = if group_key.group_by_tag_values.is_empty() {
+                    None
+                } else {
+                    let tags = group_key
+                        .group_by_tag_values
+                        .into_iter()
+                        .enumerate()
+                        .map(|(tagk_idx, tagv)| {
+                            let tagk_col_idx = self.group_by_tag_col_idxs[tagk_idx];
+                            let tagk = self.column_schemas[tagk_col_idx].name.clone();
+
+                            (tagk, tagv)
+                        })
+                        .collect::<BTreeMap<_, _>>();
+
+                    Some(tags)
+                };
+
+                let columns = self
+                    .value_col_idxs
+                    .iter()
+                    .map(|idx| self.column_schemas[*idx].name.clone())
+                    .collect::<Vec<_>>();
+
+                OneSeries {
+                    name,
+                    tags,
+                    columns,
+                    values: value_group,
+                }
+            })
+            .collect();
+
+        OneInfluxqlResult {
+            series: Some(series),
+            statement_id: self.statement_id,
+        }
+    }
+
+    fn extract_group_key(&self, record_batch: &RecordBatch, row_idx: usize) -> Result<GroupKey> {
+        let mut group_by_tag_values = Vec::with_capacity(self.group_by_tag_col_idxs.len());
+        let measurement = {
+            let measurement = record_batch.column(0).datum(row_idx);
+            match measurement {
+                Datum::String(m) => m.to_string(),
+                other => {
+                    return InternalNoCause {
+                        msg: format!("invalid measurement column, column:{other:?}"),
+                    }
+                    .fail()
+                }
+            }
+        };
+
+        for col_idx in &self.group_by_tag_col_idxs {
+            let tag = {
+                let tag_datum = record_batch.column(*col_idx).datum(row_idx);
+                match tag_datum {
+                    Datum::Null => "".to_string(),
+                    Datum::String(tag) => tag.to_string(),
+                    other => {
+                        return InternalNoCause {
+                            msg: format!("invalid tag column, column:{other:?}"),
+                        }
+                        .fail()
+                    }
+                }
+            };
+            group_by_tag_values.push(tag);
+        }
+
+        Ok(GroupKey {
+            measurement,
+            group_by_tag_values,
+        })
+    }
+
+    fn extract_value_group(
+        &self,
+        record_batch: &RecordBatch,
+        row_idx: usize,
+    ) -> Result<Vec<Datum>> {
+        let mut value_group = Vec::with_capacity(self.value_col_idxs.len());
+        for col_idx in &self.value_col_idxs {
+            let value = record_batch.column(*col_idx).datum(row_idx);
+
+            value_group.push(value);
+        }
+
+        Ok(value_group)
+    }
+}
+
+#[derive(Hash, PartialEq, Eq, Clone)]
+struct GroupKey {
+    measurement: String,
+    group_by_tag_values: Vec<String>,
+}
+
+pub(crate) fn convert_write_request(req: WriteRequest) -> Result<Vec<WriteTableRequest>> {
+    let mut req_by_measurement = HashMap::new();
+    for line in influxdb_line_protocol::parse_lines(&req.lines) {
+        let mut line = line.box_err().with_context(|| Internal {
+            msg: "invalid line",
+        })?;
+
+        let timestamp = match line.timestamp {
+            Some(ts) => req.precision.try_normalize(ts).context(InternalNoCause {
+                msg: "time outside range -9223372036854775806 - 9223372036854775806",
+            })?,
+            None => Timestamp::now().as_i64(),
+        };
+        let mut tag_set = line.series.tag_set.unwrap_or_default();
+        // sort by tag key
+        tag_set.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+        // sort by field key
+        line.field_set.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+
+        let req_for_one_measurement = req_by_measurement
+            .entry(line.series.measurement.to_string())
+            .or_insert_with(|| WriteTableRequest {
+                table: line.series.measurement.to_string(),
+                tag_names: tag_set.iter().map(|(tagk, _)| tagk.to_string()).collect(),
+                field_names: line
+                    .field_set
+                    .iter()
+                    .map(|(tagk, _)| tagk.to_string())
+                    .collect(),
+                entries: Vec::new(),
+            });
+
+        let tags: Vec<_> = tag_set
+            .iter()
+            .enumerate()
+            .map(|(idx, (_, tagv))| Tag {
+                name_index: idx as u32,
+                value: Some(Value {
+                    value: Some(value::Value::StringValue(tagv.to_string())),
+                }),
+            })
+            .collect();
+        let field_group = FieldGroup {
+            timestamp,
+            fields: line
+                .field_set
+                .iter()
+                .cloned()
+                .enumerate()
+                .map(|(idx, (_, fieldv))| Field {
+                    name_index: idx as u32,
+                    value: Some(convert_influx_value(fieldv)),
+                })
+                .collect(),
+        };
+        let mut found = false;
+        for entry in &mut req_for_one_measurement.entries {
+            if entry.tags == tags {
+                // TODO: remove clone?
+                entry.field_groups.push(field_group.clone());
+                found = true;
+                break;
+            }
+        }
+        if !found {
+            req_for_one_measurement.entries.push(WriteSeriesEntry {
+                tags,
+                field_groups: vec![field_group],
+            })
+        }
+    }
+
+    Ok(req_by_measurement.into_values().collect())
+}
+
+/// Convert influxdb's FieldValue to ceresdbproto's Value
+fn convert_influx_value(field_value: FieldValue) -> Value {
+    let v = match field_value {
+        FieldValue::I64(v) => value::Value::Int64Value(v),
+        FieldValue::U64(v) => value::Value::Uint64Value(v),
+        FieldValue::F64(v) => value::Value::Float64Value(v),
+        FieldValue::String(v) => value::Value::StringValue(v.to_string()),
+        FieldValue::Boolean(v) => value::Value::BoolValue(v),
+    };
+
+    Value { value: Some(v) }
+}
+
+pub fn convert_influxql_output(output: Output) -> Result<InfluxqlResponse> {
+    // TODO: for now, we only support one influxql statement in each query.
+    let records = match output {
+        Output::Records(records) => records,
+        Output::AffectedRows(_) => {
+            return InternalNoCause {
+                msg: "output in influxql should not be affected rows",
+            }
+            .fail()
+        }
+    };
+
+    let influxql_result = if records.is_empty() {
+        OneInfluxqlResult {
+            statement_id: 0,
+            series: None,
+        }
+    } else {
+        // All record schemas in one query result should be the same.
+ let record_schema = records.first().unwrap().schema(); + let mut builder = InfluxqlResultBuilder::new(record_schema, 0)?; + for record in records { + builder.add_record_batch(record)?; + } + + builder.build() + }; + + Ok(InfluxqlResponse { + results: vec![influxql_result], + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; + use common_types::{ + column::{ColumnBlock, ColumnBlockBuilder}, + column_schema, + datum::DatumKind, + schema, + string::StringBytes, + }; + use json_pretty::PrettyFormatter; + + use super::*; + + #[test] + fn test_convert_influxdb_write_req() { + let lines = r#" + demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000 + demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000 + demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000 + demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 + "# + .to_string(); + let req = WriteRequest { + lines, + db: "public".to_string(), + precision: Precision::Millisecond, + }; + + let pb_req = convert_write_request(req).unwrap(); + assert_eq!(1, pb_req.len()); + assert_eq!( + pb_req[0], + WriteTableRequest { + table: "demo".to_string(), + tag_names: vec!["tag1".to_string(), "tag2".to_string()], + field_names: vec!["field1".to_string(), "field2".to_string()], + entries: vec![ + // First series + WriteSeriesEntry { + tags: vec![ + Tag { + name_index: 0, + value: Some(convert_influx_value(FieldValue::String("t1".into()))), + }, + Tag { + name_index: 1, + value: Some(convert_influx_value(FieldValue::String("t2".into()))), + }, + ], + field_groups: vec![ + FieldGroup { + timestamp: 1678675992000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(90.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(100.0))), + } + ] + }, + FieldGroup { + timestamp: 1678675993000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(91.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(101.0))), + } + ] + }, + ] + }, + // Second series + WriteSeriesEntry { + tags: vec![ + Tag { + name_index: 0, + value: Some(convert_influx_value(FieldValue::String("t11".into()))), + }, + Tag { + name_index: 1, + value: Some(convert_influx_value(FieldValue::String("t22".into()))), + }, + ], + field_groups: vec![ + FieldGroup { + timestamp: 1678675992000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(900.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(1000.0))), + } + ] + }, + FieldGroup { + timestamp: 1678675993000, + fields: vec![ + Field { + name_index: 0, + value: Some(convert_influx_value(FieldValue::F64(901.0))), + }, + Field { + name_index: 1, + value: Some(convert_influx_value(FieldValue::F64(1001.0))), + } + ] + }, + ] + } + ] + } + ); + } + + #[test] + fn test_influxql_result() { + let record_schema = build_test_record_schema(); + let column_blocks = build_test_column_blocks(); + let record_batch = RecordBatch::new(record_schema, column_blocks).unwrap(); + + let mut builder = InfluxqlResultBuilder::new(record_batch.schema(), 0).unwrap(); + builder.add_record_batch(record_batch).unwrap(); + let iql_results = vec![builder.build()]; + let iql_response = InfluxqlResponse { + results: iql_results, + }; + let iql_result_json = + PrettyFormatter::from_str(&serde_json::to_string(&iql_response).unwrap()).pretty(); + let expected = 
PrettyFormatter::from_str(r#"{"results":[{"statement_id":0,"series":[{"name":"m1","tags":{"tag":"tv1"}, + "columns":["time","field1","field2"],"values":[[10001,"fv1",1]]}, + {"name":"m1","tags":{"tag":"tv2"},"columns":["time","field1","field2"],"values":[[100002,"fv2",2]]}, + {"name":"m1","tags":{"tag":"tv3"},"columns":["time","field1","field2"],"values":[[10003,"fv3",3]]}, + {"name":"m1","tags":{"tag":""},"columns":["time","field1","field2"],"values":[[10007,null,null]]}, + {"name":"m2","tags":{"tag":"tv4"},"columns":["time","field1","field2"],"values":[[10004,"fv4",4]]}, + {"name":"m2","tags":{"tag":"tv5"},"columns":["time","field1","field2"],"values":[[100005,"fv5",5]]}, + {"name":"m2","tags":{"tag":"tv6"},"columns":["time","field1","field2"],"values":[[10006,"fv6",6]]}]}]}"#).pretty(); + assert_eq!(expected, iql_result_json); + } + + fn build_test_record_schema() -> RecordSchema { + let schema = schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new("time".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag".to_string(), DatumKind::String) + .is_tag(true) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("field1".to_string(), DatumKind::String) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + // The data type of column is `UInt32`, and the type of default value expr is + // `Int64`. So we use this column to cover the test, which has + // different type. + column_schema::Builder::new("field2".to_string(), DatumKind::UInt64) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .unwrap(); + + // Record schema + let arrow_schema = schema.to_arrow_schema_ref(); + let fields = arrow_schema.fields.to_owned(); + let measurement_field = ArrowField::new( + CERESDB_MEASUREMENT_COLUMN_NAME.to_string(), + schema::DataType::Utf8, + false, + ); + let project_fields = vec![ + measurement_field, + fields[1].clone(), + fields[0].clone(), + fields[2].clone(), + fields[3].clone(), + ]; + let project_arrow_schema = Arc::new(ArrowSchema::new_with_metadata( + project_fields, + arrow_schema.metadata().clone(), + )); + + RecordSchema::try_from(project_arrow_schema).unwrap() + } + + fn build_test_column_blocks() -> Vec { + let mut measurement_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3); + let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3); + + // Data in measurement1 + let measurement1 = Datum::String(StringBytes::copy_from_str("m1")); + let tags1 = vec!["tv1".to_string(), "tv2".to_string(), "tv3".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let times1 = vec![10001_i64, 100002, 10003] + .into_iter() + .map(|v| Datum::Timestamp(v.into())) + .collect::>(); + let fields1 = vec!["fv1".to_string(), "fv2".to_string(), "fv3".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let fields2 = vec![1_u64, 
2, 3] + .into_iter() + .map(Datum::UInt64) + .collect::>(); + + let measurement2 = Datum::String(StringBytes::copy_from_str("m2")); + let tags2 = vec!["tv4".to_string(), "tv5".to_string(), "tv6".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let times2 = vec![10004_i64, 100005, 10006] + .into_iter() + .map(|v| Datum::Timestamp(v.into())) + .collect::>(); + let fields3 = vec!["fv4".to_string(), "fv5".to_string(), "fv6".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let fields4 = vec![4_u64, 5, 6] + .into_iter() + .map(Datum::UInt64) + .collect::>(); + + for idx in 0..3 { + measurement_builder.append(measurement1.clone()).unwrap(); + tag_builder.append(tags1[idx].clone()).unwrap(); + time_builder.append(times1[idx].clone()).unwrap(); + field_builder1.append(fields1[idx].clone()).unwrap(); + field_builder2.append(fields2[idx].clone()).unwrap(); + } + measurement_builder.append(measurement1).unwrap(); + tag_builder.append(Datum::Null).unwrap(); + time_builder.append(Datum::Timestamp(10007.into())).unwrap(); + field_builder1.append(Datum::Null).unwrap(); + field_builder2.append(Datum::Null).unwrap(); + + for idx in 0..3 { + measurement_builder.append(measurement2.clone()).unwrap(); + tag_builder.append(tags2[idx].clone()).unwrap(); + time_builder.append(times2[idx].clone()).unwrap(); + field_builder1.append(fields3[idx].clone()).unwrap(); + field_builder2.append(fields4[idx].clone()).unwrap(); + } + + vec![ + measurement_builder.build(), + tag_builder.build(), + time_builder.build(), + field_builder1.build(), + field_builder2.build(), + ] + } +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 2f35a8945c..50ed94470b 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -74,8 +74,6 @@ use crate::{ forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, grpc::write::WriteContext, hotspot::HotspotRecorder, - http::query::Request, - influxdb::InfluxqlRequest, instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, }; @@ -477,14 +475,6 @@ impl Proxy { } } -#[derive(Debug)] -pub enum QueryRequest { - Sql(Request), - // TODO: influxql include more parameters, we should add it in later. 
- // TODO: remove dead_code after implement influxql with proxy - Influxql(InfluxqlRequest), -} - #[derive(Clone)] pub struct Context { pub timeout: Option, diff --git a/server/src/http.rs b/server/src/http.rs index 4f70c4f532..b626ca0b09 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -20,12 +20,12 @@ use prom_remote_api::web; use proxy::{ context::RequestContext, handlers::{self}, - http::query::{convert_output, Request}, - influxdb::{ + http::query::{convert_output, SqlRequest}, + influxdb::types::{ convert_influxql_output, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest, }, instance::InstanceRef, - Proxy, QueryRequest, + Proxy, }; use query_engine::executor::Executor as QueryExecutor; use router::{endpoint::Endpoint, RouterRef}; @@ -227,7 +227,7 @@ impl Service { fn sql(&self) -> impl Filter + Clone { // accept json or plain text let extract_request = warp::body::json() - .or(warp::body::bytes().map(|v: Bytes| Request { + .or(warp::body::bytes().map(|v: Bytes| SqlRequest { query: String::from_utf8_lossy(&v).to_string(), })) .unify(); @@ -239,7 +239,6 @@ impl Service { .and(self.with_context()) .and(self.with_proxy()) .and_then(|req, ctx, proxy: Arc>| async move { - let req = QueryRequest::Sql(req); let result = proxy .handle_query(&ctx, req) .await @@ -325,11 +324,6 @@ impl Service { } }, ); - // .and_then(|| async move { - // let request = - // InfluxqlRequest::try_new(method, body, - // params).map_err(reject::custom)?; influxdb::query(ctx, db, - // HandlerQueryRequest::Influxql(request)).await }); warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api)) } From f86f7cdbd0a9966dff4ad16d33817972e3d3acd2 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Tue, 9 May 2023 13:49:58 +0800 Subject: [PATCH 4/4] refactor by CR --- proxy/src/http/mod.rs | 2 +- proxy/src/http/{query.rs => sql.rs} | 8 +- proxy/src/influxdb/mod.rs | 128 +++++++++++++++------------- proxy/src/influxdb/types.rs | 4 +- server/src/http.rs | 11 +-- 5 files changed, 84 insertions(+), 69 deletions(-) rename proxy/src/http/{query.rs => sql.rs} (98%) diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index ec02939c33..631330d49a 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -1,4 +1,4 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
pub mod prom; -pub mod query; +pub mod sql; diff --git a/proxy/src/http/query.rs b/proxy/src/http/sql.rs similarity index 98% rename from proxy/src/http/query.rs rename to proxy/src/http/sql.rs index 44b95b8a50..3e562c0c7b 100644 --- a/proxy/src/http/query.rs +++ b/proxy/src/http/sql.rs @@ -37,7 +37,11 @@ use crate::{ }; impl Proxy { - pub async fn handle_query(&self, ctx: &RequestContext, req: SqlRequest) -> Result { + pub async fn handle_http_sql_query( + &self, + ctx: &RequestContext, + req: Request, + ) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); @@ -147,7 +151,7 @@ impl Proxy { } } #[derive(Debug, Deserialize)] -pub struct SqlRequest { +pub struct Request { pub query: String, } diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index 98e1c21623..974a975c28 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -25,7 +25,10 @@ use crate::{ error::{ErrNoCause, ErrWithCause, Internal, Result}, execute_plan, grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, - influxdb::types::{convert_write_request, InfluxqlRequest, WriteRequest, WriteResponse}, + influxdb::types::{ + convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse, + WriteRequest, WriteResponse, + }, Proxy, }; @@ -34,6 +37,72 @@ impl Proxy { &self, ctx: RequestContext, req: InfluxqlRequest, + ) -> Result { + let output = self.fetch_influxdb_query_output(ctx, req).await?; + convert_influxql_output(output) + } + + pub async fn handle_influxdb_write( + &self, + ctx: RequestContext, + req: WriteRequest, + ) -> Result { + let request_id = RequestId::next_id(); + let deadline = ctx.timeout.map(|t| Instant::now() + t); + let catalog = &ctx.catalog; + self.instance.catalog_manager.default_catalog_name(); + let schema = &ctx.schema; + let schema_config = self + .schema_config_provider + .schema_config(schema) + .box_err() + .with_context(|| Internal { + msg: format!("get schema config failed, schema:{schema}"), + })?; + + let write_context = + WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); + + let plans = write_request_to_insert_plan( + self.instance.clone(), + convert_write_request(req)?, + schema_config, + write_context, + ) + .await + .box_err() + .with_context(|| Internal { + msg: "write request to insert plan", + })?; + + let mut success = 0; + for insert_plan in plans { + success += execute_insert_plan( + request_id, + catalog, + schema, + self.instance.clone(), + insert_plan, + deadline, + ) + .await + .box_err() + .with_context(|| Internal { + msg: "execute plan", + })?; + } + debug!( + "Influxdb write finished, catalog:{}, schema:{}, success:{}", + catalog, schema, success + ); + + Ok(()) + } + + async fn fetch_influxdb_query_output( + &self, + ctx: RequestContext, + req: InfluxqlRequest, ) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); @@ -114,61 +183,4 @@ impl Proxy { Ok(output) } - - pub async fn handle_influxdb_write( - &self, - ctx: RequestContext, - req: WriteRequest, - ) -> Result { - let request_id = RequestId::next_id(); - let deadline = ctx.timeout.map(|t| Instant::now() + t); - let catalog = &ctx.catalog; - self.instance.catalog_manager.default_catalog_name(); - let schema = &ctx.schema; - let schema_config = self - .schema_config_provider - .schema_config(schema) - .box_err() - .with_context(|| Internal { - msg: format!("get schema config failed, schema:{schema}"), - 
})?; - - let write_context = - WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); - - let plans = write_request_to_insert_plan( - self.instance.clone(), - convert_write_request(req)?, - schema_config, - write_context, - ) - .await - .box_err() - .with_context(|| Internal { - msg: "write request to insert plan", - })?; - - let mut success = 0; - for insert_plan in plans { - success += execute_insert_plan( - request_id, - catalog, - schema, - self.instance.clone(), - insert_plan, - deadline, - ) - .await - .box_err() - .with_context(|| Internal { - msg: "execute plan", - })?; - } - debug!( - "Influxdb write finished, catalog:{}, schema:{}, success:{}", - catalog, schema, success - ); - - Ok(()) - } } diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs index 6a61939b68..c55213e8a7 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -1,5 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. +//! This module contains the types for InfluxDB. + use std::collections::{BTreeMap, HashMap}; use bytes::Bytes; @@ -559,7 +561,7 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } -pub fn convert_influxql_output(output: Output) -> Result { +pub(crate) fn convert_influxql_output(output: Output) -> Result { // TODO: now, we just support one influxql in each query. let records = match output { Output::Records(records) => records, diff --git a/server/src/http.rs b/server/src/http.rs index b626ca0b09..9d8178f112 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -20,10 +20,8 @@ use prom_remote_api::web; use proxy::{ context::RequestContext, handlers::{self}, - http::query::{convert_output, SqlRequest}, - influxdb::types::{ - convert_influxql_output, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest, - }, + http::sql::{convert_output, Request}, + influxdb::types::{InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest}, instance::InstanceRef, Proxy, }; @@ -227,7 +225,7 @@ impl Service { fn sql(&self) -> impl Filter + Clone { // accept json or plain text let extract_request = warp::body::json() - .or(warp::body::bytes().map(|v: Bytes| SqlRequest { + .or(warp::body::bytes().map(|v: Bytes| Request { query: String::from_utf8_lossy(&v).to_string(), })) .unify(); @@ -240,7 +238,7 @@ impl Service { .and(self.with_proxy()) .and_then(|req, ctx, proxy: Arc>| async move { let result = proxy - .handle_query(&ctx, req) + .handle_http_sql_query(&ctx, req) .await .map(convert_output) .box_err() @@ -315,7 +313,6 @@ impl Service { let result = proxy .handle_influxdb_query(ctx, request) .await - .and_then(convert_influxql_output) .box_err() .context(HandleRequest); match result {
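
Note for reviewers (not part of the patch): the sketch below shows how the write path introduced here is expected to be exercised, mirroring `test_convert_influxdb_write_req` in proxy/src/influxdb/types.rs. It is written as if it lived inside the proxy crate, since `convert_write_request` is crate-private, and the function name `example_write_conversion` is illustrative only.

    // Illustrative sketch, assuming this code lives inside the proxy crate.
    use crate::influxdb::types::{convert_write_request, Precision, WriteRequest};

    fn example_write_conversion() {
        // Body of a line-protocol write to the `influxdb/v1` write endpoint,
        // with `db=public` and `precision=ms` (both are the CeresDB defaults).
        let req = WriteRequest {
            lines: "demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000".to_string(),
            db: "public".to_string(),
            precision: Precision::Millisecond,
        };

        // One WriteTableRequest per measurement; tag and field names are sorted
        // by key and timestamps are normalized to milliseconds.
        let table_requests = convert_write_request(req).unwrap();
        assert_eq!(table_requests.len(), 1);
        assert_eq!(table_requests[0].table, "demo");
        assert_eq!(
            table_requests[0].tag_names,
            vec!["tag1".to_string(), "tag2".to_string()]
        );
    }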