Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tql): introduce HybridTql #4284

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ pub fn check_permission(

match stmt {
// These are executed by query engine, and will be checked there.
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {}
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Tqls(_) | Statement::HybridTql(_,_) | Statement::Delete(_) => {}
// database ops won't be checked
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
Expand Down
22 changes: 21 additions & 1 deletion src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,18 @@ impl<'a> ParserContext<'a> {

Keyword::INSERT => self.parse_insert(),

Keyword::SELECT | Keyword::WITH | Keyword::VALUES => self.parse_query(),
Keyword::SELECT | Keyword::VALUES => self.parse_query(),

Keyword::WITH => {
if self.sql.contains(tql_parser::TQL) { // can replace with something like 'find_next_delimiter_token'
let tql_result = self.parse_tql_cte();
let sql_result = self.parse_statement();
let result = self.create_hybrid_statement(tql_result, sql_result);
result
} else {
self.parse_query()
}
}

Keyword::ALTER => self.parse_alter(),

Expand Down Expand Up @@ -287,6 +298,15 @@ impl<'a> ParserContext<'a> {
pub(crate) fn parse_identifier(&mut self) -> std::result::Result<Ident, ParserError> {
self.parser.parse_identifier(false)
}

fn create_hybrid_statement(&self, tql: Result<Statement>, sql: Result<Statement>) -> Result<Statement> {
let tql = tql?.clone();
let sql = sql?.clone();
return match (tql, sql) {
(Statement::Tqls(tvec), Statement::Query(q)) => Ok(Statement::HybridTql(tvec, q)),
_ => self.unsupported("cte".to_string()),
};
}
}

#[cfg(test)]
Expand Down
85 changes: 85 additions & 0 deletions src/sql/src/parsers/tql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use datafusion_common::ScalarValue;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::parser::ParserError;
Expand All @@ -39,6 +40,84 @@ use crate::parsers::error::{EvaluationSnafu, ParserSnafu, TQLError};
/// - `TQL EXPLAIN [VERBOSE] <query>`
/// - `TQL ANALYZE [VERBOSE] <query>`
impl<'a> ParserContext<'a> {

pub(crate) fn parse_tql_cte(&mut self) -> Result<Statement> {
let parser = &mut self.parser;
let _skip_with = parser.next_token();
let mut tqls = Vec::new();
loop {
let cte_name = parser.parse_identifier(false).unwrap().to_string();
parser.expect_keyword(Keyword::AS).unwrap();
parser.expect_token(&Token::LParen).unwrap();

let _skip_tql_keyword = parser.next_token();
let _skip_eval_keyword = parser.next_token();

let (start, end, step, lookback) = match parser.peek_token().token {
Token::LParen => {
let _consume_lparen_token = parser.next_token();
let start = Self::parse_string_or_number_or_word(parser, &[Token::Comma]).unwrap().0;
let end = Self::parse_string_or_number_or_word(parser, &[Token::Comma]).unwrap().0;
let (step, delimiter) =
Self::parse_string_or_number_or_word(parser, &[Token::Comma, Token::RParen]).unwrap();
let lookback = if delimiter == Token::Comma {
Self::parse_string_or_number_or_word(parser, &[Token::RParen])
.ok()
.map(|t| t.0)
} else {
None
};
(start, end, step, lookback)
}
_ => ("0".to_string(), "0".to_string(), "5m".to_string(), None),
};
let cte_query = Self::parse_tql_cte_query(parser, self.sql).unwrap();

let parameters = TqlParameters::new(start, end, step, lookback, cte_query.clone()).with_name(Some(cte_name));
let tql = Tql::Eval(TqlEval::from(parameters));
tqls.push(tql);

if parser.peek_token() == Token::Comma {
let _skip_comma = parser.next_token();
} else {
break;
}
}
Ok(Statement::Tqls(tqls))
}

fn parse_tql_cte_query(parser: &mut Parser, sql: &str) -> std::result::Result<String, ParserError> {
while matches!(parser.peek_token().token, Token::Comma) {
let _skip_token = parser.next_token();
dbg!(_skip_token);
}

let location = parser.next_token();
let query_start_index = location.location.column as usize;
if query_start_index == 0 {
return Err(ParserError::ParserError("empty TQL query".to_string()));
}
let mut end_pos = query_start_index;
let mut paren_count = 1; // we already start with (TQL

while parser.peek_token() != Token::EOF {
let token = parser.next_token();
end_pos = token.location.column as usize;

match token.token {
Token::LParen => paren_count += 1,
Token::RParen => {
paren_count -= 1;
if paren_count == 0 {
break;
}
}
_ => {}
}
}
Ok(sql[(query_start_index - 1)..(end_pos - 1)].trim().to_string())
}

pub(crate) fn parse_tql(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();

Expand Down Expand Up @@ -244,6 +323,12 @@ mod tests {
result.remove(0)
}

#[test]
fn test_cte_first() {
let sql = "WITH cte_test_a AS (TQL EVAL (0, 10, '1s', '2s') host_cpu{host=\"host1\"}), cte_test_b AS (TQL EVAL (5, 15, '5s', '10s') host_memory{host=\"host1\"} + host_swap{host=\"host1\"}), cte_test_c AS (TQL EVAL (0, 20, '2s') host_disk{host=\"host1\"}) SELECT * FROM cte_test_a JOIN cte_test_b ON cte_test_a.timestamp = cte_test_b.timestamp JOIN cte_test_c ON cte_test_a.timestamp = cte_test_c.timestamp WHERE cte_test_a.value > 80";
let statement = parse_into_statement(sql);
}

#[test]
fn test_parse_tql_eval_with_functions() {
let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";
Expand Down
9 changes: 9 additions & 0 deletions src/sql/src/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ pub enum Statement {
Explain(Explain),
// COPY
Copy(crate::statements::copy::Copy),
// TQL
Tql(Tql),
// Several TQL expression (possibly a redundant step)
Tqls(Vec<Tql>),
// CTE TQL
HybridTql(Vec<Tql>, Box<Query>),
// TRUNCATE TABLE
TruncateTable(TruncateTable),
// SET VARIABLES
Expand Down Expand Up @@ -128,6 +133,10 @@ impl Display for Statement {
Statement::Explain(s) => s.fmt(f),
Statement::Copy(s) => s.fmt(f),
Statement::Tql(s) => s.fmt(f),
Statement::Tqls(_) => {
write!(f, "TODO later if we need it")
},
Statement::HybridTql(_,s) => s.fmt(f),
Statement::TruncateTable(s) => s.fmt(f),
Statement::SetVariables(s) => s.fmt(f),
Statement::ShowVariables(s) => s.fmt(f),
Expand Down
14 changes: 14 additions & 0 deletions src/sql/src/statements/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub struct TqlParameters {
lookback: Option<String>,
query: String,
pub is_verbose: bool,
name: Option<String>,
}

impl TqlParameters {
Expand All @@ -158,6 +159,19 @@ impl TqlParameters {
lookback,
query,
is_verbose: false,
name: None,
}
}

pub fn with_name(self, name: Option<String>) -> Self {
TqlParameters {
start: self.start,
end: self.end,
step: self.step,
lookback: self.lookback,
query: self.query,
is_verbose: false,
name,
}
}
}
Expand Down
Loading