Skip to content

Commit

Permalink
Merge pull request #4304 from youngsofun/fix
Browse files Browse the repository at this point in the history
fix httphandler async query.
  • Loading branch information
mergify[bot] authored Mar 3, 2022
2 parents 16e06e4 + 2722bc6 commit c9e9360
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 28 deletions.
63 changes: 36 additions & 27 deletions query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ impl ExecuteState {
.await
.map_err(|e| tracing::error!("interpreter.start.error: {:?}", e));

let data_stream = interpreter.execute(None).await?;
let mut data_stream = ctx.try_create_abortable(data_stream)?;

let (abort_tx, mut abort_rx) = mpsc::channel(2);
ctx.attach_http_query(HttpQueryHandle {
abort_sender: abort_tx,
Expand All @@ -177,31 +174,43 @@ impl ExecuteState {
}));

let executor_clone = executor.clone();
ctx
.try_spawn(async move {
loop {
if let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => tokio::select! {
_ = block_tx.send(block) => { },
_ = abort_rx.recv() => {
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("query aborted")), true).await;
break;
},
},
Err(err) => {
Executor::stop(&executor, Err(err), false).await;
break;
}
};
} else {
Executor::stop(&executor, Ok(()), false).await;
break;
}
let ctx_clone = ctx.clone();
ctx.try_spawn(async move {
// drop/close block_tx after calling Executor::stop
// so handler task can get newest state before return
// otherwise the handler task and this task may competing for the executor lock
let block_tx_clone = block_tx.clone();
match execute(interpreter, ctx_clone, block_tx_clone, &mut abort_rx).await {
Ok(_) => Executor::stop(&executor_clone, Ok(()), false).await,
Err(err) => {
let kill = err.message().starts_with("aborted");
Executor::stop(&executor_clone, Err(err), kill).await
}
tracing::debug!("drop block sender!");
})?;
};
})?;

Ok((executor, schema))
}
}

Ok((executor_clone, schema))
async fn execute(
interpreter: Arc<dyn Interpreter>,
ctx: Arc<QueryContext>,
block_tx: mpsc::Sender<DataBlock>,
abort_rx: &mut mpsc::Receiver<()>,
) -> Result<()> {
let data_stream = interpreter.execute(None).await?;
let mut data_stream = ctx.try_create_abortable(data_stream)?;
while let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => tokio::select! {
_ = block_tx.send(block) => { },
_ = abort_rx.recv() => {
return Err(ErrorCode::AbortedQuery("aborted"))
},
},
Err(err) => return Err(err),
};
}
Ok(())
}
1 change: 1 addition & 0 deletions query/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::servers::http::v1::query::Wait;
use crate::sessions::SessionManager;

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpQueryRequest {
#[serde(default)]
pub session: HttpSessionConf,
Expand Down
2 changes: 1 addition & 1 deletion query/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn test_simple_sql() -> Result<()> {
assert_eq!(status, StatusCode::OK, "{:?}", result);
assert!(result.error.is_none(), "{:?}", result.error);
assert_eq!(result.data.len(), 10);
assert_eq!(result.state, ExecuteStateName::Succeeded);
assert_eq!(result.state, ExecuteStateName::Succeeded, "{:?}", result);
assert!(result.next_uri.is_none(), "{:?}", result);
assert!(result.stats.progress.is_some());
assert!(result.schema.is_some());
Expand Down

0 comments on commit c9e9360

Please sign in to comment.