Skip to content

Commit

Permalink
add influxql http interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 8, 2023
1 parent b36471b commit 5549300
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 42 deletions.
3 changes: 3 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Error {
#[snafu(display("Failed to parse sql, err:{}", source))]
ParseSql { source: sql::frontend::Error },

#[snafu(display("Failed to parse influxql, err:{}", source))]
ParseInfluxql { source: sql::frontend::Error },

#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
CreatePlan {
query: String,
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
pub mod admin;
pub mod error;
pub mod prom;
pub mod sql;
pub mod query;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
Expand Down
7 changes: 6 additions & 1 deletion server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use warp::reject;

use super::query::{QueryRequest, QueryType};
use crate::{
context::RequestContext, handlers, instance::InstanceRef,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -291,7 +292,11 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
TIMESTAMP_COLUMN
);

let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into())
let request = QueryRequest {
query_type: QueryType::Sql,
request: sql.into(),
};
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
.await
.map_err(Box::new)
.context(SqlHandle)?;
Expand Down
108 changes: 77 additions & 31 deletions server/src/handlers/sql.rs → server/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use sql::{
};

use crate::handlers::{
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
error::{
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
},
prelude::*,
};

Expand Down Expand Up @@ -104,18 +106,30 @@ impl From<Bytes> for Request {
}
}

pub async fn handle_sql<Q: QueryExecutor + 'static>(
/// The query language a request should be parsed and planned as.
///
/// Field-less and trivially copyable, so we derive `Clone`/`Copy` and the
/// equality traits to make it cheap to pass around and easy to match/compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// Standard SQL, handled by the SQL frontend parser/planner.
    Sql,
    /// InfluxQL (InfluxDB's query language), handled by the InfluxQL
    /// frontend parser/planner.
    Influxql,
}

/// A query request: the raw request payload plus the language it is
/// written in, so the handler can pick the matching parser and planner.
#[derive(Debug)]
pub struct QueryRequest {
    // Which frontend to use for parsing/planning: SQL or InfluxQL.
    pub query_type: QueryType,
    // The raw request carrying the query text (accessed as `request.query`
    // by the handler).
    pub request: Request,
}

pub async fn handle_query<Q: QueryExecutor + 'static>(
ctx: &RequestContext,
instance: InstanceRef<Q>,
request: Request,
query_request: QueryRequest,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"sql handler try to process request, request_id:{}, request:{:?}",
request_id, request
"Query handler try to process request, request_id:{}, request:{:?}",
request_id, query_request
);

// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand All @@ -127,35 +141,67 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
function_registry: &*instance.function_registry,
};
let frontend = Frontend::new(provider);

let mut sql_ctx = SqlContext::new(request_id, deadline);
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: request.query,
let QueryRequest {
request,
query_type,
} = query_request;
let plan = match query_type {
QueryType::Sql => {
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: request.query,
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
let plan = frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?;
QueryType::Influxql => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.context(ParseInfluxql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: request.query,
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
};

instance.limiter.try_limit(&plan).context(QueryBlock {
query: &request.query,
Expand Down Expand Up @@ -201,7 +247,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
};

info!(
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
request
Expand Down
52 changes: 49 additions & 3 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use common_util::error::BoxError;
use handlers::query::QueryRequest;
use log::{error, info};
use logger::RuntimeLevel;
use profile::Profiler;
Expand All @@ -30,7 +31,11 @@ use crate::{
consts,
context::RequestContext,
error_util,
handlers::{self, prom::CeresDBStorage, sql::Request},
handlers::{
self,
prom::CeresDBStorage,
query::{QueryType, Request},
},
instance::InstanceRef,
metrics,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -126,6 +131,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
self.home()
.or(self.metrics())
.or(self.sql())
.or(self.influxql())
.or(self.heap_profile())
.or(self.admin_block())
.or(self.flush_memtable())
Expand Down Expand Up @@ -181,9 +187,49 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let result = handlers::sql::handle_sql(&ctx, instance, req)
let req = QueryRequest {
query_type: QueryType::Sql,
request: req,
};
let result = handlers::query::handle_query(&ctx, instance, req)
.await
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
}

// POST /influxql
fn influxql(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
// accept json or plain text
let extract_request = warp::body::json()
.or(warp::body::bytes().map(Request::from))
.unify();

warp::path!("influxql")
.and(warp::post())
.and(warp::body::content_length_limit(self.config.max_body_size))
.and(extract_request)
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let req = QueryRequest {
query_type: QueryType::Influxql,
request: req,
};
let result = handlers::query::handle_query(&ctx, instance, req)
.await
.map(handlers::sql::convert_output)
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Expand Down
12 changes: 9 additions & 3 deletions server/src/mysql/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes;

use crate::{
context::RequestContext,
handlers::{self, sql::Request},
handlers::{
self,
query::{QueryRequest, QueryType, Request},
},
instance::Instance,
mysql::{
error::{CreateContext, HandleSql, Result},
Expand Down Expand Up @@ -109,9 +112,12 @@ where
{
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
let ctx = self.create_ctx()?;

let req = Request::from(sql.to_string());
handlers::sql::handle_sql(&ctx, self.instance.clone(), req)
let req = QueryRequest {
query_type: QueryType::Sql,
request: req,
};
handlers::query::handle_query(&ctx, self.instance.clone(), req)
.await
.map_err(|e| {
error!("Mysql service Failed to handle sql, err: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ df_operator = { workspace = true }
hashbrown = { version = "0.12", features = ["raw"] }
influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
paste = { workspace = true }
regex = "1"
regex-syntax = "0.6.28"
snafu = { workspace = true }
sqlparser = { workspace = true }
table_engine = { workspace = true }
lazy_static = { workspace = true }

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
Expand Down
4 changes: 2 additions & 2 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql planner.
//! Influxql planner
use common_util::error::{BoxError, GenericResult};
use influxdb_influxql_parser::{
Expand All @@ -10,8 +10,8 @@ use snafu::ResultExt;
use sqlparser::ast::Statement as SqlStatement;
use table_engine::table::TableRef;

use super::select::{converter::Converter, rewriter::Rewriter};
use crate::{
influxql::select::{converter::Converter, rewriter::Rewriter},
plan::Plan,
planner::{BuildInfluxqlPlan, Result},
provider::MetaProvider,
Expand Down
4 changes: 4 additions & 0 deletions sql/src/influxql/select/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Statement-level conversion of InfluxQL SELECT statements to SQL.

// Converts an InfluxQL select statement into its SQL equivalent.
pub(crate) mod converter;
// Rewrites InfluxQL select statements — presumably normalizing them before
// conversion; confirm against the module itself.
pub(crate) mod rewriter;
2 changes: 2 additions & 0 deletions sql/src/influxql/test_util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Test utils
use common_util::error::GenericResult;
use datafusion::sql::TableReference;
use influxdb_influxql_parser::{parse_statements, select::SelectStatement, statement::Statement};
Expand Down

0 comments on commit 5549300

Please sign in to comment.