Skip to content

Commit

Permalink
feat(tql): add initial implementation for explain & analyze (Greptime…
Browse files Browse the repository at this point in the history
…Team#1427)

* feat(tql): resolve conflicts after merge,formatting and clippy issues, add sqlness tests, adjust explain with start, end, step

* feat(tql): adjust sqlness assertions
  • Loading branch information
etolbakov authored May 15, 2023
1 parent 2fd1075 commit 122bd5f
Show file tree
Hide file tree
Showing 12 changed files with 347 additions and 32 deletions.
4 changes: 1 addition & 3 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ impl Instance {
) -> Result<Output> {
let query = PromQuery {
query: promql.to_string(),
start: "0".to_string(),
end: "0".to_string(),
step: "5m".to_string(),
..PromQuery::default()
};
let mut stmt = QueryLanguageParser::parse_promql(&query).context(ExecuteSqlSnafu)?;
match &mut stmt {
Expand Down
51 changes: 35 additions & 16 deletions src/frontend/src/statement/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,60 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser};
use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME};
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;

use crate::error::{
ExecLogicalPlanSnafu, NotSupportedSnafu, ParseQuerySnafu, PlanStatementSnafu, Result,
};
use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
use crate::statement::StatementExecutor;

impl StatementExecutor {
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let plan = match tql {
let stmt = match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
start: eval.start,
end: eval.end,
step: eval.step,
query: eval.query,
};
let stmt = QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?;
self.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?
QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?
}
Tql::Explain(_) => {
return NotSupportedSnafu {
feat: "TQL EXPLAIN",
}
.fail()
Tql::Explain(explain) => {
let promql = PromQuery {
query: explain.query,
..PromQuery::default()
};
let params = HashMap::from([("name".to_string(), EXPLAIN_NODE_NAME.to_string())]);
QueryLanguageParser::parse_promql(&promql)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
}
Tql::Analyze(tql_analyze) => {
let promql = PromQuery {
start: tql_analyze.start,
end: tql_analyze.end,
step: tql_analyze.step,
query: tql_analyze.query,
};
let params = HashMap::from([("name".to_string(), ANALYZE_NODE_NAME.to_string())]);
QueryLanguageParser::parse_promql(&promql)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
}
};
let plan = self
.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
self.query_engine
.execute(plan, query_ctx)
.await
Expand Down
28 changes: 24 additions & 4 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,30 @@ impl PromPlanner {
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Extension(_) => UnsupportedExprSnafu {
name: "Prom Extension",
PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => {
let children = expr.children();
let plan = self.prom_expr_to_plan(children[0].clone()).await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
// information about metrics during run.
// if `verbose` is true, prints out additional details when VERBOSE keyword is specified
match expr.name() {
"ANALYZE" => LogicalPlanBuilder::from(plan)
.explain(false, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
"EXPLAIN" => LogicalPlanBuilder::from(plan)
.explain(false, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
_ => LogicalPlanBuilder::empty(true)
.build()
.context(DataFusionPlanningSnafu)?,
}
}
.fail()?,
};
Ok(res)
}
Expand Down Expand Up @@ -559,7 +579,7 @@ impl PromPlanner {
Ok(logical_plan)
}

/// Convert [AggModifier] to [Column] exprs for aggregation.
/// Convert [LabelModifier] to [Column] exprs for aggregation.
/// Timestamp column and tag columns will be included.
///
/// # Side effect
Expand Down
7 changes: 7 additions & 0 deletions src/query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, location: Location },

#[snafu(display("Operation {} not implemented yet", operation))]
Unimplemented {
operation: String,
location: Location,
},

#[snafu(display("General catalog error: {}", source))]
Catalog {
#[snafu(backtrace)]
Expand Down Expand Up @@ -183,6 +189,7 @@ impl ErrorExt for Error {
match self {
QueryParse { .. } | MultipleStatements { .. } => StatusCode::InvalidSyntax,
UnsupportedExpr { .. }
| Unimplemented { .. }
| CatalogNotFound { .. }
| SchemaNotFound { .. }
| TableNotFound { .. }
Expand Down
104 changes: 102 additions & 2 deletions src/query/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,77 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use chrono::DateTime;
use common_error::ext::PlainError;
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_telemetry::timer;
use promql_parser::parser::EvalStmt;
use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
use promql_parser::parser::Expr::Extension;
use promql_parser::parser::{EvalStmt, Expr, ValueType};
use snafu::ResultExt;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;

use crate::error::{
MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result,
UnimplementedSnafu,
};
use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};

const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";

#[derive(Debug, Clone)]
pub enum QueryStatement {
Sql(Statement),
Promql(EvalStmt),
}

impl QueryStatement {
pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
match self {
QueryStatement::Sql(_) => UnimplementedSnafu {
operation: "sql post process",
}
.fail(),
QueryStatement::Promql(eval_stmt) => {
let node_name = match params.get("name") {
Some(name) => name.as_str(),
None => "",
};
let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
Ok(QueryStatement::Promql(EvalStmt {
expr: Extension(extension_node.unwrap()),
start: eval_stmt.start,
end: eval_stmt.end,
interval: eval_stmt.interval,
lookback_delta: eval_stmt.lookback_delta,
}))
}
}
}

fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
match node_name {
ANALYZE_NODE_NAME => Some(NodeExtension {
expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
}),
EXPLAIN_NODE_NAME => Some(NodeExtension {
expr: Arc::new(ExplainExpr { expr: expr.clone() }),
}),
_ => None,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromQuery {
pub query: String,
Expand All @@ -46,6 +91,17 @@ pub struct PromQuery {
pub step: String,
}

impl Default for PromQuery {
fn default() -> Self {
PromQuery {
query: String::new(),
start: String::from("0"),
end: String::from("0"),
step: String::from("5m"),
}
}
}

pub struct QueryLanguageParser {}

impl QueryLanguageParser {
Expand All @@ -66,7 +122,6 @@ impl QueryLanguageParser {
}
}

// TODO(ruihang): implement this method when parser is ready.
pub fn parse_promql(query: &PromQuery) -> Result<QueryStatement> {
let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED);

Expand Down Expand Up @@ -142,6 +197,51 @@ fn max_system_timestamp() -> SystemTime {
.unwrap()
}

macro_rules! define_node_ast_extension {
($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
/// The implementation of the `$name_expr` extension AST node
#[derive(Debug, Clone)]
pub struct $name_expr {
pub expr: $expr_type,
}

impl ExtensionExpr for $name_expr {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
$extension_name
}

fn value_type(&self) -> ValueType {
self.expr.value_type()
}

fn children(&self) -> &[Expr] {
std::slice::from_ref(&self.expr)
}
}

#[allow(rustdoc::broken_intra_doc_links)]
#[derive(Debug, Clone)]
pub struct $name {
pub expr: Arc<$name_expr>,
}

impl $name {
pub fn new(expr: $expr_type) -> Self {
Self {
expr: Arc::new($name_expr { expr }),
}
}
}
};
}

define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit 122bd5f

Please sign in to comment.