diff --git a/query/src/servers/http/v1/query/execute_state.rs b/query/src/servers/http/v1/query/execute_state.rs index 7f600b19db67..9f8bf3a87a57 100644 --- a/query/src/servers/http/v1/query/execute_state.rs +++ b/query/src/servers/http/v1/query/execute_state.rs @@ -158,9 +158,6 @@ impl ExecuteState { .await .map_err(|e| tracing::error!("interpreter.start.error: {:?}", e)); - let data_stream = interpreter.execute(None).await?; - let mut data_stream = ctx.try_create_abortable(data_stream)?; - let (abort_tx, mut abort_rx) = mpsc::channel(2); ctx.attach_http_query(HttpQueryHandle { abort_sender: abort_tx, @@ -177,31 +174,43 @@ impl ExecuteState { })); let executor_clone = executor.clone(); - ctx - .try_spawn(async move { - loop { - if let Some(block_r) = data_stream.next().await { - match block_r { - Ok(block) => tokio::select! { - _ = block_tx.send(block) => { }, - _ = abort_rx.recv() => { - Executor::stop(&executor, Err(ErrorCode::AbortedQuery("query aborted")), true).await; - break; - }, - }, - Err(err) => { - Executor::stop(&executor, Err(err), false).await; - break; - } - }; - } else { - Executor::stop(&executor, Ok(()), false).await; - break; - } + let ctx_clone = ctx.clone(); + ctx.try_spawn(async move { + // drop/close block_tx after calling Executor::stop + // so handler task can get newest state before return + // otherwise the handler task and this task may competing for the executor lock + let block_tx_clone = block_tx.clone(); + match execute(interpreter, ctx_clone, block_tx_clone, &mut abort_rx).await { + Ok(_) => Executor::stop(&executor_clone, Ok(()), false).await, + Err(err) => { + let kill = err.message().starts_with("aborted"); + Executor::stop(&executor_clone, Err(err), kill).await } - tracing::debug!("drop block sender!"); - })?; + }; + })?; + + Ok((executor, schema)) + } +} - Ok((executor_clone, schema)) +async fn execute( + interpreter: Arc, + ctx: Arc, + block_tx: mpsc::Sender, + abort_rx: &mut mpsc::Receiver<()>, +) -> Result<()> { + let data_stream = interpreter.execute(None).await?; + let mut data_stream = ctx.try_create_abortable(data_stream)?; + while let Some(block_r) = data_stream.next().await { + match block_r { + Ok(block) => tokio::select! { + _ = block_tx.send(block) => { }, + _ = abort_rx.recv() => { + return Err(ErrorCode::AbortedQuery("aborted")) + }, + }, + Err(err) => return Err(err), + }; } + Ok(()) } diff --git a/query/src/servers/http/v1/query/http_query.rs b/query/src/servers/http/v1/query/http_query.rs index 3f6b5dfd4961..a7728a135f02 100644 --- a/query/src/servers/http/v1/query/http_query.rs +++ b/query/src/servers/http/v1/query/http_query.rs @@ -37,6 +37,7 @@ use crate::servers::http::v1::query::Wait; use crate::sessions::SessionManager; #[derive(Deserialize, Debug)] +#[serde(deny_unknown_fields)] pub struct HttpQueryRequest { #[serde(default)] pub session: HttpSessionConf, diff --git a/query/tests/it/servers/http/http_query_handlers.rs b/query/tests/it/servers/http/http_query_handlers.rs index fb1c1cc68254..1221f7161ef3 100644 --- a/query/tests/it/servers/http/http_query_handlers.rs +++ b/query/tests/it/servers/http/http_query_handlers.rs @@ -77,7 +77,7 @@ async fn test_simple_sql() -> Result<()> { assert_eq!(status, StatusCode::OK, "{:?}", result); assert!(result.error.is_none(), "{:?}", result.error); assert_eq!(result.data.len(), 10); - assert_eq!(result.state, ExecuteStateName::Succeeded); + assert_eq!(result.state, ExecuteStateName::Succeeded, "{:?}", result); assert!(result.next_uri.is_none(), "{:?}", result); assert!(result.stats.progress.is_some()); assert!(result.schema.is_some());