Merged
36 commits
b00e46c
refine
forsaken628 Sep 2, 2025
65c792e
cleanup
forsaken628 Sep 2, 2025
956d943
x
forsaken628 Sep 2, 2025
4c39dce
x
forsaken628 Sep 3, 2025
3d25d50
SpillableBlock 1
forsaken628 Sep 3, 2025
0f3fceb
x
forsaken628 Sep 3, 2025
6d9f26c
x
forsaken628 Sep 3, 2025
3e8c274
SpillerRef
forsaken628 Sep 3, 2025
b381fd9
fix
forsaken628 Sep 3, 2025
f0666e1
move
forsaken628 Sep 4, 2025
0aa3c6b
apply_settings
forsaken628 Sep 4, 2025
e57c659
settings
forsaken628 Sep 4, 2025
db1216f
fix
forsaken628 Sep 4, 2025
4014c07
fix
forsaken628 Sep 4, 2025
12ea792
fix
forsaken628 Sep 4, 2025
44b8ac6
fix
forsaken628 Sep 5, 2025
0242a0a
PortableSpiller
forsaken628 Sep 5, 2025
19b90b7
x
forsaken628 Sep 8, 2025
a9ab260
x
forsaken628 Sep 8, 2025
35ee153
x
forsaken628 Sep 9, 2025
c0abd04
x
forsaken628 Sep 9, 2025
a25beca
fix
forsaken628 Sep 9, 2025
c38d785
test
forsaken628 Sep 9, 2025
7219712
Merge remote-tracking branch 'up/main' into result-spill
forsaken628 Sep 9, 2025
5e9e739
disable
forsaken628 Sep 9, 2025
0da4d90
fix
forsaken628 Sep 9, 2025
8e38064
fix
forsaken628 Sep 9, 2025
7367cf0
Merge branch 'main' into result-spill
forsaken628 Sep 10, 2025
5e80acd
Merge remote-tracking branch 'up/main' into aaaa
forsaken628 Sep 12, 2025
1255615
test
forsaken628 Sep 11, 2025
e786b5e
test
forsaken628 Sep 12, 2025
7597171
update
forsaken628 Sep 12, 2025
8827b71
format
forsaken628 Sep 12, 2025
088a91a
doc
forsaken628 Sep 12, 2025
d8217a5
test
forsaken628 Sep 12, 2025
4f398ee
feat(query): add comprehensive logging for result set spilling operat…
BohuTANG Sep 13, 2025
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions src/query/service/Cargo.toml
@@ -117,6 +117,7 @@ databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 derive-visitor = { workspace = true }
 dyn-clone = { workspace = true }
+either = { workspace = true }
 enum-as-inner = { workspace = true }
 ethnum = { workspace = true }
 fastrace = { workspace = true }
@@ -188,6 +189,7 @@ maplit = { workspace = true }
 mysql_async = { workspace = true }
 p256 = { workspace = true }
 pretty_assertions = { workspace = true }
+proptest = { workspace = true }
 reqwest = { workspace = true }
 serde_json.workspace = true
 serde_yaml = { workspace = true }
11 changes: 5 additions & 6 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -31,7 +31,6 @@ use databend_common_exception::ErrorCode;
 use databend_common_expression::DataSchemaRef;
 use databend_common_management::WorkloadGroupResourceManager;
 use databend_common_metrics::http::metrics_incr_http_response_errors_count;
-use fastrace::func_path;
 use fastrace::prelude::*;
 use http::HeaderMap;
 use http::HeaderValue;
@@ -296,7 +295,7 @@ async fn query_final_handler(
     Path(query_id): Path<String>,
 ) -> PoemResult<impl IntoResponse> {
     ctx.check_node_id(&query_id)?;
-    let root = get_http_tracing_span(func_path!(), ctx, &query_id);
+    let root = get_http_tracing_span("http::query_final_handler", ctx, &query_id);
     let _t = SlowRequestLogTracker::new(ctx);
     async {
         info!(
@@ -337,7 +336,7 @@ async fn query_cancel_handler(
     Path(query_id): Path<String>,
 ) -> PoemResult<impl IntoResponse> {
     ctx.check_node_id(&query_id)?;
-    let root = get_http_tracing_span(func_path!(), ctx, &query_id);
+    let root = get_http_tracing_span("http::query_cancel_handler", ctx, &query_id);
     let _t = SlowRequestLogTracker::new(ctx);
     async {
         info!(
@@ -369,7 +368,7 @@ async fn query_state_handler(
     Path(query_id): Path<String>,
 ) -> PoemResult<Response> {
     ctx.check_node_id(&query_id)?;
-    let root = get_http_tracing_span(func_path!(), ctx, &query_id);
+    let root = get_http_tracing_span("http::query_state_handler", ctx, &query_id);
 
     async {
         let http_query_manager = HttpQueryManager::instance();
@@ -457,7 +456,7 @@ async fn query_page_handler(
     };
 
     let query_page_handle = {
-        let root = get_http_tracing_span(func_path!(), ctx, &query_id);
+        let root = get_http_tracing_span("http::query_page_handler", ctx, &query_id);
         let _t = SlowRequestLogTracker::new(ctx);
         query_page_handle.in_span(root)
     };
@@ -556,7 +555,7 @@ pub(crate) async fn query_handler(
     };
 
     let query_handle = {
-        let root = get_http_tracing_span(func_path!(), ctx, &ctx.query_id);
+        let root = get_http_tracing_span("http::query_handler", ctx, &ctx.query_id);
         let _t = SlowRequestLogTracker::new(ctx);
         query_handle.in_span(root)
     };
1 change: 1 addition & 0 deletions src/query/service/src/servers/http/v1/mod.rs
@@ -31,6 +31,7 @@ pub use http_query_handlers::query_route;
 pub use http_query_handlers::QueryResponse;
 pub use http_query_handlers::QueryResponseField;
 pub use http_query_handlers::QueryStats;
+pub use query::blocks_serializer::BlocksCollector;
 pub use query::blocks_serializer::BlocksSerializer;
 pub use query::ExecuteStateKind;
 pub use query::ExpiringMap;
57 changes: 38 additions & 19 deletions src/query/service/src/servers/http/v1/query/blocks_serializer.rs
@@ -18,7 +18,9 @@ use std::ops::DerefMut;
 use databend_common_expression::types::date::date_to_string;
 use databend_common_expression::types::interval::interval_to_string;
 use databend_common_expression::types::timestamp::timestamp_to_string;
+use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
+use databend_common_expression::DataBlock;
 use databend_common_formats::field_encoder::FieldEncoderValues;
 use databend_common_io::ewkb_to_geo;
 use databend_common_io::geo_to_ewkb;
@@ -40,38 +42,55 @@ fn data_is_null(column: &Column, row_index: usize) -> bool {
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct BlocksSerializer {
+#[derive(Debug, Clone, Default)]
+pub struct BlocksCollector {
     // Vec<Column> for a Block
     columns: Vec<(Vec<Column>, usize)>,
-    pub(crate) format: Option<FormatSettings>,
 }
 
-impl BlocksSerializer {
-    pub fn empty() -> Self {
-        Self {
-            columns: vec![],
-            format: None,
-        }
+impl BlocksCollector {
+    pub fn new() -> Self {
+        Self { columns: vec![] }
     }
 
-    pub fn new(format: Option<FormatSettings>) -> Self {
-        Self {
-            columns: vec![],
-            format,
-        }
+    pub fn append_columns(&mut self, columns: Vec<Column>, num_rows: usize) {
+        self.columns.push((columns, num_rows));
     }
 
-    pub fn has_format(&self) -> bool {
-        self.format.is_some()
+    pub fn append_block(&mut self, block: DataBlock) {
+        if block.is_empty() {
+            return;
+        }
+        let columns = block.columns().iter().map(BlockEntry::to_column).collect();
+        let num_rows = block.num_rows();
+        self.append_columns(columns, num_rows);
     }
 
-    pub fn set_format(&mut self, format: FormatSettings) {
-        self.format = Some(format);
+    pub fn num_rows(&self) -> usize {
+        self.columns.iter().map(|(_, num_rows)| *num_rows).sum()
     }
 
-    pub fn append(&mut self, columns: Vec<Column>, num_rows: usize) {
-        self.columns.push((columns, num_rows));
+    pub fn into_serializer(self, format: FormatSettings) -> BlocksSerializer {
+        BlocksSerializer {
+            columns: self.columns,
+            format: Some(format),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct BlocksSerializer {
+    // Vec<Column> for a Block
+    columns: Vec<(Vec<Column>, usize)>,
+    format: Option<FormatSettings>,
+}
+
+impl BlocksSerializer {
+    pub fn empty() -> Self {
+        Self {
+            columns: vec![],
+            format: None,
+        }
     }
 
     pub fn is_empty(&self) -> bool {
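Net effect of this hunk: the old BlocksSerializer, which carried an Option<FormatSettings> from construction, is split into a format-free BlocksCollector plus a BlocksSerializer that is only produced once the final FormatSettings are known. A minimal sketch of the intended two-phase flow, assuming it sits alongside these types in blocks_serializer.rs; collect_then_serialize is a hypothetical helper, not part of the PR:

use databend_common_expression::DataBlock;
use databend_common_io::prelude::FormatSettings;

// Hypothetical helper: buffer blocks first, bind FormatSettings exactly once.
fn collect_then_serialize(blocks: Vec<DataBlock>, format: FormatSettings) -> BlocksSerializer {
    let mut collector = BlocksCollector::new();
    for block in blocks {
        // append_block skips empty blocks and converts entries to owned Columns.
        collector.append_block(block);
    }
    collector.into_serializer(format)
}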
158 changes: 106 additions & 52 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
@@ -25,8 +25,9 @@ use databend_common_exception::ResultExt;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Scalar;
-use databend_common_io::prelude::FormatSettings;
 use databend_common_settings::Settings;
+use databend_common_storage::DataOperator;
+use databend_storages_common_cache::TempDirManager;
 use databend_storages_common_session::TxnManagerRef;
 use futures::StreamExt;
 use log::debug;
@@ -49,6 +50,12 @@ use crate::sessions::QueryAffect;
 use crate::sessions::QueryContext;
 use crate::sessions::Session;
 use crate::sessions::TableContext;
+use crate::spillers::LiteSpiller;
+use crate::spillers::SpillerConfig;
+use crate::spillers::SpillerDiskConfig;
+use crate::spillers::SpillerType;
+
+type Sender = SizedChannelSender<LiteSpiller>;
 
 pub struct ExecutionError;
 
@@ -114,7 +121,7 @@ impl ExecuteState {
 
 pub struct ExecuteStarting {
     pub(crate) ctx: Arc<QueryContext>,
-    pub(crate) sender: SizedChannelSender<DataBlock>,
+    pub(crate) sender: Option<Sender>,
 }
 
 pub struct ExecuteRunning {
@@ -355,8 +362,7 @@ impl ExecuteState {
         sql: String,
         session: Arc<Session>,
         ctx: Arc<QueryContext>,
-        block_sender: SizedChannelSender<DataBlock>,
-        format_settings: Arc<parking_lot::RwLock<Option<FormatSettings>>>,
+        mut block_sender: Sender,
     ) -> Result<(), ExecutionError> {
         let make_error = || format!("failed to start query: {sql}");
 
@@ -367,11 +373,8 @@
             .await
             .map_err(|err| err.display_with_sql(&sql))
             .with_context(make_error)?;
-        {
-            // set_var may change settings
-            let mut guard = format_settings.write();
-            *guard = Some(ctx.get_format_settings().with_context(make_error)?);
-        }
+
+        Self::apply_settings(&ctx, &mut block_sender).with_context(make_error)?;
 
         let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
             .await
@@ -397,7 +400,7 @@
         let ctx_clone = ctx.clone();
         let block_sender_closer = block_sender.closer();
 
-        let res = execute(
+        let res = Self::execute(
             interpreter,
             plan.schema(),
             ctx_clone,
Expand All @@ -407,60 +410,111 @@ impl ExecuteState {
match CatchUnwindFuture::create(res).await {
Ok(Err(err)) => {
Executor::stop(&executor_clone, Err(err.clone()));
block_sender_closer.close();
block_sender_closer.abort();
}
Err(e) => {
Executor::stop(&executor_clone, Err(e));
block_sender_closer.close();
block_sender_closer.abort();
}
_ => {}
}

Ok(())
}
}

async fn execute(
interpreter: Arc<dyn Interpreter>,
schema: DataSchemaRef,
ctx: Arc<QueryContext>,
block_sender: SizedChannelSender<DataBlock>,
executor: Arc<Mutex<Executor>>,
) -> Result<(), ExecutionError> {
let make_error = || format!("failed to execute {}", interpreter.name());

let mut data_stream = interpreter
.execute(ctx.clone())
.await
.with_context(make_error)?;
match data_stream.next().await {
None => {
let block = DataBlock::empty_with_schema(schema);
block_sender.send(block, 0).await;
Executor::stop::<()>(&executor, Ok(()));
block_sender.close();
}
Some(Err(err)) => {
Executor::stop(&executor, Err(err));
block_sender.close();
#[fastrace::trace(name = "ExecuteState::execute")]
async fn execute(
interpreter: Arc<dyn Interpreter>,
schema: DataSchemaRef,
ctx: Arc<QueryContext>,
mut sender: Sender,
executor: Arc<Mutex<Executor>>,
) -> Result<(), ExecutionError> {
let make_error = || format!("failed to execute {}", interpreter.name());

let mut data_stream = interpreter
.execute(ctx.clone())
.await
.with_context(make_error)?;
match data_stream.next().await {
None => {
Self::send_data_block(&mut sender, &executor, DataBlock::empty_with_schema(schema))
.await
.with_context(make_error)?;
Executor::stop::<()>(&executor, Ok(()));
sender.finish();
}
Some(Err(err)) => {
Executor::stop(&executor, Err(err));
sender.abort();
}
Some(Ok(block)) => {
Self::send_data_block(&mut sender, &executor, block)
.await
.with_context(make_error)?;
while let Some(block) = data_stream.next().await {
match block {
Ok(block) => {
Self::send_data_block(&mut sender, &executor, block)
.await
.with_context(make_error)?;
}
Err(err) => {
sender.abort();
return Err(err.with_context(make_error()));
}
};
}
Executor::stop::<()>(&executor, Ok(()));
sender.finish();
}
}
Some(Ok(block)) => {
let size = block.num_rows();
block_sender.send(block, size).await;
while let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => {
block_sender.send(block.clone(), block.num_rows()).await;
}
Err(err) => {
block_sender.close();
return Err(err.with_context(make_error()));
}
};
Ok(())
}

async fn send_data_block(
sender: &mut Sender,
executor: &Arc<Mutex<Executor>>,
block: DataBlock,
) -> Result<bool> {
match sender.send(block).await {
Ok(ok) => Ok(ok),
Err(err) => {
Executor::stop(executor, Err(err.clone()));
sender.abort();
Err(err)
}
Executor::stop::<()>(&executor, Ok(()));
block_sender.close();
}
}
Ok(())

fn apply_settings(ctx: &Arc<QueryContext>, block_sender: &mut Sender) -> Result<()> {
let settings = ctx.get_settings();

let spiller = if settings.get_enable_result_set_spilling()? {
let temp_dir_manager = TempDirManager::instance();
let disk_bytes_limit = settings.get_result_set_spilling_to_disk_bytes_limit()?;
let enable_dio = settings.get_enable_dio()?;
let disk_spill = temp_dir_manager
.get_disk_spill_dir(disk_bytes_limit, &ctx.get_id())
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
.transpose()?;

let location_prefix = ctx.query_id_spill_prefix();
let config = SpillerConfig {
spiller_type: SpillerType::ResultSet,
location_prefix,
disk_spill,
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
};
let op = DataOperator::instance().spill_operator();
Some(LiteSpiller::new(op, config)?)
} else {
None
};

// set_var may change settings
let format_settings = ctx.get_format_settings()?;
block_sender.plan_ready(format_settings, spiller);
Ok(())
}
}
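Taken together, the producer-side protocol introduced here is: apply_settings resolves FormatSettings (after set_var may have changed them) plus an optional LiteSpiller and hands both to the channel via plan_ready; execute pushes blocks with send; the stream ends with finish on success or abort on any failure. A toy model of that contract, with stand-in types invented for illustration (the real SizedChannelSender and LiteSpiller live elsewhere in this PR); only the method names mirror the diff:

struct ToyBlock(usize); // stand-in for DataBlock: just a row count

#[derive(Default)]
struct ToySender {
    rows: usize,
    closed: bool,
}

impl ToySender {
    // In the real code this is where FormatSettings and Option<LiteSpiller>
    // arrive, once the plan is built.
    fn plan_ready(&mut self) {}

    fn send(&mut self, block: ToyBlock) -> Result<(), String> {
        if self.closed {
            return Err("send after close".to_string());
        }
        self.rows += block.0;
        Ok(())
    }

    fn finish(&mut self) {
        self.closed = true; // clean end of stream: reader drains what was sent
    }

    fn abort(&mut self) {
        self.closed = true; // error path: reader must not treat the data as complete
    }
}

fn produce(blocks: Vec<Result<ToyBlock, String>>, sender: &mut ToySender) -> Result<(), String> {
    sender.plan_ready();
    for next in blocks {
        match next {
            Ok(block) => sender.send(block)?,
            Err(e) => {
                sender.abort(); // mirrors execute(): first error tears the stream down
                return Err(e);
            }
        }
    }
    sender.finish();
    Ok(())
}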