Skip to content

Commit

Permalink
feat: support rewrite basic raw query in influxql (apache#683)
Browse files Browse the repository at this point in the history
* draft.

* implement influxql stmt rewriter.

* add tests.

* address CR.
  • Loading branch information
Rachelint authored Mar 8, 2023
1 parent 895516d commit e57d339
Show file tree
Hide file tree
Showing 10 changed files with 860 additions and 9 deletions.
84 changes: 84 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions server/src/grpc/storage_service/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ use crate::grpc::storage_service::{
};

fn is_table_not_found_error(e: &FrontendError) -> bool {
matches!(&e, FrontendError::CreatePlan { source }
if matches!(source, sql::planner::Error::BuildPromPlanError { source }
if matches!(source, sql::promql::Error::TableNotFound { .. })))
matches!(&e,
FrontendError::CreatePlan {
source,
.. }
if matches!(source, sql::planner::Error::BuildPromPlanError { source }
if matches!(source, sql::promql::Error::TableNotFound { .. })))
}

pub async fn handle_query<Q>(
Expand Down
33 changes: 33 additions & 0 deletions sql/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{sync::Arc, time::Instant};
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table;

Expand Down Expand Up @@ -37,6 +38,13 @@ pub enum Error {

#[snafu(display("Expr is not found in prom request.\nBacktrace:\n{}", backtrace))]
ExprNotFoundInPromRequest { backtrace: Backtrace },

// invalid sql is quite common, so we don't provide a backtrace now.
#[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
InvalidInfluxql {
influxql: String,
parse_err: influxdb_influxql_parser::common::ParseError,
},
}

define_result!(Error);
Expand Down Expand Up @@ -90,6 +98,21 @@ impl<P> Frontend<P> {
let expr = expr.context(ExprNotFoundInPromRequest)?;
Expr::try_from(expr).context(InvalidPromRequest)
}

/// Parse the sql and returns the statements
pub fn parse_influxql(
&self,
_ctx: &mut Context,
influxql: &str,
) -> Result<Vec<InfluxqlStatement>> {
match influxdb_influxql_parser::parse_statements(influxql) {
Ok(stmts) => Ok(stmts),
Err(e) => Err(Error::InvalidInfluxql {
influxql: influxql.to_string(),
parse_err: e,
}),
}
}
}

impl<P: MetaProvider> Frontend<P> {
Expand All @@ -110,6 +133,16 @@ impl<P: MetaProvider> Frontend<P> {
planner.promql_expr_to_plan(expr).context(CreatePlan)
}

pub fn influxql_stmt_to_plan(
&self,
ctx: &mut Context,
stmt: InfluxqlStatement,
) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
}

pub fn write_req_to_plan(
&self,
ctx: &mut Context,
Expand Down
38 changes: 38 additions & 0 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql processing

pub mod planner;
pub(crate) mod stmt_rewriter;
pub(crate) mod util;
pub mod error {
use common_util::error::GenericError;
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display(
"Unimplemented influxql statement, msg: {}.\nBacktrace:{}",
msg,
backtrace
))]
Unimplemented { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
msg,
source
))]
RewriteWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
RewriteNoCause { msg: String, backtrace: Backtrace },
}

define_result!(Error);
}
56 changes: 56 additions & 0 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql planner.

use common_util::error::BoxError;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;
use table_engine::table::TableRef;

use crate::{influxql::error::*, plan::Plan, provider::MetaProvider};

#[allow(dead_code)]
pub(crate) struct Planner<'a, P: MetaProvider> {
sql_planner: crate::planner::PlannerDelegate<'a, P>,
}

impl<'a, P: MetaProvider> Planner<'a, P> {
pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
Self { sql_planner }
}

pub fn statement_to_plan(&self, stmt: InfluxqlStatement) -> Result<Plan> {
match stmt {
InfluxqlStatement::Select(_) => todo!(),
InfluxqlStatement::CreateDatabase(_) => todo!(),
InfluxqlStatement::ShowDatabases(_) => todo!(),
InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
InfluxqlStatement::ShowTagKeys(_) => todo!(),
InfluxqlStatement::ShowTagValues(_) => todo!(),
InfluxqlStatement::ShowFieldKeys(_) => todo!(),
InfluxqlStatement::ShowMeasurements(_) => todo!(),
InfluxqlStatement::Delete(_) => todo!(),
InfluxqlStatement::DropMeasurement(_) => todo!(),
InfluxqlStatement::Explain(_) => todo!(),
}
}
}

pub trait MeasurementProvider {
fn measurement(&self, measurement_name: &str) -> Result<Option<TableRef>>;
}

pub(crate) struct MeasurementProviderImpl<'a, P: MetaProvider>(
crate::planner::PlannerDelegate<'a, P>,
);

impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> {
fn measurement(&self, measurement_name: &str) -> Result<Option<TableRef>> {
self.0
.find_table(measurement_name)
.box_err()
.context(RewriteWithCause {
msg: format!("failed to find measurement, measurement:{measurement_name}"),
})
}
}
Loading

0 comments on commit e57d339

Please sign in to comment.