Skip to content

Commit

Permalink
fix httphandler async query.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Mar 3, 2022
1 parent 6dc7406 commit 41bbd2c
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ impl ExecuteState {
.await
.map_err(|e| tracing::error!("interpreter.start.error: {:?}", e));

let data_stream = interpreter.execute(None).await?;
let mut data_stream = ctx.try_create_abortable(data_stream)?;

let (abort_tx, mut abort_rx) = mpsc::channel(2);
ctx.attach_http_query(HttpQueryHandle {
abort_sender: abort_tx,
Expand All @@ -177,31 +174,40 @@ impl ExecuteState {
}));

let executor_clone = executor.clone();
ctx
.try_spawn(async move {
loop {
if let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => tokio::select! {
_ = block_tx.send(block) => { },
_ = abort_rx.recv() => {
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("query aborted")), true).await;
break;
},
},
Err(err) => {
Executor::stop(&executor, Err(err), false).await;
break;
}
};
} else {
Executor::stop(&executor, Ok(()), false).await;
break;
}
let ctx_clone = ctx.clone();
ctx.try_spawn(async move {
match execute(interpreter, ctx_clone, block_tx, &mut abort_rx).await {
Ok(_) => Executor::stop(&executor_clone, Ok(()), false).await,
Err(err) => {
let kill = err.message().starts_with("aborted");
Executor::stop(&executor_clone, Err(err), kill).await
}
tracing::debug!("drop block sender!");
})?;
};
tracing::debug!("drop block sender!");
})?;

Ok((executor, schema))
}
}

Ok((executor_clone, schema))
async fn execute(
interpreter: Arc<dyn Interpreter>,
ctx: Arc<QueryContext>,
block_tx: mpsc::Sender<DataBlock>,
abort_rx: &mut mpsc::Receiver<()>,
) -> Result<()> {
let data_stream = interpreter.execute(None).await?;
let mut data_stream = ctx.try_create_abortable(data_stream)?;
while let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => tokio::select! {
_ = block_tx.send(block) => { },
_ = abort_rx.recv() => {
return Err(ErrorCode::AbortedQuery("aborted"))
},
},
Err(err) => return Err(err),
};
}
Ok(())
}

0 comments on commit 41bbd2c

Please sign in to comment.