diff --git a/Cargo.lock b/Cargo.lock index 2d2ef239846cd..7b8693bce34c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5249,6 +5249,7 @@ dependencies = [ "databend-storages-common-table-meta", "derive-visitor", "dyn-clone", + "either", "enum-as-inner", "ethnum", "fastrace", @@ -5287,6 +5288,7 @@ dependencies = [ "poem", "pretty_assertions", "prometheus-client 0.22.3", + "proptest", "prost", "rand 0.8.5", "recursive", diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 0016398385d49..66a8a9f07d023 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -117,6 +117,7 @@ databend-storages-common-stage = { workspace = true } databend-storages-common-table-meta = { workspace = true } derive-visitor = { workspace = true } dyn-clone = { workspace = true } +either = { workspace = true } enum-as-inner = { workspace = true } ethnum = { workspace = true } fastrace = { workspace = true } @@ -188,6 +189,7 @@ maplit = { workspace = true } mysql_async = { workspace = true } p256 = { workspace = true } pretty_assertions = { workspace = true } +proptest = { workspace = true } reqwest = { workspace = true } serde_json.workspace = true serde_yaml = { workspace = true } diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index dc601bc8827e5..ec1c3d7c2d11e 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -31,7 +31,6 @@ use databend_common_exception::ErrorCode; use databend_common_expression::DataSchemaRef; use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; -use fastrace::func_path; use fastrace::prelude::*; use http::HeaderMap; use http::HeaderValue; @@ -296,7 +295,7 @@ async fn query_final_handler( Path(query_id): Path, ) -> PoemResult { ctx.check_node_id(&query_id)?; - let root = get_http_tracing_span(func_path!(), ctx, &query_id); + let root = get_http_tracing_span("http::query_final_handler", ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); async { info!( @@ -337,7 +336,7 @@ async fn query_cancel_handler( Path(query_id): Path, ) -> PoemResult { ctx.check_node_id(&query_id)?; - let root = get_http_tracing_span(func_path!(), ctx, &query_id); + let root = get_http_tracing_span("http::query_cancel_handler", ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); async { info!( @@ -369,7 +368,7 @@ async fn query_state_handler( Path(query_id): Path, ) -> PoemResult { ctx.check_node_id(&query_id)?; - let root = get_http_tracing_span(func_path!(), ctx, &query_id); + let root = get_http_tracing_span("http::query_state_handler", ctx, &query_id); async { let http_query_manager = HttpQueryManager::instance(); @@ -457,7 +456,7 @@ async fn query_page_handler( }; let query_page_handle = { - let root = get_http_tracing_span(func_path!(), ctx, &query_id); + let root = get_http_tracing_span("http::query_page_handler", ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); query_page_handle.in_span(root) }; @@ -556,7 +555,7 @@ pub(crate) async fn query_handler( }; let query_handle = { - let root = get_http_tracing_span(func_path!(), ctx, &ctx.query_id); + let root = get_http_tracing_span("http::query_handler", ctx, &ctx.query_id); let _t = SlowRequestLogTracker::new(ctx); query_handle.in_span(root) }; diff --git a/src/query/service/src/servers/http/v1/mod.rs b/src/query/service/src/servers/http/v1/mod.rs index f099b9d993b2a..5ad6d628319b9 100644 --- a/src/query/service/src/servers/http/v1/mod.rs +++ b/src/query/service/src/servers/http/v1/mod.rs @@ -31,6 +31,7 @@ pub use http_query_handlers::query_route; pub use http_query_handlers::QueryResponse; pub use http_query_handlers::QueryResponseField; pub use http_query_handlers::QueryStats; +pub use query::blocks_serializer::BlocksCollector; pub use query::blocks_serializer::BlocksSerializer; pub use query::ExecuteStateKind; pub use query::ExpiringMap; diff --git a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs index 458945f833cc8..b9b51a336ce15 100644 --- a/src/query/service/src/servers/http/v1/query/blocks_serializer.rs +++ b/src/query/service/src/servers/http/v1/query/blocks_serializer.rs @@ -18,7 +18,9 @@ use std::ops::DerefMut; use databend_common_expression::types::date::date_to_string; use databend_common_expression::types::interval::interval_to_string; use databend_common_expression::types::timestamp::timestamp_to_string; +use databend_common_expression::BlockEntry; use databend_common_expression::Column; +use databend_common_expression::DataBlock; use databend_common_formats::field_encoder::FieldEncoderValues; use databend_common_io::ewkb_to_geo; use databend_common_io::geo_to_ewkb; @@ -40,38 +42,55 @@ fn data_is_null(column: &Column, row_index: usize) -> bool { } } -#[derive(Debug, Clone)] -pub struct BlocksSerializer { +#[derive(Debug, Clone, Default)] +pub struct BlocksCollector { // Vec for a Block columns: Vec<(Vec, usize)>, - pub(crate) format: Option, } -impl BlocksSerializer { - pub fn empty() -> Self { - Self { - columns: vec![], - format: None, - } +impl BlocksCollector { + pub fn new() -> Self { + Self { columns: vec![] } } - pub fn new(format: Option) -> Self { - Self { - columns: vec![], - format, + pub fn append_columns(&mut self, columns: Vec, num_rows: usize) { + self.columns.push((columns, num_rows)); + } + + pub fn append_block(&mut self, block: DataBlock) { + if block.is_empty() { + return; } + let columns = block.columns().iter().map(BlockEntry::to_column).collect(); + let num_rows = block.num_rows(); + self.append_columns(columns, num_rows); } - pub fn has_format(&self) -> bool { - self.format.is_some() + pub fn num_rows(&self) -> usize { + self.columns.iter().map(|(_, num_rows)| *num_rows).sum() } - pub fn set_format(&mut self, format: FormatSettings) { - self.format = Some(format); + pub fn into_serializer(self, format: FormatSettings) -> BlocksSerializer { + BlocksSerializer { + columns: self.columns, + format: Some(format), + } } +} - pub fn append(&mut self, columns: Vec, num_rows: usize) { - self.columns.push((columns, num_rows)); +#[derive(Debug, Clone)] +pub struct BlocksSerializer { + // Vec for a Block + columns: Vec<(Vec, usize)>, + format: Option, +} + +impl BlocksSerializer { + pub fn empty() -> Self { + Self { + columns: vec![], + format: None, + } } pub fn is_empty(&self) -> bool { diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index c3202657427ee..3673a6028c559 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -25,8 +25,9 @@ use databend_common_exception::ResultExt; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::Scalar; -use databend_common_io::prelude::FormatSettings; use databend_common_settings::Settings; +use databend_common_storage::DataOperator; +use databend_storages_common_cache::TempDirManager; use databend_storages_common_session::TxnManagerRef; use futures::StreamExt; use log::debug; @@ -49,6 +50,12 @@ use crate::sessions::QueryAffect; use crate::sessions::QueryContext; use crate::sessions::Session; use crate::sessions::TableContext; +use crate::spillers::LiteSpiller; +use crate::spillers::SpillerConfig; +use crate::spillers::SpillerDiskConfig; +use crate::spillers::SpillerType; + +type Sender = SizedChannelSender; pub struct ExecutionError; @@ -114,7 +121,7 @@ impl ExecuteState { pub struct ExecuteStarting { pub(crate) ctx: Arc, - pub(crate) sender: SizedChannelSender, + pub(crate) sender: Option, } pub struct ExecuteRunning { @@ -355,8 +362,7 @@ impl ExecuteState { sql: String, session: Arc, ctx: Arc, - block_sender: SizedChannelSender, - format_settings: Arc>>, + mut block_sender: Sender, ) -> Result<(), ExecutionError> { let make_error = || format!("failed to start query: {sql}"); @@ -367,11 +373,8 @@ impl ExecuteState { .await .map_err(|err| err.display_with_sql(&sql)) .with_context(make_error)?; - { - // set_var may change settings - let mut guard = format_settings.write(); - *guard = Some(ctx.get_format_settings().with_context(make_error)?); - } + + Self::apply_settings(&ctx, &mut block_sender).with_context(make_error)?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan) .await @@ -397,7 +400,7 @@ impl ExecuteState { let ctx_clone = ctx.clone(); let block_sender_closer = block_sender.closer(); - let res = execute( + let res = Self::execute( interpreter, plan.schema(), ctx_clone, @@ -407,60 +410,111 @@ impl ExecuteState { match CatchUnwindFuture::create(res).await { Ok(Err(err)) => { Executor::stop(&executor_clone, Err(err.clone())); - block_sender_closer.close(); + block_sender_closer.abort(); } Err(e) => { Executor::stop(&executor_clone, Err(e)); - block_sender_closer.close(); + block_sender_closer.abort(); } _ => {} } Ok(()) } -} -async fn execute( - interpreter: Arc, - schema: DataSchemaRef, - ctx: Arc, - block_sender: SizedChannelSender, - executor: Arc>, -) -> Result<(), ExecutionError> { - let make_error = || format!("failed to execute {}", interpreter.name()); - - let mut data_stream = interpreter - .execute(ctx.clone()) - .await - .with_context(make_error)?; - match data_stream.next().await { - None => { - let block = DataBlock::empty_with_schema(schema); - block_sender.send(block, 0).await; - Executor::stop::<()>(&executor, Ok(())); - block_sender.close(); - } - Some(Err(err)) => { - Executor::stop(&executor, Err(err)); - block_sender.close(); + #[fastrace::trace(name = "ExecuteState::execute")] + async fn execute( + interpreter: Arc, + schema: DataSchemaRef, + ctx: Arc, + mut sender: Sender, + executor: Arc>, + ) -> Result<(), ExecutionError> { + let make_error = || format!("failed to execute {}", interpreter.name()); + + let mut data_stream = interpreter + .execute(ctx.clone()) + .await + .with_context(make_error)?; + match data_stream.next().await { + None => { + Self::send_data_block(&mut sender, &executor, DataBlock::empty_with_schema(schema)) + .await + .with_context(make_error)?; + Executor::stop::<()>(&executor, Ok(())); + sender.finish(); + } + Some(Err(err)) => { + Executor::stop(&executor, Err(err)); + sender.abort(); + } + Some(Ok(block)) => { + Self::send_data_block(&mut sender, &executor, block) + .await + .with_context(make_error)?; + while let Some(block) = data_stream.next().await { + match block { + Ok(block) => { + Self::send_data_block(&mut sender, &executor, block) + .await + .with_context(make_error)?; + } + Err(err) => { + sender.abort(); + return Err(err.with_context(make_error())); + } + }; + } + Executor::stop::<()>(&executor, Ok(())); + sender.finish(); + } } - Some(Ok(block)) => { - let size = block.num_rows(); - block_sender.send(block, size).await; - while let Some(block_r) = data_stream.next().await { - match block_r { - Ok(block) => { - block_sender.send(block.clone(), block.num_rows()).await; - } - Err(err) => { - block_sender.close(); - return Err(err.with_context(make_error())); - } - }; + Ok(()) + } + + async fn send_data_block( + sender: &mut Sender, + executor: &Arc>, + block: DataBlock, + ) -> Result { + match sender.send(block).await { + Ok(ok) => Ok(ok), + Err(err) => { + Executor::stop(executor, Err(err.clone())); + sender.abort(); + Err(err) } - Executor::stop::<()>(&executor, Ok(())); - block_sender.close(); } } - Ok(()) + + fn apply_settings(ctx: &Arc, block_sender: &mut Sender) -> Result<()> { + let settings = ctx.get_settings(); + + let spiller = if settings.get_enable_result_set_spilling()? { + let temp_dir_manager = TempDirManager::instance(); + let disk_bytes_limit = settings.get_result_set_spilling_to_disk_bytes_limit()?; + let enable_dio = settings.get_enable_dio()?; + let disk_spill = temp_dir_manager + .get_disk_spill_dir(disk_bytes_limit, &ctx.get_id()) + .map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio)) + .transpose()?; + + let location_prefix = ctx.query_id_spill_prefix(); + let config = SpillerConfig { + spiller_type: SpillerType::ResultSet, + location_prefix, + disk_spill, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), + }; + let op = DataOperator::instance().spill_operator(); + Some(LiteSpiller::new(op, config)?) + } else { + None + }; + + // set_var may change settings + let format_settings = ctx.get_format_settings()?; + block_sender.plan_ready(format_settings, spiller); + Ok(()) + } } diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 1a11c9610205e..d5298abe6dd5d 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -33,7 +33,6 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; use databend_common_expression::Scalar; -use databend_common_io::prelude::FormatSettings; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use databend_common_settings::ScopeLevel; use databend_storages_common_session::TempTblMgrRef; @@ -61,7 +60,6 @@ use crate::servers::http::v1::query::execute_state::ExecuteStarting; use crate::servers::http::v1::query::execute_state::ExecuteStopped; use crate::servers::http::v1::query::execute_state::ExecutorSessionState; use crate::servers::http::v1::query::execute_state::Progresses; -use crate::servers::http::v1::query::sized_spsc::sized_spsc; use crate::servers::http::v1::query::ExecuteState; use crate::servers::http::v1::query::ExecuteStateKind; use crate::servers::http::v1::query::Executor; @@ -613,25 +611,17 @@ impl HttpQuery { }) }; - let (sender, block_receiver) = sized_spsc(req.pagination.max_rows_in_buffer); - + let (page_manager, sender) = PageManager::create(&req.pagination); let executor = Arc::new(Mutex::new(Executor { query_id: query_id.clone(), state: ExecuteState::Starting(ExecuteStarting { ctx: ctx.clone(), - sender, + sender: Some(sender), }), })); let settings = session.get_settings(); let result_timeout_secs = settings.get_http_handler_result_timeout_secs()?; - let format_settings: Arc>> = Default::default(); - - let data = Arc::new(TokioMutex::new(PageManager::new( - req.pagination.max_rows_per_page, - block_receiver, - format_settings, - ))); Ok(HttpQuery { id: query_id, @@ -640,7 +630,7 @@ impl HttpQuery { node_id, request: req, executor, - page_manager: data, + page_manager: Arc::new(TokioMutex::new(page_manager)), result_timeout_secs, state: Arc::new(Mutex::new(HttpQueryState::Working)), @@ -652,7 +642,7 @@ impl HttpQuery { } #[async_backtrace::framed] - #[fastrace::trace] + #[fastrace::trace(name = "HttpQuery::get_response_page",properties = {"page_no": "{page_no}"})] pub async fn get_response_page(&self, page_no: usize) -> Result { let data = Some(self.get_page(page_no).await?); let state = self.get_state(); @@ -787,36 +777,30 @@ impl HttpQuery { } pub async fn start_query(&mut self, sql: String) -> Result<()> { - let span = fastrace::Span::enter_with_local_parent("HttpQuery::start_query"); let (block_sender, query_context) = { - let state = self.executor.lock(); - let ExecuteState::Starting(state) = &state.state else { + let state = &mut self.executor.lock().state; + let ExecuteState::Starting(state) = state else { return Err(ErrorCode::Internal( "[HTTP-QUERY] Invalid query state: expected Starting state", )); }; - (state.sender.clone(), state.ctx.clone()) + (state.sender.take().unwrap(), state.ctx.clone()) }; let query_session = query_context.get_current_session(); let query_state = self.executor.clone(); - let query_format_settings = { - let page_manager = self.page_manager.lock().await; - page_manager.format_settings.clone() - }; - GlobalQueryRuntime::instance().runtime().try_spawn( async move { + let block_sender_closer = block_sender.closer(); if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query( query_state.clone(), sql, query_session, query_context.clone(), - block_sender.clone(), - query_format_settings, + block_sender, )) .await .with_context(|| "failed to start query") @@ -843,10 +827,12 @@ impl HttpQuery { e ); Executor::start_to_stop(&query_state, ExecuteState::Stopped(Box::new(state))); - block_sender.close(); + block_sender_closer.abort(); } } - .in_span(span), + .in_span(fastrace::Span::enter_with_local_parent( + "HttpQuery::start_query", + )), None, )?; @@ -856,17 +842,11 @@ impl HttpQuery { #[async_backtrace::framed] pub async fn kill(&self, reason: ErrorCode) { // the query will be removed from the query manager before the session is dropped. - self.detach().await; + self.page_manager.lock().await.detach().await; Executor::stop(&self.executor, Err(reason)); } - #[async_backtrace::framed] - async fn detach(&self) { - let mut data = self.page_manager.lock().await; - data.detach().await - } - #[async_backtrace::framed] pub async fn update_expire_time(&self, before_wait: bool) { let duration = Duration::from_secs(self.result_timeout_secs) @@ -923,6 +903,7 @@ impl HttpQuery { } #[async_backtrace::framed] + #[fastrace::trace(name = "HttpQuery::on_heartbeat")] pub fn on_heartbeat(&self) -> bool { let mut expire_state = self.state.lock(); match *expire_state { diff --git a/src/query/service/src/servers/http/v1/query/mod.rs b/src/query/service/src/servers/http/v1/query/mod.rs index 215def1e4bc95..1d7756c297361 100644 --- a/src/query/service/src/servers/http/v1/query/mod.rs +++ b/src/query/service/src/servers/http/v1/query/mod.rs @@ -38,4 +38,4 @@ pub use http_query_manager::HttpQueryManager; pub(crate) use http_query_manager::RemoveReason; pub use page_manager::PageManager; pub use page_manager::ResponseData; -pub use page_manager::Wait; +pub use sized_spsc::Wait; diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index 1a4fa4d9f45ba..bba28b16444f4 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -12,31 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::min; -use std::ops::Range; use std::sync::Arc; -use std::time::Instant; -use databend_common_base::base::tokio; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::BlockEntry; -use databend_common_expression::Column; -use databend_common_expression::DataBlock; -use databend_common_io::prelude::FormatSettings; -use itertools::Itertools; -use log::debug; -use log::info; -use parking_lot::RwLock; use super::blocks_serializer::BlocksSerializer; -use crate::servers::http::v1::query::sized_spsc::SizedChannelReceiver; - -#[derive(Debug, PartialEq, Eq)] -pub enum Wait { - Async, - Deadline(Instant), -} +use super::http_query::PaginationConf; +use super::sized_spsc::sized_spsc; +use super::sized_spsc::SizedChannelReceiver; +use super::sized_spsc::SizedChannelSender; +use super::Wait; +use crate::spillers::LiteSpiller; #[derive(Clone)] pub struct Page { @@ -49,34 +36,28 @@ pub struct ResponseData { } pub struct PageManager { - max_rows_per_page: usize, total_rows: usize, total_pages: usize, end: bool, - block_end: bool, last_page: Option, - row_buffer: Option>, - block_receiver: SizedChannelReceiver, - pub(crate) format_settings: Arc>>, + receiver: SizedChannelReceiver, } impl PageManager { - pub fn new( - max_rows_per_page: usize, - block_receiver: SizedChannelReceiver, - format_settings: Arc>>, - ) -> PageManager { - PageManager { - total_rows: 0, - last_page: None, - total_pages: 0, - end: false, - block_end: false, - row_buffer: Default::default(), - block_receiver, - max_rows_per_page, - format_settings, - } + pub fn create(conf: &PaginationConf) -> (PageManager, SizedChannelSender) { + let (sender, receiver) = + sized_spsc::(conf.max_rows_in_buffer, conf.max_rows_per_page); + + ( + PageManager { + total_rows: 0, + last_page: None, + total_pages: 0, + end: false, + receiver, + }, + sender, + ) } pub fn next_page_no(&mut self) -> Option { @@ -88,13 +69,31 @@ impl PageManager { } #[async_backtrace::framed] - pub async fn get_a_page(&mut self, page_no: usize, tp: &Wait) -> Result { + #[fastrace::trace(name = "PageManager::get_a_page")] + pub async fn get_a_page(&mut self, page_no: usize, wait: &Wait) -> Result { let next_no = self.total_pages; if page_no == next_no { - let mut serializer = BlocksSerializer::new(self.format_settings.read().clone()); if !self.end { - let end = self.collect_new_page(&mut serializer, tp).await?; + let start_time = std::time::Instant::now(); + let (serializer, end) = self.receiver.next_page(wait).await?; let num_row = serializer.num_rows(); + let duration_ms = start_time.elapsed().as_millis(); + + log::debug!(num_row, wait_type:? = wait; "collect_new_page"); + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Page received page_no={}, rows={}, total_rows={}, end={}, duration_ms={}", + self.total_pages, num_row, self.total_rows + num_row, end, duration_ms + ); + + if num_row == 0 { + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Empty page page_no={}, end={}", + self.total_pages, end + ); + } + self.total_rows += num_row; let page = Page { data: Arc::new(serializer), @@ -110,7 +109,7 @@ impl PageManager { // but the response may be lost and client will retry, // we simply return an empty page. let page = Page { - data: Arc::new(serializer), + data: Arc::new(BlocksSerializer::empty()), }; Ok(page) } @@ -133,136 +132,33 @@ impl PageManager { } } - fn append_block( - &mut self, - serializer: &mut BlocksSerializer, - block: DataBlock, - remain_rows: &mut usize, - remain_size: &mut usize, - ) -> Result<()> { - assert!(self.row_buffer.is_none()); - if block.is_empty() { - return Ok(()); - } - if !serializer.has_format() { - let guard = self.format_settings.read(); - serializer.set_format(guard.as_ref().unwrap().clone()); - } - - let columns = block - .columns() - .iter() - .map(BlockEntry::to_column) - .collect_vec(); - - let block_memory_size = block.memory_size(); - let mut take_rows = min( - *remain_rows, - if block_memory_size > *remain_size { - (*remain_size * block.num_rows()) / block_memory_size - } else { - block.num_rows() - }, - ); - // this means that the data in remaining_size cannot satisfy even one row. - if take_rows == 0 { - take_rows = 1; - } - - if take_rows == block.num_rows() { - // theoretically, it should always be smaller than the memory_size of the block. - *remain_size -= min(*remain_size, block_memory_size); - *remain_rows -= take_rows; - serializer.append(columns, block.num_rows()); - } else { - // Since not all rows of the block are used, either the size limit or the row limit must have been exceeded. - // simply set any of remain_xxx to end the page. - *remain_rows = 0; - let fn_slice = |columns: &[Column], range: Range| { - columns - .iter() - .map(|column| column.slice(range.clone())) - .collect_vec() - }; - - serializer.append(fn_slice(&columns, 0..take_rows), take_rows); - self.row_buffer = Some(fn_slice(&columns, take_rows..block.num_rows())); - } - Ok(()) - } - #[async_backtrace::framed] - async fn collect_new_page( - &mut self, - serializer: &mut BlocksSerializer, - tp: &Wait, - ) -> Result { - let mut remain_size = 10 * 1024 * 1024; - let mut remain_rows = self.max_rows_per_page; - while remain_rows > 0 && remain_size > 0 { - let Some(block) = self.row_buffer.take() else { - break; - }; - self.append_block( - serializer, - DataBlock::new_from_columns(block), - &mut remain_rows, - &mut remain_size, - )?; - } + pub async fn detach(&mut self) { + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Query completed total_pages={}, total_rows={}", + self.total_pages, self.total_rows + ); - while remain_rows > 0 && remain_size > 0 { - match tp { - Wait::Async => match self.block_receiver.try_recv() { - Some(block) => { - self.append_block(serializer, block, &mut remain_rows, &mut remain_size)? - } - None => break, - }, - Wait::Deadline(t) => { - let now = Instant::now(); - let d = *t - now; - if d.is_zero() { - // timeout() will return Ok if the future completes immediately - break; - } - match tokio::time::timeout(d, self.block_receiver.recv()).await { - Ok(Some(block)) => { - debug!( - "[HTTP-QUERY] Received new data block with {} rows", - block.num_rows() - ); - self.append_block( - serializer, - block, - &mut remain_rows, - &mut remain_size, - )? - } - Ok(None) => { - info!("[HTTP-QUERY] Reached end of data blocks"); - break; - } - Err(_) => { - debug!("[HTTP-QUERY] Long polling timeout reached"); - break; - } - } + self.last_page = None; + if let Some(spiller) = self.receiver.close() { + let start_time = std::time::Instant::now(); + match spiller.cleanup().await { + Ok(_) => { + let duration_ms = start_time.elapsed().as_millis(); + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Cleanup completed duration_ms={}", + duration_ms + ); + } + Err(error) => { + log::error!( + target: "result-set-spill", + error:?; "[RESULT-SET-SPILL] Failed to cleanup spilled result set files" + ); } } - } - - // try to report 'no more data' earlier to client to avoid unnecessary http call - if !self.block_end { - self.block_end = self.block_receiver.is_empty(); - } - Ok(self.block_end && self.row_buffer.is_none()) - } - - #[async_backtrace::framed] - pub async fn detach(&mut self) { - self.block_receiver.close(); - self.last_page = None; - self.row_buffer = None; + }; } } diff --git a/src/query/service/src/servers/http/v1/query/sized_spsc.rs b/src/query/service/src/servers/http/v1/query/sized_spsc.rs index 25bf6d6e55c37..1f695116b5b64 100644 --- a/src/query/service/src/servers/http/v1/query/sized_spsc.rs +++ b/src/query/service/src/servers/http/v1/query/sized_spsc.rs @@ -12,215 +12,797 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! A channel bounded with the sum of sizes associate with items instead of count of items. -//! -//! other features: -//! 1. it is SPSC, enough for now. -//! 2. receive can check status of channel with fn is_empty(). - +use std::cmp::min; use std::collections::VecDeque; +use std::fmt::Debug; +use std::result; use std::sync::Arc; use std::sync::Mutex; +use std::time::Instant; +use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::Notify; - -struct SizedChannelInner { +use databend_common_base::base::WatchNotify; +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::DataBlock; +use databend_common_io::prelude::FormatSettings; +use databend_common_pipeline_transforms::traits::DataBlockSpill; +use databend_common_pipeline_transforms::traits::Location; +use log::debug; +use log::info; + +use super::blocks_serializer::BlocksCollector; +use super::blocks_serializer::BlocksSerializer; + +pub fn sized_spsc( max_size: usize, - values: VecDeque<(T, usize)>, - is_recv_stopped: bool, - is_send_stopped: bool, + page_size: usize, +) -> (SizedChannelSender, SizedChannelReceiver) +where + S: DataBlockSpill, +{ + let chan = Arc::new(SizedChannel::new(max_size, page_size)); + let sender = SizedChannelSender { chan: chan.clone() }; + let receiver = SizedChannelReceiver { chan }; + (sender, receiver) +} + +enum Page { + Memory(Vec), + Spilled(Location), } -struct Stopped {} +struct SizedChannelBuffer { + max_rows: usize, + page_rows: usize, + pages: VecDeque, -pub fn sized_spsc(max_size: usize) -> (SizedChannelSender, SizedChannelReceiver) { - let chan = Arc::new(SizedChannel::create(max_size)); - let cloned = chan.clone(); - (SizedChannelSender { chan }, SizedChannelReceiver { - chan: cloned, - }) + /// The current_page gets moved outside the lock to spilling and then moved back in, or cleared on close. + /// There's a lot of unwrap here to make sure there's no unintended behavior that's not by design. + current_page: Option, + is_recv_stopped: bool, + is_send_stopped: bool, } -impl SizedChannelInner { - pub fn create(max_size: usize) -> Self { - SizedChannelInner { - max_size, - values: Default::default(), +impl SizedChannelBuffer { + fn new(max_rows: usize, page_rows: usize) -> Self { + SizedChannelBuffer { + max_rows: max_rows.max(page_rows), + page_rows, + pages: Default::default(), + current_page: Some(PageBuilder::new(page_rows)), is_recv_stopped: false, is_send_stopped: false, } } - pub fn size(&self) -> usize { - self.values.iter().map(|x| x.1).sum::() + fn pages_rows(&self) -> usize { + self.pages + .iter() + .map(|page| match page { + Page::Memory(blocks) => blocks.iter().map(DataBlock::num_rows).sum::(), + Page::Spilled(_) => 0, + }) + .sum() + } + + fn is_pages_full(&self, reserve: usize) -> bool { + self.pages_rows() + reserve > self.max_rows } - pub fn try_send(&mut self, value: T, size: usize) -> Result, Stopped> { - let current_size = self.size(); + fn try_add_block(&mut self, mut block: DataBlock) -> result::Result<(), SendFail> { if self.is_recv_stopped || self.is_send_stopped { - Err(Stopped {}) - } else if current_size + size <= self.max_size || current_size == 0 { - self.values.push_back((value, size)); - Ok(None) - } else { - Ok(Some(value)) + return Err(SendFail::Closed); } - } - pub fn try_recv(&mut self) -> Result, Stopped> { - let v = self.values.pop_front().map(|x| x.0); - if v.is_none() && self.is_send_stopped { - Err(Stopped {}) - } else { - Ok(v) + loop { + let page_builder = self.current_page.as_mut().expect("current_page has taken"); + + let remain = page_builder.try_append_block(block); + if !page_builder.has_capacity() { + let rows = page_builder.num_rows(); + if self.is_pages_full(rows) { + return Err(SendFail::Full { + page: self + .current_page + .take() + .expect("current_page has taken") + .into_page(), + remain, + }); + } + let page = self + .current_page + .replace(PageBuilder::new(self.page_rows)) + .expect("current_page has taken") + .into_page(); + self.pages.push_back(Page::Memory(page)); + } + match remain { + Some(remain) => block = remain, + None => return Ok(()), + } } } - pub fn is_empty(&self) -> bool { - self.values.is_empty() && self.is_send_stopped + fn try_add_page(&mut self, page: Page) -> result::Result<(), SendFail> { + if self.is_recv_stopped || self.is_send_stopped { + return Err(SendFail::Closed); + } + + assert!(self.current_page.is_none()); + + match page { + page @ Page::Spilled(_) => self.pages.push_back(page), + Page::Memory(page) => { + let rows = page.iter().map(DataBlock::num_rows).sum(); + if self.is_pages_full(rows) { + return Err(SendFail::Full { page, remain: None }); + }; + self.pages.push_back(Page::Memory(page)) + } + }; + + self.current_page = Some(PageBuilder::new(self.page_rows)); + Ok(()) } - pub fn stop_send(&mut self) { + fn stop_send(&mut self) { self.is_send_stopped = true } - pub fn stop_recv(&mut self) { + fn stop_recv(&mut self) { self.is_recv_stopped = true } + + fn take_page(&mut self) -> Option { + self.pages.pop_front() + } +} + +enum SendFail { + Closed, + Full { + page: Vec, + remain: Option, + }, +} + +struct PageBuilder { + blocks: Vec, + remain_rows: usize, + remain_size: usize, +} + +impl PageBuilder { + fn new(max_rows: usize) -> Self { + Self { + blocks: Vec::new(), + remain_size: 10 * 1024 * 1024, + remain_rows: max_rows, + } + } + + fn has_capacity(&self) -> bool { + self.remain_rows > 0 && self.remain_size > 0 + } + + fn num_rows(&self) -> usize { + self.blocks.iter().map(DataBlock::num_rows).sum() + } + + fn calculate_take_rows(&self, block: &DataBlock) -> (usize, usize) { + let block_rows = block.num_rows(); + let memory_size = block + .columns() + .iter() + .map(|entry| match entry { + BlockEntry::Const(scalar, _, n) => *n * scalar.as_ref().memory_size(), + BlockEntry::Column(column) => column.memory_size(), + }) + .sum::(); + + ( + min( + self.remain_rows, + if memory_size > self.remain_size { + (self.remain_size * block_rows) / memory_size + } else { + block_rows + }, + ) + .max(1), + memory_size, + ) + } + + fn try_append_block(&mut self, block: DataBlock) -> Option { + assert!(self.has_capacity()); + let (take_rows, memory_size) = self.calculate_take_rows(&block); + let total_rows = block.num_rows(); + if take_rows < total_rows { + self.remain_size = 0; + self.remain_rows -= take_rows; + self.blocks.push(block.slice(0..take_rows)); + Some(block.slice(take_rows..total_rows)) + } else { + self.remain_size -= min(self.remain_size, memory_size); + self.remain_rows -= total_rows; + self.blocks.push(block); + None + } + } + + fn into_page(self) -> Vec { + self.blocks + } } -struct SizedChannel { - inner: Mutex>, +struct SizedChannel { + buffer: Mutex, notify_on_sent: Notify, notify_on_recv: Notify, + + is_plan_ready: WatchNotify, + format_settings: Mutex>, + spiller: Mutex>, } -impl SizedChannel { - fn create(max_size: usize) -> Self { +impl SizedChannel +where S: DataBlockSpill +{ + fn new(max_rows: usize, page_rows: usize) -> Self { SizedChannel { - inner: Mutex::new(SizedChannelInner::create(max_size)), + buffer: Mutex::new(SizedChannelBuffer::new(max_rows, page_rows)), notify_on_sent: Default::default(), notify_on_recv: Default::default(), + is_plan_ready: WatchNotify::new(), + format_settings: Default::default(), + spiller: Default::default(), } } - fn try_send(&self, value: T, size: usize) -> Result, Stopped> { - let mut guard = self.inner.lock().unwrap(); - guard.try_send(value, size) - } + async fn try_spill_page(&self, page: Vec) -> Result { + let rows_count = page.iter().map(|b| b.num_rows()).sum::(); + let memory_bytes = page.iter().map(|b| b.memory_size()).sum::(); + + if !self.should_spill() { + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Spill disabled, page kept in memory blocks={}, rows={}, memory_bytes={}", + page.len(), rows_count, memory_bytes + ); + return Ok(Page::Memory(page)); + } + let spiller = self.spiller.lock().unwrap().clone(); + let Some(spiller) = spiller.as_ref() else { + log::warn!( + target: "result-set-spill", + "[RESULT-SET-SPILL] No spiller configured, page kept in memory blocks={}, rows={}, memory_bytes={}", + page.len(), rows_count, memory_bytes + ); + return Ok(Page::Memory(page)); + }; + + let start_time = std::time::Instant::now(); + + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Starting spill to disk blocks={}, rows={}, memory_bytes={}", + page.len(), rows_count, memory_bytes + ); + + let location = match spiller.merge_and_spill(page).await { + Ok(location) => location, + Err(e) => { + log::error!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Spill failed error={:?}", + e + ); + return Err(e); + } + }; + let duration_ms = start_time.elapsed().as_millis(); + + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Spill completed rows={}, duration_ms={}, location={:?}", + rows_count, duration_ms, location + ); - pub fn try_recv(&self) -> Result, Stopped> { - let mut guard = self.inner.lock().unwrap(); - guard.try_recv() + Ok(Page::Spilled(location)) } #[async_backtrace::framed] - pub async fn send(&self, value: T, size: usize) -> bool { - let mut to_send = value; + async fn recv(&self) -> bool { loop { - match self.try_send(to_send, size) { - Ok(Some(v)) => { - to_send = v; - self.notify_on_recv.notified().await; - } - Ok(None) => { - self.notify_on_sent.notify_one(); + { + let buffer = self.buffer.lock().unwrap(); + if !buffer.pages.is_empty() { return true; } - Err(_) => return false, + if buffer.is_send_stopped { + return false; + } } + self.notify_on_sent.notified().await; } } + fn is_close(&self) -> bool { + let buffer = self.buffer.lock().unwrap(); + buffer.is_send_stopped && buffer.pages.is_empty() + } + + fn should_spill(&self) -> bool { + // todo: expected to be controlled externally + true + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum Wait { + Async, + Deadline(Instant), +} + +pub struct SizedChannelReceiver { + chan: Arc>, +} + +impl SizedChannelReceiver +where S: DataBlockSpill +{ + pub fn close(&mut self) -> Option { + { + let mut buffer = self.chan.buffer.lock().unwrap(); + buffer.stop_recv(); + buffer.current_page = None; + buffer.pages.clear(); + } + let spiller = self.chan.spiller.lock().unwrap().take(); + self.chan.notify_on_recv.notify_one(); + spiller + } + + #[async_backtrace::framed] + pub async fn next_page(&mut self, tp: &Wait) -> Result<(BlocksSerializer, bool)> { + let page = match tp { + Wait::Async => self.try_take_page().await?, + Wait::Deadline(t) => { + match tokio::time::timeout_at((*t).into(), self.chan.recv()).await { + Ok(true) => self.try_take_page().await?, + Ok(false) => { + info!("[HTTP-QUERY] Reached end of data blocks"); + return Ok((BlocksSerializer::empty(), true)); + } + Err(_) => { + debug!("[HTTP-QUERY] Long polling timeout reached"); + return Ok((BlocksSerializer::empty(), self.chan.is_close())); + } + } + } + }; + + // try to report 'no more data' earlier to client to avoid unnecessary http call + Ok(( + page.unwrap_or_else(BlocksSerializer::empty), + self.chan.is_close(), + )) + } + + #[fastrace::trace(name = "SizedChannelReceiver::try_take_page")] + async fn try_take_page(&mut self) -> Result> { + let page = self.chan.buffer.lock().unwrap().take_page(); + let collector = match page { + None => return Ok(None), + Some(Page::Memory(page)) => { + let mut collector = BlocksCollector::new(); + for block in page { + collector.append_block(block); + } + collector + } + Some(Page::Spilled(location)) => { + let start_time = std::time::Instant::now(); + + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Restoring from disk location={:?}", + location + ); + + let spiller = self.chan.spiller.lock().unwrap().clone().unwrap(); + let block = spiller.restore(&location).await?; + let rows_count = block.num_rows(); + let memory_bytes = block.memory_size(); + let duration_ms = start_time.elapsed().as_millis(); + + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Restore completed rows={}, memory_bytes={}, duration_ms={}", + rows_count, memory_bytes, duration_ms + ); + + let mut collector = BlocksCollector::new(); + collector.append_block(block); + collector + } + }; + self.chan.notify_on_recv.notify_one(); + + let format_settings = self + .chan + .format_settings + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + Ok(Some(collector.into_serializer(format_settings))) + } +} + +pub struct SizedChannelSender { + chan: Arc>, +} + +impl SizedChannelSender +where S: DataBlockSpill +{ #[async_backtrace::framed] - pub async fn recv(&self) -> Option { + pub async fn send(&mut self, mut block: DataBlock) -> Result { loop { - match self.try_recv() { - Ok(Some(v)) => { - self.notify_on_recv.notify_one(); - return Some(v); + let result = self.chan.buffer.lock().unwrap().try_add_block(block); + match result { + Err(SendFail::Closed) => return Ok(false), + Ok(_) => { + self.chan.notify_on_sent.notify_one(); + return Ok(true); } - Ok(None) => { - self.notify_on_sent.notified().await; + Err(SendFail::Full { page, remain }) => { + let page_rows = page.iter().map(|b| b.num_rows()).sum::(); + + log::info!( + target: "result-set-spill", + "[RESULT-SET-SPILL] Buffer full page_blocks={}, page_rows={}, has_remain={}", + page.len(), page_rows, remain.is_some() + ); + + self.chan.notify_on_sent.notify_one(); + let mut to_add = self.chan.try_spill_page(page).await?; + loop { + let result = self.chan.buffer.lock().unwrap().try_add_page(to_add); + match result { + Err(SendFail::Closed) => return Ok(false), + Ok(_) => { + self.chan.notify_on_sent.notify_one(); + break; + } + Err(SendFail::Full { page, .. }) => { + to_add = Page::Memory(page); + self.chan.notify_on_recv.notified().await; + } + } + } + + let Some(remain) = remain else { + return Ok(true); + }; + block = remain; } - Err(_) => return None, } } } - pub fn is_empty(&self) -> bool { - let guard = self.inner.lock().unwrap(); - guard.is_empty() + pub fn abort(&mut self) { + self.chan.buffer.lock().unwrap().stop_send(); + self.chan.notify_on_sent.notify_one(); } - pub fn stop_send(&self) { + pub fn finish(self) { { - let mut guard = self.inner.lock().unwrap(); - guard.stop_send() + let mut buffer = self.chan.buffer.lock().unwrap(); + if !buffer.is_recv_stopped && !buffer.is_send_stopped { + let page = buffer + .current_page + .take() + .expect("current_page has taken") + .into_page(); + buffer.pages.push_back(Page::Memory(page)); + } + buffer.stop_send(); } - self.notify_on_sent.notify_one(); + self.chan.notify_on_sent.notify_one(); } - pub fn stop_recv(&self) { - { - let mut guard = self.inner.lock().unwrap(); - guard.stop_recv() + pub fn closer(&self) -> SizedChannelSenderCloser { + SizedChannelSenderCloser { + chan: self.chan.clone(), } - self.notify_on_recv.notify_one(); + } + + pub fn plan_ready(&mut self, format_settings: FormatSettings, spiller: Option) { + assert!(!self.chan.is_plan_ready.has_notified()); + *self.chan.format_settings.lock().unwrap() = Some(format_settings); + *self.chan.spiller.lock().unwrap() = spiller; + self.chan.is_plan_ready.notify_waiters(); } } -pub struct SizedChannelReceiver { - chan: Arc>, +pub struct SizedChannelSenderCloser { + chan: Arc>, } -impl SizedChannelReceiver { - #[async_backtrace::framed] - pub async fn recv(&self) -> Option { - self.chan.recv().await +impl SizedChannelSenderCloser +where S: DataBlockSpill +{ + pub fn abort(self) { + self.chan.buffer.lock().unwrap().stop_send(); + self.chan.notify_on_sent.notify_one(); } +} - pub fn try_recv(&self) -> Option { - self.chan.try_recv().unwrap_or_default() +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; + + use databend_common_exception::ErrorCode; + use databend_common_expression::types::Int32Type; + use databend_common_expression::types::Number; + use databend_common_expression::types::NumberType; + use databend_common_expression::FromData; + use databend_common_io::prelude::FormatSettings; + use databend_common_pipeline_transforms::traits::DataBlockSpill; + use databend_common_pipeline_transforms::traits::Location; + use proptest::prelude::*; + use proptest::strategy::ValueTree; + use proptest::test_runner::TestRunner; + + use super::*; + + #[derive(Clone, Default)] + struct MockSpiller { + storage: Arc>>, } - pub fn close(&self) { - self.chan.stop_recv() + #[async_trait::async_trait] + impl DataBlockSpill for MockSpiller { + async fn spill(&self, data_block: DataBlock) -> Result { + let key = format!("block_{}", rand::random::()); + self.storage.lock().unwrap().insert(key.clone(), data_block); + Ok(Location::Remote(key)) + } + + async fn merge_and_spill(&self, data_block: Vec) -> Result { + self.spill(DataBlock::concat(&data_block)?).await + } + + async fn restore(&self, location: &Location) -> Result { + match location { + Location::Remote(key) => { + let storage = self.storage.lock().unwrap(); + storage + .get(key) + .cloned() + .ok_or_else(|| ErrorCode::Internal("Block not found in mock spiller")) + } + _ => Err(ErrorCode::Internal("Unsupported location type")), + } + } } - pub fn is_empty(&self) -> bool { - self.chan.is_empty() + #[tokio::test(flavor = "multi_thread")] + async fn test_spsc_channel() { + let (mut sender, mut receiver) = sized_spsc::(5, 4); + + let test_data = vec![ + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1, 2, 3])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![4, 5, 6])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![7, 8, 9])]), + ]; + + let sender_data = test_data.clone(); + let send_task = databend_common_base::runtime::spawn(async move { + let format_settings = FormatSettings::default(); + let spiller = MockSpiller::default(); + sender.plan_ready(format_settings, Some(spiller)); + + for block in sender_data { + sender.send(block).await.unwrap(); + } + sender.finish(); + }); + + let mut received_blocks_size = Vec::new(); + loop { + let (serializer, is_end) = receiver.next_page(&Wait::Async).await.unwrap(); + + if serializer.num_rows() > 0 { + received_blocks_size.push(serializer.num_rows()); + } + + if is_end { + break; + } + } + + let _ = receiver.close().unwrap().storage; + + send_task.await.unwrap(); + assert_eq!(received_blocks_size, vec![4, 4, 1]); } -} -#[derive(Clone)] -pub struct SizedChannelSender { - chan: Arc>, -} + #[tokio::test(flavor = "multi_thread")] + async fn test_spsc_slow() { + let (mut sender, mut receiver) = sized_spsc::(5, 4); -impl SizedChannelSender { - #[async_backtrace::framed] - pub async fn send(&self, value: T, size: usize) -> bool { - self.chan.send(value, size).await + let test_data = vec![DataBlock::new_from_columns(vec![Int32Type::from_data( + vec![1, 2, 3], + )])]; + + let wait = Arc::new(Notify::new()); + + let sender_wait = wait.clone(); + let sender_data = test_data.clone(); + let send_task = databend_common_base::runtime::spawn(async move { + let format_settings = FormatSettings::default(); + sender.plan_ready(format_settings, None); + + sender_wait.notified().await; + + for block in sender_data { + sender.send(block).await.unwrap(); + } + sender.finish(); + }); + + for _ in 0..10 { + let deadline = Instant::now() + Duration::from_millis(1); + let (serializer, _) = receiver.next_page(&Wait::Deadline(deadline)).await.unwrap(); + assert_eq!(serializer.num_rows(), 0); + } + + wait.notify_one(); + + let (serializer, _) = receiver + .next_page(&Wait::Deadline(Instant::now() + Duration::from_secs(1))) + .await + .unwrap(); + let _ = receiver.close(); + send_task.await.unwrap(); + + assert_eq!(serializer.num_rows(), 3); } - pub fn close(&self) { - self.chan.stop_send() + #[tokio::test(flavor = "multi_thread")] + async fn test_spsc_abort() { + let (mut sender, mut receiver) = sized_spsc::(5, 4); + + let test_data = vec![ + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1, 2, 3])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1, 2])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![4, 5, 6, 7, 8])]), + DataBlock::new_from_columns(vec![Int32Type::from_data(vec![7, 8, 9])]), + ]; + + let sender_data = test_data.clone(); + let send_task = databend_common_base::runtime::spawn(async move { + let format_settings = FormatSettings::default(); + sender.plan_ready(format_settings, None); + + for (i, block) in sender_data.into_iter().enumerate() { + sender.send(block).await.unwrap(); + if i == 3 { + sender.abort(); + return; + } + } + }); + + let mut received_blocks_size = Vec::new(); + loop { + let (serializer, is_end) = receiver + .next_page(&Wait::Deadline(Instant::now() + Duration::from_secs(1))) + .await + .unwrap(); + + if serializer.num_rows() > 0 { + received_blocks_size.push(serializer.num_rows()); + } + + if is_end { + break; + } + } + + send_task.await.unwrap(); + + assert_eq!(received_blocks_size, vec![4, 4]) } - pub fn closer(&self) -> SizedChannelSenderCloser { - SizedChannelSenderCloser { - chan: self.chan.clone(), + fn data_block_strategy(len: usize) -> impl Strategy + where + N: Number + Arbitrary + 'static, + NumberType: FromData, + { + prop::collection::vec(any::(), len) + .prop_map(|data| DataBlock::new_from_columns(vec![NumberType::::from_data(data)])) + } + + fn data_block_vec_strategy() -> impl Strategy> { + let num_rows_strategy = 0..30_usize; + let num_blocks_strategy = 0..100_usize; + (num_blocks_strategy, num_rows_strategy).prop_flat_map(|(num_blocks, num_rows)| { + prop::collection::vec(data_block_strategy::(num_rows), num_blocks) + }) + } + + async fn delay() { + if rand::thread_rng().gen_bool(0.7) { + let delay = rand::thread_rng().gen_range(0..1000); + tokio::time::sleep(Duration::from_micros(delay)).await; } } -} -pub struct SizedChannelSenderCloser { - chan: Arc>, -} + #[tokio::test(flavor = "multi_thread")] + async fn test_spsc_channel_fuzz() { + let mut runner = TestRunner::default(); + for _ in 0..100 { + let (has_spiller, max_size, page_size, test_data) = ( + any::(), + 10_usize..20, + 5_usize..8, + data_block_vec_strategy(), + ) + .new_tree(&mut runner) + .unwrap() + .current(); + + let (mut sender, mut receiver) = sized_spsc::(max_size, page_size); + + let sender_data = test_data.clone(); + let send_task = databend_common_base::runtime::spawn(async move { + let format_settings = FormatSettings::default(); + sender.plan_ready(format_settings, has_spiller.then(MockSpiller::default)); + + for block in sender_data { + delay().await; + sender.send(block).await.unwrap(); + delay().await; + } + sender.finish(); + }); + + let mut received_blocks_size = Vec::new(); + loop { + delay().await; + let wait = Wait::Deadline(Instant::now() + Duration::from_millis(50)); + let (serializer, is_end) = receiver.next_page(&wait).await.unwrap(); + if !serializer.is_empty() { + received_blocks_size.push(serializer.num_rows()); + } + if is_end { + break; + } + } + + send_task.await.unwrap(); -impl SizedChannelSenderCloser { - pub fn close(&self) { - self.chan.stop_send() + if has_spiller { + let _ = receiver.close().unwrap().storage; + } else { + assert!(receiver.close().is_none()); + } + + let total = test_data.iter().map(|data| data.num_rows()).sum::(); + + assert_eq!(received_blocks_size.iter().sum::(), total); + } } } diff --git a/src/query/service/src/servers/http/v1/streaming_load.rs b/src/query/service/src/servers/http/v1/streaming_load.rs index 076404639956e..ca774e703d7ad 100644 --- a/src/query/service/src/servers/http/v1/streaming_load.rs +++ b/src/query/service/src/servers/http/v1/streaming_load.rs @@ -33,7 +33,6 @@ use databend_common_sql::plans::Plan; use databend_common_sql::Planner; use databend_common_storages_stage::BytesBatch; use databend_storages_common_session::TxnState; -use fastrace::func_path; use fastrace::future::FutureExt; use futures::StreamExt; use http::StatusCode; @@ -61,6 +60,7 @@ use crate::servers::http::error::JsonErrorOnly; use crate::servers::http::error::QueryError; use crate::servers::http::middleware::json_header::encode_json_header; use crate::servers::http::middleware::sanitize_request_headers; +use crate::servers::http::v1::http_query_handlers::get_http_tracing_span; use crate::sessions::QueriesQueueManager; use crate::sessions::QueryContext; use crate::sessions::QueryEntry; @@ -93,11 +93,7 @@ fn execute_query( tracking_payload.query_id = Some(id.clone()); tracking_payload.mem_stat = Some(mem_stat); let _tracking_guard = ThreadTracker::tracking(tracking_payload); - let root = crate::servers::http::v1::http_query_handlers::get_http_tracing_span( - func_path!(), - &http_query_context, - &id, - ); + let root = get_http_tracing_span("http::execute_query", &http_query_context, &id); ThreadTracker::tracking_future(fut.in_span(root)) } @@ -113,11 +109,7 @@ pub async fn streaming_load_handler( tracking_payload.query_id = Some(ctx.query_id.clone()); tracking_payload.mem_stat = Some(query_mem_stat.clone()); let _tracking_guard = ThreadTracker::tracking(tracking_payload); - let root = crate::servers::http::v1::http_query_handlers::get_http_tracing_span( - func_path!(), - ctx, - &ctx.query_id, - ); + let root = get_http_tracing_span("http::streaming_load_handler", ctx, &ctx.query_id); let mut session_conf: Option = match req.headers().get(HEADER_QUERY_CONTEXT) { Some(v) => { let s = v.to_str().unwrap().to_string(); diff --git a/src/query/service/src/spillers/adapter.rs b/src/query/service/src/spillers/adapter.rs index 0a31e2f355fb0..13ac4d7755519 100644 --- a/src/query/service/src/spillers/adapter.rs +++ b/src/query/service/src/spillers/adapter.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::ops::DerefMut; use std::ops::Range; use std::sync::Arc; use std::sync::RwLock; @@ -385,12 +386,41 @@ impl SpillAdapter for LiteAdapter { } } -pub type LiteSpiller = Arc>; +#[derive(Clone)] +pub struct LiteSpiller(Arc>); + +impl LiteSpiller { + pub fn new(operator: Operator, config: SpillerConfig) -> Result { + Ok(LiteSpiller(Arc::new(SpillerInner::new( + Default::default(), + operator, + config, + )?))) + } -pub fn new_lite_spiller(operator: Operator, config: SpillerConfig) -> Result { - Ok(Arc::new(SpillerInner::new( - Default::default(), - operator, - config, - )?)) + pub async fn cleanup(self) -> Result<()> { + let files = std::mem::take(self.0.adapter.files.write().unwrap().deref_mut()); + let files: Vec<_> = files + .into_keys() + .filter_map(|location| match location { + Location::Remote(path) => Some(path), + Location::Local(_) => None, + }) + .collect(); + let op = self.0.local_operator.as_ref().unwrap_or(&self.0.operator); + + op.delete_iter(files).await?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl DataBlockSpill for LiteSpiller { + async fn merge_and_spill(&self, data_block: Vec) -> Result { + self.0.spill(data_block).await + } + + async fn restore(&self, location: &Location) -> Result { + self.0.read_spilled_file(location).await + } } diff --git a/src/query/service/src/spillers/test_memory.rs b/src/query/service/src/spillers/test_memory.rs index 3fbf35af9735d..bacf0bdf9f5a8 100644 --- a/src/query/service/src/spillers/test_memory.rs +++ b/src/query/service/src/spillers/test_memory.rs @@ -28,12 +28,12 @@ use databend_common_expression::types::StringType; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::FromData; +use databend_common_pipeline_transforms::traits::DataBlockSpill; use databend_common_pipeline_transforms::traits::Location; use databend_common_storage::DataOperator; use databend_storages_common_cache::TempDirManager; use rand::Rng; -use crate::spillers::new_lite_spiller; use crate::spillers::LiteSpiller; use crate::spillers::SpillerConfig; use crate::spillers::SpillerDiskConfig; @@ -209,7 +209,7 @@ async fn init() -> Result { use_parquet: true, }; let operator = DataOperator::instance().spill_operator(); - new_lite_spiller(operator, spiller_config) + LiteSpiller::new(operator, spiller_config) } #[derive(Debug)] @@ -225,7 +225,7 @@ async fn run_spill_test(spiller: &LiteSpiller, rows: usize) -> Result Result<()> { #[tokio::test(flavor = "current_thread")] async fn test_active_sessions() -> Result<()> { - let max_active_sessions = 2; let conf = ConfigBuilder::create() - .max_active_sessions(max_active_sessions) + .max_active_sessions(2) + .off_log() .build(); let _fixture = TestFixture::setup_with_config(&conf).await?; let ep = create_endpoint()?; @@ -542,6 +542,7 @@ async fn test_active_sessions() -> Result<()> { .into_iter() .map(|(_status, resp)| (resp.error.map(|e| e.message).unwrap_or_default())) .collect::>(); + _fixture.keep_alive(); results.sort(); let msg = "[HTTP-QUERY] Failed to upgrade session: Current active sessions (2) has exceeded the max_active_sessions limit (2)"; let expect = vec!["", "", msg]; @@ -1757,19 +1758,21 @@ async fn test_has_result_set() -> Result<()> { #[tokio::test(flavor = "current_thread")] async fn test_max_size_per_page() -> Result<()> { - let _fixture = TestFixture::setup().await?; + let _fixture = + TestFixture::setup_with_config(&ConfigBuilder::create().off_log().config()).await?; let sql = "select repeat('1', 1000) as a, repeat('2', 1000) from numbers(10000)"; let wait_time_secs = 5; let json = serde_json::json!({"sql": sql.to_string(), "pagination": {"wait_time_secs": wait_time_secs}}); let (_, reply, body) = TestHttpQueryRequest::new(json).fetch_begin().await?; assert!(reply.error.is_none(), "{:?}", reply.error); - let len = body.len() as i32; - let target = 20_080_000; - assert!(len > target); - assert!(len < target + 2000); - assert_eq!(reply.data.len(), 10_000); - assert_eq!(reply.data[0].len(), 2); + let target = (10_usize * 1024 * 1024) as f64; + assert!( + (0.9..1.1).contains(&(body.len() as f64 / target)), + "body len {} rows {}", + body.len(), + reply.data.len() + ); Ok(()) } diff --git a/src/query/service/tests/it/servers/http/json_block.rs b/src/query/service/tests/it/servers/http/json_block.rs index af6ca5821f2bd..ae6ddc4f61b95 100644 --- a/src/query/service/tests/it/servers/http/json_block.rs +++ b/src/query/service/tests/it/servers/http/json_block.rs @@ -22,7 +22,7 @@ use databend_common_expression::types::DateType; use databend_common_expression::types::StringType; use databend_common_expression::FromData; use databend_common_io::prelude::FormatSettings; -use databend_query::servers::http::v1::BlocksSerializer; +use databend_query::servers::http::v1::BlocksCollector; use pretty_assertions::assert_eq; fn test_data_block(is_nullable: bool) -> Result<()> { @@ -42,8 +42,9 @@ fn test_data_block(is_nullable: bool) -> Result<()> { } let format = FormatSettings::default(); - let mut serializer = BlocksSerializer::new(Some(format)); - serializer.append(columns, 3); + let mut collector = BlocksCollector::new(); + collector.append_columns(columns, 3); + let serializer = collector.into_serializer(format); let expect = [ vec!["1", "a", "1", "1.1", "1970-01-02"], vec!["2", "b", "1", "2.2", "1970-01-03"], @@ -73,7 +74,8 @@ fn test_data_block_not_nullable() -> Result<()> { #[test] fn test_empty_block() -> Result<()> { let format = FormatSettings::default(); - let serializer = BlocksSerializer::new(Some(format)); + let collector = BlocksCollector::new(); + let serializer = collector.into_serializer(format); assert!(serializer.is_empty()); Ok(()) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index c1853fc5db347..5205204fbe2d9 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -689,6 +689,20 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("result_set_spilling_to_disk_bytes_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the maximum amount of local disk in bytes that result set can use before spilling data to storage during one query execution.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), + ("enable_result_set_spilling", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enable spilling result set data to storage when memory usage exceeds the threshold.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("sort_spilling_batch_bytes", DefaultSettingValue { value: UserSettingValue::UInt64(20 * 1024 * 1024), desc: "Sets the uncompressed size that merge sorter will spill to storage", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index a77d989cc695f..b1512c1c6b953 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -520,6 +520,14 @@ impl Settings { Ok(self.try_get_u64("sort_spilling_to_disk_bytes_limit")? as usize) } + pub fn get_result_set_spilling_to_disk_bytes_limit(&self) -> Result { + Ok(self.try_get_u64("result_set_spilling_to_disk_bytes_limit")? as usize) + } + + pub fn get_enable_result_set_spilling(&self) -> Result { + Ok(self.try_get_u64("enable_result_set_spilling")? == 1) + } + pub fn get_enable_shuffle_sort(&self) -> Result { Ok(self.try_get_u64("enable_shuffle_sort")? == 1) } diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0013_result_spill.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0013_result_spill.py new file mode 100644 index 0000000000000..248cd186bcd7c --- /dev/null +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0013_result_spill.py @@ -0,0 +1,101 @@ +import requests +import time + +auth = ("root", "") + + +def do_query(query, session, pagination): + url = f"http://localhost:8000/v1/query" + payload = { + "sql": query, + } + if session: + payload["session"] = session + if pagination: + payload["pagination"] = pagination + headers = { + "Content-Type": "application/json", + } + + response = requests.post(url, headers=headers, json=payload, auth=auth) + return response.json() + + +def test_disable_result_spill(): + session = { + "settings": { + "enable_result_set_spilling": "0", + "max_block_size": "10", + "max_threads": "2", + } + } + pagination = { + "wait_time_secs": 2, + "max_rows_in_buffer": 4, + "max_rows_per_page": 4, + } + resp = do_query("select * from numbers(97)", session=session, pagination=pagination) + assert resp["error"] == None + + # print(resp["state"]) + # print(resp["stats"]) + + rows = len(resp["data"]) + for _ in range(30): + if resp.get("next_uri") == None: + break + + uri = f"http://localhost:8000/{resp['next_uri']}" + resp = requests.get(uri, auth=auth).json() + cur_rows = len(resp["data"]) + assert resp["error"] == None, resp + if rows < 96: + assert cur_rows == 4 + rows += cur_rows + + # print(resp["state"]) + # print(resp["stats"]) + + # get same page again + assert requests.get(uri, auth=auth).json()["error"] == None, resp + + assert rows == 97 + + +def test_enable_result_spill(): + session = { + "settings": { + "enable_result_set_spilling": "1", + "max_block_size": "10", + "max_threads": "2", + } + } + pagination = { + "wait_time_secs": 2, + "max_rows_in_buffer": 4, + "max_rows_per_page": 4, + } + resp = do_query("select * from numbers(97)", session=session, pagination=pagination) + assert resp["error"] == None + + time.sleep(1) + + rows = len(resp["data"]) + for _ in range(30): + if resp.get("next_uri") == None: + break + + uri = f"http://localhost:8000/{resp['next_uri']}" + resp = requests.get(uri, auth=auth).json() + cur_rows = len(resp["data"]) + assert resp["error"] == None, resp + if rows < 96: + assert cur_rows == 4 + rows += cur_rows + + assert resp["state"] == "Succeeded" + + # get same page again + assert requests.get(uri, auth=auth).json()["error"] == None, resp + + assert rows == 97 diff --git a/tests/suites/0_stateless/01_transaction/01_03_issue_18705.py b/tests/suites/0_stateless/01_transaction/01_03_issue_18705.py index c7f8bcf08606f..1646e234cab25 100755 --- a/tests/suites/0_stateless/01_transaction/01_03_issue_18705.py +++ b/tests/suites/0_stateless/01_transaction/01_03_issue_18705.py @@ -6,7 +6,9 @@ if __name__ == "__main__": # Session 1: # Insert into empty table - mdb = mysql.connector.connect(host="127.0.0.1", user="root", passwd="root", port="3307") + mdb = mysql.connector.connect( + host="127.0.0.1", user="root", passwd="root", port="3307" + ) mycursor = mdb.cursor() mycursor.execute("create or replace table t_18705(c int)") mycursor.fetchall() @@ -17,9 +19,13 @@ # Session 2: # Alter table in another session, so that the new table after alter operation will still be empty - mydb_alter_tbl = mysql.connector.connect(host="127.0.0.1", user="root", passwd="root", port="3307") + mydb_alter_tbl = mysql.connector.connect( + host="127.0.0.1", user="root", passwd="root", port="3307" + ) mycursor_alter_tbl = mydb_alter_tbl.cursor() - mycursor_alter_tbl.execute("alter table t_18705 SET OPTIONS (block_per_segment = 500)") + mycursor_alter_tbl.execute( + "alter table t_18705 SET OPTIONS (block_per_segment = 500)" + ) mycursor_alter_tbl.fetchall() # Session 1: