Skip to content

chore: find tables from DFParser, schema merge when required #1380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits (source/target branch names not captured in this page extract)
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ use datafusion::{
logical_expr::{BinaryExpr, Literal, Operator},
prelude::{col, lit, DataFrame, Expr},
};
use tracing::trace;
use tokio::task::JoinSet;
use tracing::{trace, warn};

use crate::{
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
alerts::LogicalOperator,
handlers::http::query::update_schema_when_distributed,
parseable::PARSEABLE,
query::{resolve_stream_names, QUERY_SESSION},
utils::time::TimeRange,
};

use super::{
Expand Down Expand Up @@ -71,11 +76,37 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert

let session_state = QUERY_SESSION.state();
let select_query = alert.get_base_query();
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;

let time_range = TimeRange::parse_human_time(start_time, end_time)
.map_err(|err| AlertError::CustomError(err.to_string()))?;

let streams = resolve_stream_names(&select_query)?;
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
Ok(plan) => plan,
Err(_) => {
let mut join_set = JoinSet::new();
for stream_name in streams {
let stream_name = stream_name.clone();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
});
}

while let Some(result) = join_set.join_next().await {
if let Err(join_error) = result {
warn!("Task join error: {}", join_error);
}
}
session_state.create_logical_plan(&select_query).await?
}
};
Ok(crate::query::Query {
raw_logical_plan,
time_range,
Expand All @@ -87,11 +118,18 @@ async fn execute_base_query(
query: &crate::query::Query,
original_query: &str,
) -> Result<DataFrame, AlertError> {
let stream_name = query.first_table_name().ok_or_else(|| {
let streams = resolve_stream_names(original_query)?;
let stream_name = streams.first().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
})?;

let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
update_schema_when_distributed(&streams)
.await
.map_err(|err| {
AlertError::CustomError(format!(
"Failed to update schema for distributed streams: {err}"
))
})?;
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
query
.get_dataframe(time_partition.as_ref())
.await
Expand Down
4 changes: 4 additions & 0 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use actix_web::http::header::ContentType;
use async_trait::async_trait;
use chrono::Utc;
use datafusion::sql::sqlparser::parser::ParserError;
use derive_more::derive::FromStr;
use derive_more::FromStrError;
use http::StatusCode;
Expand Down Expand Up @@ -860,6 +861,8 @@ pub enum AlertError {
InvalidTargetModification(String),
#[error("Can't delete a Target which is being used")]
TargetInUse,
#[error("{0}")]
ParserError(#[from] ParserError),
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -880,6 +883,7 @@ impl actix_web::ResponseError for AlertError {
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
Self::TargetInUse => StatusCode::CONFLICT,
Self::ParserError(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Correlations {
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_datasets(&permissions, tables).is_ok() {
if user_auth_for_datasets(&permissions, tables).await.is_ok() {
user_correlations.push(correlation.clone());
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ impl CorrelationConfig {
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
Expand Down
2 changes: 1 addition & 1 deletion src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Stagi

let map = &mut stream_metadata
.get_mut(stream_name)
.expect("map has entry for this stream name")
.ok_or_else(|| StagingError::NotFound(stream_name.to_string()))?
.metadata
.write()
.expect(LOCK_EXPECT)
Expand Down
38 changes: 12 additions & 26 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;

use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::time::Instant;
Expand All @@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_node_info;
use crate::handlers::http::modal::{NodeMetadata, NodeType};
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::http::query::into_query;
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::parseable::PARSEABLE;
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
use crate::query::{execute, resolve_stream_names, QUERY_SESSION};
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand Down Expand Up @@ -131,40 +129,26 @@ impl FlightService for AirServiceImpl {

let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

let streams = resolve_stream_names(&ticket.query).map_err(|e| {
error!("Failed to extract table names from SQL: {}", e);
Status::invalid_argument("Invalid SQL query syntax")
})?;
info!("query requested to airplane: {:?}", ticket);

// get the query session_state
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&ticket.query)
.await
.map_err(|err| {
error!("Datafusion Error: Failed to create logical plan: {}", err);
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let streams = visitor.into_inner();

let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let query = into_query(&ticket, &session_state, time_range)
let query = into_query(&ticket, &session_state, time_range, &streams)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

Expand Down Expand Up @@ -214,9 +198,11 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

user_auth_for_datasets(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
user_auth_for_datasets(&permissions, &streams)
.await
.map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();

let (records, _) = execute(query, &stream_name, false)
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn get(
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

Ok(web::Json(correlation))
}
Expand Down
Loading
Loading