Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl influxdb api with proxy #875

Merged
merged 4 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ pub enum Error {
#[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))]
TableOperatorWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableOperatorNoCause { msg: String, backtrace: Backtrace },
// Fixme: Temporarily remove the stack information, otherwise you will encounter a
// segmentation fault.
#[snafu(display("Failed to operate table, msg:{}.\n", msg))]
TableOperatorNoCause { msg: String },
}

define_result!(Error);
Expand Down
2 changes: 1 addition & 1 deletion catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl TableOperator {
} else {
TableOperatorNoCause {
msg: format!(
"Failed to open shard, some tables open failed, no table is shard id:{shard_id}, opened count:{no_table_count}, open error count:{open_err_count}"),
"Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"),
}.fail()
}
}
Expand Down
13 changes: 1 addition & 12 deletions proxy/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! Error of handlers

use common_util::{define_result, error::GenericError};
use common_util::define_result;
use snafu::{Backtrace, Snafu};
use warp::reject::Reject;

Expand All @@ -18,11 +18,6 @@ pub enum Error {
source: query_frontend::frontend::Error,
},

#[snafu(display("Failed to parse influxql, err:{}", source))]
ParseInfluxql {
source: query_frontend::frontend::Error,
},

#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
CreatePlan {
query: String,
Expand Down Expand Up @@ -77,12 +72,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
InfluxDbHandlerWithCause { msg: String, source: GenericError },

#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
Expand Down
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

pub mod admin;
pub(crate) mod error;
pub mod influxdb;
pub mod query;
pub mod route;

Expand Down
32 changes: 1 addition & 31 deletions proxy/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use serde::{
use snafu::{ensure, ResultExt};

use crate::handlers::{
error::{
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
},
influxdb::InfluxqlRequest,
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
prelude::*,
};

Expand Down Expand Up @@ -101,14 +98,11 @@ impl From<Bytes> for Request {
#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: influxql include more parameters, we should add it in later.
Influxql(InfluxqlRequest),
}
impl QueryRequest {
pub fn query(&self) -> &str {
match self {
QueryRequest::Sql(request) => request.query.as_str(),
QueryRequest::Influxql(request) => request.query.as_str(),
}
}
}
Expand Down Expand Up @@ -168,30 +162,6 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
query: &request.query,
})?
}

QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.context(ParseInfluxql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
};

instance.limiter.try_limit(&plan).context(QueryBlock {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

pub mod prom;
pub mod query;
pub mod sql;
172 changes: 59 additions & 113 deletions proxy/src/http/query.rs → proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,22 @@ use crate::{
error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
execute_plan,
forward::ForwardResult,
handlers::influxdb::InfluxqlRequest,
Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_query(
pub async fn handle_http_sql_query(
&self,
ctx: &RequestContext,
query_request: QueryRequest,
req: Request,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"Query handler try to process request, request_id:{}, request:{:?}",
request_id, query_request
request_id, req
);

// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand All @@ -63,110 +62,66 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let frontend = Frontend::new(provider);
let mut sql_ctx = SqlContext::new(request_id, deadline);

let plan = match &query_request {
QueryRequest::Sql(request) => {
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse sql, query:{}", request.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
request.query
),
}
);

let sql_query_request = SqlQueryRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
tables: vec![],
sql: request.query.clone(),
};

if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
match resp {
ForwardResult::Forwarded(resp) => {
return convert_sql_response_to_output(resp?)
}
ForwardResult::Local => (),
}
};

// Open partition table if needed.
let table_name = frontend::parse_table_name(&stmts);
if let Some(table_name) = table_name {
self.maybe_open_partition_table_if_not_exist(
&ctx.catalog,
&ctx.schema,
&table_name,
)
.await?;
}

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", request.query),
})?
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &req.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse sql, query:{}", req.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
req.query
),
}
);

let sql_query_request = SqlQueryRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
tables: vec![],
sql: req.query.clone(),
};

QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse influxql, query:{}", request.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
request.query
),
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", request.query),
})?
if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
match resp {
ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?),
ForwardResult::Local => (),
}
};

// Open partition table if needed.
let table_name = frontend::parse_table_name(&stmts);
if let Some(table_name) = table_name {
self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name)
.await?;
}

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
let plan = frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", req.query),
})?;

self.instance
.limiter
.try_limit(&plan)
Expand All @@ -189,7 +144,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
query_request
req
);

Ok(output)
Expand Down Expand Up @@ -259,15 +214,6 @@ impl Serialize for ResponseRows {
}
}

#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: influxql include more parameters, we should add it in later.
// TODO: remove dead_code after implement influxql with proxy
#[allow(dead_code)]
Influxql(InfluxqlRequest),
}

// Convert output to json
pub fn convert_output(output: Output) -> Response {
match output {
Expand Down
Loading