feat: support the simplest influxql raw query #710

Merged
merged 4 commits into from
Mar 9, 2023

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions server/src/handlers/error.rs
@@ -14,6 +14,9 @@ pub enum Error {
#[snafu(display("Failed to parse sql, err:{}", source))]
ParseSql { source: sql::frontend::Error },

#[snafu(display("Failed to parse influxql, err:{}", source))]
ParseInfluxql { source: sql::frontend::Error },

#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
CreatePlan {
query: String,
2 changes: 1 addition & 1 deletion server/src/handlers/mod.rs
@@ -5,7 +5,7 @@
pub mod admin;
pub mod error;
pub mod prom;
pub mod sql;
pub mod query;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
4 changes: 3 additions & 1 deletion server/src/handlers/prom.rs
@@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use warp::reject;

use super::query::QueryRequest;
use crate::{
context::RequestContext, handlers, instance::InstanceRef,
schema_config_provider::SchemaConfigProviderRef,
@@ -291,7 +292,8 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
TIMESTAMP_COLUMN
);

let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into())
let request = QueryRequest::Sql(sql.into());
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
.await
.map_err(Box::new)
.context(SqlHandle)?;
119 changes: 82 additions & 37 deletions server/src/handlers/sql.rs → server/src/handlers/query.rs
@@ -24,7 +24,9 @@ use sql::{
};

use crate::handlers::{
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
error::{
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
},
prelude::*,
};

@@ -104,18 +106,33 @@ impl From<Bytes> for Request {
}
}

pub async fn handle_sql<Q: QueryExecutor + 'static>(
#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: InfluxQL queries include more parameters; we should add them later.
Influxql(Request),
}
impl QueryRequest {
fn query(&self) -> &str {
match self {
QueryRequest::Sql(request) => request.query.as_str(),
QueryRequest::Influxql(request) => request.query.as_str(),
}
}
}

pub async fn handle_query<Q: QueryExecutor + 'static>(
ctx: &RequestContext,
instance: InstanceRef<Q>,
request: Request,
query_request: QueryRequest,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"sql handler try to process request, request_id:{}, request:{:?}",
request_id, request
"Query handler try to process request, request_id:{}, request:{:?}",
request_id, query_request
);

// TODO(yingwen): Privilege check, cannot access data of other tenant
@@ -127,38 +144,66 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
function_registry: &*instance.function_registry,
};
let frontend = Frontend::new(provider);

let mut sql_ctx = SqlContext::new(request_id, deadline);
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: request.query,
let plan = match &query_request {
QueryRequest::Sql(request) => {
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
let plan = frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?;
QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.context(ParseInfluxql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
};

instance.limiter.try_limit(&plan).context(QueryBlock {
query: &request.query,
query: query_request.query(),
})?;

// Execute in interpreter
@@ -177,7 +222,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
interpreter_factory
.create(interpreter_ctx, plan)
.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})?;

let output = if let Some(deadline) = deadline {
@@ -187,24 +232,24 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
)
.await
.context(QueryTimeout {
query: &request.query,
query: query_request.query(),
})
.and_then(|v| {
v.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})
})?
} else {
interpreter.execute().await.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})?
};

info!(
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
request
query_request
);

Ok(output)
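For reference, call sites now wrap the query text in the new `QueryRequest` enum before invoking the unified `handle_query` entry point, as the PromQL and MySQL handlers elsewhere in this PR do for SQL. Below is a minimal sketch (not part of the diff) of what an InfluxQL call site could look like; the surrounding setup is elided and assumed to exist.

```rust
// Hypothetical call site: dispatch an InfluxQL query through the unified
// handler added in this PR. `ctx: &RequestContext` and `instance:
// InstanceRef<Q>` are assumed to be already prepared, as in the PromQL
// and MySQL handlers touched by this change.
use crate::handlers::{
    self,
    query::{QueryRequest, Request},
};

// Inside an async context:
let req = Request::from("SELECT * FROM cpu".to_string());
let req = QueryRequest::Influxql(req);
let output = handlers::query::handle_query(&ctx, instance.clone(), req).await?;
```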
46 changes: 43 additions & 3 deletions server/src/http.rs
@@ -8,6 +8,7 @@ use std::{
};

use common_util::error::BoxError;
use handlers::query::QueryRequest;
use log::{error, info};
use logger::RuntimeLevel;
use profile::Profiler;
@@ -30,7 +31,7 @@ use crate::{
consts,
context::RequestContext,
error_util,
handlers::{self, prom::CeresDBStorage, sql::Request},
handlers::{self, prom::CeresDBStorage, query::Request},
instance::InstanceRef,
metrics,
schema_config_provider::SchemaConfigProviderRef,
@@ -126,6 +127,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
self.home()
.or(self.metrics())
.or(self.sql())
.or(self.influxql())
.or(self.heap_profile())
.or(self.admin_block())
.or(self.flush_memtable())
@@ -181,9 +183,47 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let result = handlers::sql::handle_sql(&ctx, instance, req)
let req = QueryRequest::Sql(req);
let result = handlers::query::handle_query(&ctx, instance, req)
.await
.map(handlers::sql::convert_output)
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
}

// POST /influxql
// Note: this request format is not what the InfluxDB query HTTP API expects; see:
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
fn influxql(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
// accept json or plain text
let extract_request = warp::body::json()
.or(warp::body::bytes().map(Request::from))
.unify();

warp::path!("influxql")
.and(warp::post())
.and(warp::body::content_length_limit(self.config.max_body_size))
.and(extract_request)
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let req = QueryRequest::Influxql(req);
let result = handlers::query::handle_query(&ctx, instance, req)
.await
// TODO: the SQL `convert_output` function may not be suitable for InfluxQL.
// We should implement an InfluxQL-specific version later.
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
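To try the new route end to end, a client can POST either a JSON body with a `query` field or a plain-text body, per the filter above. A hedged client-side sketch (not part of the PR) using reqwest follows; the listen address and port are assumptions about the server's HTTP configuration.

```rust
// Hypothetical smoke test for the new POST /influxql route. The bind
// address 127.0.0.1:5440 is an assumption; use whatever the server's
// HTTP service is configured to listen on. A plain-text body is sent
// here, which the warp filter falls back to when the body is not JSON.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:5440/influxql")
        .body("SELECT * FROM cpu")
        .send()
        .await?;
    let status = resp.status();
    let body = resp.text().await?;
    println!("status: {status}, body: {body}");
    Ok(())
}
```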
9 changes: 6 additions & 3 deletions server/src/mysql/worker.rs
@@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes;

use crate::{
context::RequestContext,
handlers::{self, sql::Request},
handlers::{
self,
query::{QueryRequest, Request},
},
instance::Instance,
mysql::{
error::{CreateContext, HandleSql, Result},
@@ -109,9 +112,9 @@
{
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
let ctx = self.create_ctx()?;

let req = Request::from(sql.to_string());
handlers::sql::handle_sql(&ctx, self.instance.clone(), req)
let req = QueryRequest::Sql(req);
handlers::query::handle_query(&ctx, self.instance.clone(), req)
.await
.map_err(|e| {
error!("Mysql service Failed to handle sql, err: {}", e);
1 change: 1 addition & 0 deletions sql/Cargo.toml
@@ -29,6 +29,7 @@ df_operator = { workspace = true }
hashbrown = { version = "0.12", features = ["raw"] }
influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
paste = { workspace = true }
regex = "1"
11 changes: 9 additions & 2 deletions sql/src/influxql/mod.rs
@@ -3,8 +3,9 @@
//! Influxql processing

pub mod planner;
pub(crate) mod stmt_rewriter;
pub(crate) mod select;
pub(crate) mod util;

pub mod error {
use common_util::error::GenericError;
use snafu::{Backtrace, Snafu};
@@ -32,7 +33,13 @@ pub mod error {
backtrace
))]
RewriteNoCause { msg: String, backtrace: Backtrace },
}

#[snafu(display(
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
Convert { msg: String, backtrace: Backtrace },
}
define_result!(Error);
}