diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs
index c29d61573..878f78644 100644
--- a/src/alerts/alerts_utils.rs
+++ b/src/alerts/alerts_utils.rs
@@ -31,6 +31,7 @@ use tracing::trace;
 
 use crate::{
     alerts::AggregateCondition,
+    parseable::PARSEABLE,
     query::{TableScanVisitor, QUERY_SESSION},
     rbac::{
         map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
         AlertError::CustomError(format!("Table name not found in query- {}", original_query))
     })?;
 
+    let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
     query
-        .get_dataframe(stream_name)
+        .get_dataframe(time_partition.as_ref())
         .await
         .map_err(|err| AlertError::CustomError(err.to_string()))
 }
diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index 66857d686..db22754f5 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -37,7 +37,7 @@ use ulid::Ulid;
 pub mod alerts_utils;
 pub mod target;
 
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::map::SessionKey;
 use crate::storage;
@@ -514,17 +514,16 @@ impl AlertConfig {
 
         // for now proceed in a similar fashion as we do in query
         // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-        let stream_name = if let Some(stream_name) = query.first_table_name() {
-            stream_name
-        } else {
+        let Some(stream_name) = query.first_table_name() else {
             return Err(AlertError::CustomError(format!(
                 "Table name not found in query- {}",
                 self.query
             )));
         };
 
+        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
         let base_df = query
-            .get_dataframe(stream_name)
+            .get_dataframe(time_partition.as_ref())
             .await
             .map_err(|err| AlertError::CustomError(err.to_string()))?;
 
@@ -704,6 +703,8 @@ pub enum AlertError {
     CustomError(String),
     #[error("Invalid State Change: {0}")]
     InvalidStateChange(String),
+    #[error("{0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }
 
 impl actix_web::ResponseError for AlertError {
@@ -717,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
             Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
         }
     }
 
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index f3e0bfcb8..1a72c7470 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::PARSEABLE;
-use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
 use crate::utils::arrow::flight::{
     append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
     send_to_ingester,
@@ -216,13 +216,9 @@ impl FlightService for AirServiceImpl {
         })?;
 
         let time = Instant::now();
-        let stream_name_clone = stream_name.clone();
-        let (records, _) =
-            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
-                Ok(Ok((records, fields))) => (records, fields),
-                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
-                Err(err) => return Err(Status::internal(err.to_string())),
-            };
+        let (records, _) = execute(query, &stream_name)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
 
         /*
          * INFO: No returning the schema with the data.
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 0d7d0b340..7d9c33a45 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,7 +19,6 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
-use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -40,9 +39,9 @@ use crate::handlers::http::fetch_schema;
 use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::error::ExecuteError;
-use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
+use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -131,7 +130,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespon
     }
 
     let time = Instant::now();
-    let (records, fields) = execute_query(query, table_name.clone()).await?;
+
+    let (records, fields) = execute(query, &table_name).await?;
 
     let response = QueryResponse {
         records,
@@ -160,17 +160,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespon
     Ok(response)
 }
 
-async fn execute_query(
-    query: LogicalQuery,
-    stream_name: String,
-) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
-    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
-        Ok(Ok(result)) => Ok(result),
-        Ok(Err(e)) => Err(QueryError::Execute(e)),
-        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-    }
-}
-
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
@@ -330,6 +319,8 @@ Description: {0}"#
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Error: {0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }
 
 impl actix_web::ResponseError for QueryError {
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 6e4f55e94..bee083bce 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;
 
 #[cfg(feature = "kafka")]
diff --git a/src/query/mod.rs b/src/query/mod.rs
index fa422e9a1..9fb50b94e 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -42,6 +42,7 @@ use std::ops::Bound;
 use std::sync::Arc;
 use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;
+use tokio::runtime::Runtime;
 
 use self::error::ExecuteError;
 use self::stream_schema_provider::GlobalSchemaProvider;
@@ -60,6 +61,24 @@ use crate::utils::time::TimeRange;
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
 
+/// Dedicated multi-threaded runtime to run all queries on
+pub static QUERY_RUNTIME: Lazy<Runtime> =
+    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
+
+
+/// Executes a query on the dedicated runtime, ensuring the query is not confined to a single thread/CPU
+/// but has access to the entire thread pool, enabling better concurrent processing and quicker results.
+pub async fn execute(
+    query: Query,
+    stream_name: &str,
+) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
+    QUERY_RUNTIME
+        .spawn(async move { query.execute(time_partition.as_ref()).await })
+        .await
+        .expect("The Join should have been successful")
+}
+
 // A query request by client
 #[derive(Debug)]
 pub struct Query {
@@ -129,15 +148,12 @@ impl Query {
         SessionContext::new_with_state(state)
     }
 
-    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
-        stream_name: String,
+        time_partition: Option<&String>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;
 
         let fields = df
@@ -153,21 +169,23 @@ impl Query {
         }
 
         let results = df.collect().await?;
+
         Ok((results, fields))
     }
 
-    pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
+    pub async fn get_dataframe(
+        &self,
+        time_partition: Option<&String>,
+    ) -> Result<DataFrame, ExecuteError> {
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;
 
         Ok(df)
     }
 
     /// return logical plan with all time filters applied through
-    fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
+    fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
         // see https://github.com/apache/arrow-datafusion/pull/8400
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
@@ -487,7 +505,7 @@ fn transform(
     plan: LogicalPlan,
     start_time: NaiveDateTime,
     end_time: NaiveDateTime,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> Transformed<LogicalPlan> {
     plan.transform(&|plan| match plan {
         LogicalPlan::TableScan(table) => {
@@ -545,7 +563,7 @@ fn transform(
 
 fn table_contains_any_time_filters(
     table: &datafusion::logical_expr::TableScan,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> bool {
     table
         .filters
@@ -559,8 +577,8 @@ fn table_contains_any_time_filters(
         })
         .any(|expr| {
             matches!(&*expr.left, Expr::Column(Column { name, .. })
-            if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
-                (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
+            if (time_partition.is_some_and(|field| field == name) ||
+                (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
        })
 }
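
For context on the pattern this patch introduces: `QUERY_RUNTIME.spawn(...)` hands the query future to a separate multi-threaded runtime and awaits its `JoinHandle` from the caller's runtime, replacing the old `spawn_blocking` + `#[tokio::main(flavor = "multi_thread")]` combination that built a fresh ad-hoc runtime per query on a single blocking thread. Below is a minimal, self-contained sketch of the same idea; `WORKER_RUNTIME`, `run_on_worker`, and `main` are illustrative names, not part of this diff.

```rust
use std::future::Future;

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// Dedicated multi-threaded runtime, analogous to QUERY_RUNTIME above.
static WORKER_RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));

/// Run a future on the dedicated runtime and await its result from the
/// caller's runtime. `Runtime::spawn` returns a `JoinHandle`, which is
/// itself a future, so awaiting it never blocks an executor thread.
async fn run_on_worker<T, F>(fut: F) -> T
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    WORKER_RUNTIME
        .spawn(fut)
        .await
        .expect("worker task should not panic")
}

#[tokio::main]
async fn main() {
    // The async block executes on WORKER_RUNTIME's thread pool, not on the
    // runtime driving main(), so the handler thread stays free while it runs.
    let answer = run_on_worker(async { 6 * 7 }).await;
    assert_eq!(answer, 42);
}
```

The design point, per the new doc comment, is that a query spawned this way can be work-stolen across the dedicated pool's threads instead of monopolizing one request-handler thread, which is why the diff can delete the `tokio::task::spawn_blocking` wrappers in `airplane.rs` and `http/query.rs`.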