Skip to content

Commit 712c7e2

Browse files
chore: find tables from DFParser, schema merge when required
Find the table list using DFParser and create the logical plan for the query; if that fails, create the streams from storage, merge the schema, and then create the logical plan again.
1 parent b99913f commit 712c7e2

File tree

9 files changed

+129
-148
lines changed

9 files changed

+129
-148
lines changed

src/alerts/alerts_utils.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ use datafusion::{
2929
logical_expr::{BinaryExpr, Literal, Operator},
3030
prelude::{col, lit, DataFrame, Expr},
3131
};
32-
use tracing::trace;
32+
use tokio::task::JoinSet;
33+
use tracing::{trace, warn};
3334

3435
use crate::{
35-
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
36+
alerts::LogicalOperator,
37+
parseable::PARSEABLE,
38+
query::{resolve_stream_names, QUERY_SESSION},
39+
utils::time::TimeRange,
3640
};
3741

3842
use super::{
@@ -71,11 +75,37 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
7175

7276
let session_state = QUERY_SESSION.state();
7377
let select_query = alert.get_base_query();
74-
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
75-
7678
let time_range = TimeRange::parse_human_time(start_time, end_time)
7779
.map_err(|err| AlertError::CustomError(err.to_string()))?;
7880

81+
let streams = resolve_stream_names(&select_query)?;
82+
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
83+
Ok(plan) => plan,
84+
Err(_) => {
85+
let mut join_set = JoinSet::new();
86+
for stream_name in streams {
87+
let stream_name = stream_name.clone();
88+
join_set.spawn(async move {
89+
let result = PARSEABLE
90+
.create_stream_and_schema_from_storage(&stream_name)
91+
.await;
92+
93+
if let Err(e) = &result {
94+
warn!("Failed to create stream '{}': {}", stream_name, e);
95+
}
96+
97+
(stream_name, result)
98+
});
99+
}
100+
101+
while let Some(result) = join_set.join_next().await {
102+
if let Err(join_error) = result {
103+
warn!("Task join error: {}", join_error);
104+
}
105+
}
106+
session_state.create_logical_plan(&select_query).await?
107+
}
108+
};
79109
Ok(crate::query::Query {
80110
raw_logical_plan,
81111
time_range,
@@ -87,7 +117,8 @@ async fn execute_base_query(
87117
query: &crate::query::Query,
88118
original_query: &str,
89119
) -> Result<DataFrame, AlertError> {
90-
let stream_name = query.first_table_name().ok_or_else(|| {
120+
let streams = resolve_stream_names(&original_query)?;
121+
let stream_name = streams.first().ok_or_else(|| {
91122
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
92123
})?;
93124

src/alerts/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use async_trait::async_trait;
2121
use chrono::Utc;
22+
use datafusion::sql::sqlparser::parser::ParserError;
2223
use derive_more::derive::FromStr;
2324
use derive_more::FromStrError;
2425
use http::StatusCode;
@@ -860,6 +861,8 @@ pub enum AlertError {
860861
InvalidTargetModification(String),
861862
#[error("Can't delete a Target which is being used")]
862863
TargetInUse,
864+
#[error("{0}")]
865+
ParserError(#[from] ParserError),
863866
}
864867

865868
impl actix_web::ResponseError for AlertError {
@@ -880,6 +883,7 @@ impl actix_web::ResponseError for AlertError {
880883
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
881884
Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
882885
Self::TargetInUse => StatusCode::CONFLICT,
886+
Self::ParserError(_) => StatusCode::BAD_REQUEST,
883887
}
884888
}
885889

src/correlation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl Correlations {
8787
.iter()
8888
.map(|t| t.table_name.clone())
8989
.collect_vec();
90-
if user_auth_for_datasets(&permissions, tables).is_ok() {
90+
if user_auth_for_datasets(&permissions, tables).await.is_ok() {
9191
user_correlations.push(correlation.clone());
9292
}
9393
}
@@ -281,7 +281,7 @@ impl CorrelationConfig {
281281
.map(|t| t.table_name.clone())
282282
.collect_vec();
283283

284-
user_auth_for_datasets(&permissions, tables)?;
284+
user_auth_for_datasets(&permissions, tables).await?;
285285

286286
// to validate table config, we need to check whether the mentioned fields
287287
// are present in the table or not

src/handlers/airplane.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
2020
use arrow_flight::flight_service_server::FlightServiceServer;
2121
use arrow_flight::PollInfo;
2222
use arrow_schema::ArrowError;
23-
24-
use datafusion::common::tree_node::TreeNode;
2523
use serde_json::json;
2624
use std::net::SocketAddr;
2725
use std::time::Instant;
@@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;
3533

3634
use crate::handlers::http::cluster::get_node_info;
3735
use crate::handlers::http::modal::{NodeMetadata, NodeType};
38-
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
36+
use crate::handlers::http::query::into_query;
3937
use crate::handlers::livetail::cross_origin_config;
4038
use crate::metrics::QUERY_EXECUTE_TIME;
4139
use crate::parseable::PARSEABLE;
42-
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
40+
use crate::query::{execute, resolve_stream_names, QUERY_SESSION};
4341
use crate::utils::arrow::flight::{
4442
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4543
send_to_ingester,
@@ -131,40 +129,27 @@ impl FlightService for AirServiceImpl {
131129

132130
let ticket =
133131
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;
134-
132+
let streams = resolve_stream_names(&ticket.query).map_err(|e| {
133+
error!("Failed to extract table names from SQL: {}", e);
134+
Status::invalid_argument("Invalid SQL query syntax")
135+
})?;
135136
info!("query requested to airplane: {:?}", ticket);
136137

137138
// get the query session_state
138139
let session_state = QUERY_SESSION.state();
139140

140-
// get the logical plan and extract the table name
141-
let raw_logical_plan = session_state
142-
.create_logical_plan(&ticket.query)
143-
.await
144-
.map_err(|err| {
145-
error!("Datafusion Error: Failed to create logical plan: {}", err);
146-
Status::internal("Failed to create logical plan")
147-
})?;
148-
149141
let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
150142
.map_err(|e| Status::internal(e.to_string()))?;
151143
// create a visitor to extract the table name
152-
let mut visitor = TableScanVisitor::default();
153-
let _ = raw_logical_plan.visit(&mut visitor);
154-
155-
let streams = visitor.into_inner();
156144

157145
let stream_name = streams
158-
.first()
146+
.iter()
147+
.next()
159148
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
160149
.to_owned();
161150

162-
update_schema_when_distributed(&streams)
163-
.await
164-
.map_err(|err| Status::internal(err.to_string()))?;
165-
166151
// map payload to query
167-
let query = into_query(&ticket, &session_state, time_range)
152+
let query = into_query(&ticket, &session_state, time_range, &streams)
168153
.await
169154
.map_err(|_| Status::internal("Failed to parse query"))?;
170155

@@ -214,7 +199,7 @@ impl FlightService for AirServiceImpl {
214199

215200
let permissions = Users.get_permissions(&key);
216201

217-
user_auth_for_datasets(&permissions, &streams).map_err(|_| {
202+
user_auth_for_datasets(&permissions, &streams).await.map_err(|_| {
218203
Status::permission_denied("User Does not have permission to access this")
219204
})?;
220205
let time = Instant::now();

src/handlers/http/correlation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub async fn get(
5454
.map(|t| t.table_name.clone())
5555
.collect_vec();
5656

57-
user_auth_for_datasets(&permissions, tables)?;
57+
user_auth_for_datasets(&permissions, tables).await?;
5858

5959
Ok(web::Json(correlation))
6060
}

src/handlers/http/query.rs

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
2525
use arrow_array::RecordBatch;
2626
use bytes::Bytes;
2727
use chrono::{DateTime, Utc};
28-
use datafusion::common::tree_node::TreeNode;
2928
use datafusion::error::DataFusionError;
3029
use datafusion::execution::context::SessionState;
30+
use datafusion::sql::sqlparser::parser::ParserError;
3131
use futures::stream::once;
3232
use futures::{future, Stream, StreamExt};
3333
use futures_util::Future;
@@ -44,11 +44,10 @@ use tracing::{error, warn};
4444

4545
use crate::event::commit_schema;
4646
use crate::metrics::QUERY_EXECUTE_TIME;
47-
use crate::option::Mode;
4847
use crate::parseable::{StreamNotFound, PARSEABLE};
4948
use crate::query::error::ExecuteError;
49+
use crate::query::{resolve_stream_names, QUERY_SESSION};
5050
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
51-
use crate::query::{TableScanVisitor, QUERY_SESSION};
5251
use crate::rbac::Users;
5352
use crate::response::QueryResponse;
5453
use crate::storage::ObjectStorageError;
@@ -81,31 +80,21 @@ pub async fn get_records_and_fields(
8180
query_request: &Query,
8281
req: &HttpRequest,
8382
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
83+
let tables = resolve_stream_names(&query_request.query)?;
8484
let session_state = QUERY_SESSION.state();
8585

86-
// get the logical plan and extract the table name
87-
let raw_logical_plan = session_state
88-
.create_logical_plan(&query_request.query)
89-
.await?;
90-
9186
let time_range =
9287
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
93-
// create a visitor to extract the table name
94-
let mut visitor = TableScanVisitor::default();
95-
let _ = raw_logical_plan.visit(&mut visitor);
96-
97-
let tables = visitor.into_inner();
98-
update_schema_when_distributed(&tables).await?;
99-
let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
10088

89+
let query: LogicalQuery =
90+
into_query(query_request, &session_state, time_range, &tables).await?;
10191
let creds = extract_session_key_from_req(req)?;
10292
let permissions = Users.get_permissions(&creds);
10393

104-
let table_name = query
105-
.first_table_name()
94+
let table_name = tables
95+
.first()
10696
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
107-
108-
user_auth_for_datasets(&permissions, &tables)?;
97+
user_auth_for_datasets(&permissions, &tables).await?;
10998

11099
let (records, fields) = execute(query, &table_name, false).await?;
111100

@@ -121,35 +110,18 @@ pub async fn get_records_and_fields(
121110

122111
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
123112
let session_state = QUERY_SESSION.state();
124-
let raw_logical_plan = match session_state
125-
.create_logical_plan(&query_request.query)
126-
.await
127-
{
128-
Ok(raw_logical_plan) => raw_logical_plan,
129-
Err(_) => {
130-
create_streams_for_querier().await?;
131-
session_state
132-
.create_logical_plan(&query_request.query)
133-
.await?
134-
}
135-
};
136113
let time_range =
137114
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
138-
139-
let mut visitor = TableScanVisitor::default();
140-
let _ = raw_logical_plan.visit(&mut visitor);
141-
let tables = visitor.into_inner();
142-
update_schema_when_distributed(&tables).await?;
143-
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
144-
115+
let tables = resolve_stream_names(&query_request.query)?;
116+
let query: LogicalQuery =
117+
into_query(&query_request, &session_state, time_range, &tables).await?;
145118
let creds = extract_session_key_from_req(&req)?;
146119
let permissions = Users.get_permissions(&creds);
147120

148-
let table_name = query
149-
.first_table_name()
121+
let table_name = tables
122+
.first()
150123
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
151-
152-
user_auth_for_datasets(&permissions, &tables)?;
124+
user_auth_for_datasets(&permissions, &tables).await?;
153125

154126
let time = Instant::now();
155127

@@ -372,7 +344,7 @@ pub async fn get_counts(
372344
let body = counts_request.into_inner();
373345

374346
// does user have access to table?
375-
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;
347+
user_auth_for_datasets(&permissions, &[body.stream.clone()]).await?;
376348

377349
// if the user has given a sql query (counts call with filters applied), then use this flow
378350
// this could include filters or group by
@@ -420,11 +392,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
420392
// if the mode is query or prism, we need to update the schema in memory
421393
// no need to commit schema to storage
422394
// as the schema is read from memory everytime
423-
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
424-
for table in tables {
425-
if let Ok(new_schema) = fetch_schema(table).await {
426-
commit_schema(table, Arc::new(new_schema))?;
427-
}
395+
for table in tables {
396+
if let Ok(new_schema) = fetch_schema(table).await {
397+
commit_schema(table, Arc::new(new_schema))?;
428398
}
429399
}
430400

@@ -520,6 +490,7 @@ pub async fn into_query(
520490
query: &Query,
521491
session_state: &SessionState,
522492
time_range: TimeRange,
493+
tables: &Vec<String>,
523494
) -> Result<LogicalQuery, QueryError> {
524495
if query.query.is_empty() {
525496
return Err(QueryError::EmptyQuery);
@@ -532,9 +503,36 @@ pub async fn into_query(
532503
if query.end_time.is_empty() {
533504
return Err(QueryError::EmptyEndTime);
534505
}
506+
let raw_logical_plan = match session_state.create_logical_plan(&query.query).await {
507+
Ok(plan) => plan,
508+
Err(_) => {
509+
let mut join_set = JoinSet::new();
510+
for stream_name in tables {
511+
let stream_name = stream_name.clone();
512+
join_set.spawn(async move {
513+
let result = PARSEABLE
514+
.create_stream_and_schema_from_storage(&stream_name)
515+
.await;
516+
517+
if let Err(e) = &result {
518+
warn!("Failed to create stream '{}': {}", stream_name, e);
519+
}
520+
521+
(stream_name, result)
522+
});
523+
}
524+
525+
while let Some(result) = join_set.join_next().await {
526+
if let Err(join_error) = result {
527+
warn!("Task join error: {}", join_error);
528+
}
529+
}
530+
session_state.create_logical_plan(&query.query).await?
531+
}
532+
};
535533

536534
Ok(crate::query::Query {
537-
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
535+
raw_logical_plan,
538536
time_range,
539537
filter_tag: query.filter_tags.clone(),
540538
})
@@ -618,6 +616,8 @@ Description: {0}"#
618616
CustomError(String),
619617
#[error("No available queriers found")]
620618
NoAvailableQuerier,
619+
#[error("{0}")]
620+
ParserError(#[from] ParserError),
621621
}
622622

623623
impl actix_web::ResponseError for QueryError {

0 commit comments

Comments
 (0)