Skip to content

Commit

Permalink
tmp for debug
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Mar 3, 2022
1 parent 3a98f1f commit b0aea47
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
5 changes: 4 additions & 1 deletion query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,16 @@ async fn execute(
while let Some(block_r) = data_stream.next().await {
match block_r {
Ok(block) => tokio::select! {
_ = block_tx.send(block) => { },
_ = block_tx.send(block) => {
println!("send done, {:?}", std::time::SystemTime::now());
},
_ = abort_rx.recv() => {
return Err(ErrorCode::AbortedQuery("aborted"))
},
},
Err(err) => return Err(err),
};
}
println!("stream done, {:?}", std::time::SystemTime::now());
Ok(())
}
2 changes: 2 additions & 0 deletions query/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl Default for PaginationConf {
impl PaginationConf {
pub(crate) fn get_wait_type(&self) -> Wait {
let t = self.wait_time_secs;
println!("wait {}, {:?}", t, std::time::SystemTime::now());
match t.cmp(&0) {
Ordering::Greater => Wait::Deadline(Instant::now() + Duration::from_secs(t as u64)),
Ordering::Equal => Wait::Async,
Expand Down Expand Up @@ -163,6 +164,7 @@ impl HttpQuery {

async fn get_state(&self) -> ResponseState {
let state = self.state.read().await;
println!("get state, {:?}", std::time::SystemTime::now());
let (exe_state, err) = state.state.extract();
let wall_time_ms = state.elapsed().as_millis();
ResponseState {
Expand Down
10 changes: 9 additions & 1 deletion query/src/servers/http/v1/query/result_data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl ResultDataManager {
Async => block_rx.try_recv(),
Sync => block_rx.recv().await.ok_or(TryRecvError::Disconnected),
Deadline(t) => {
println!("sleep {:?}", *t - std::time::Instant::now());
let sleep = tokio::time::sleep_until(tokio::time::Instant::from_std(*t));
tokio::select! {
biased;
Expand All @@ -133,23 +134,30 @@ impl ResultDataManager {

let mut end = false;
loop {
println!("try get block, {:?}", std::time::SystemTime::now());
match ResultDataManager::receive(block_rx, tp).await {
Ok(block) => {
println!("got block, {:?}", std::time::SystemTime::now());
rows += block.num_rows();
results.push(block_to_json(&block).unwrap());
// TODO(youngsofun): set it in post if needed
if rows >= TARGET_ROWS_PER_PAGE {
break;
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Empty) => {
println!("got empty, {:?}", std::time::SystemTime::now());
break;
}
Err(TryRecvError::Disconnected) => {
println!("got disconnect, {:?}", std::time::SystemTime::now());
tracing::debug!("no more data");
end = true;
break;
}
}
}
println!("get page done, {:?}", std::time::SystemTime::now());
(results.concat(), end)
}
}
1 change: 1 addition & 0 deletions query/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type EndpointType = HTTPSessionEndpoint<Route>;
async fn test_simple_sql() -> Result<()> {
let sql = "select * from system.tables limit 10";
let (status, result) = post_sql(sql, 1).await?;
print!("{:?}", result);
assert_eq!(status, StatusCode::OK, "{:?}", result);
assert!(result.error.is_none(), "{:?}", result.error);
assert_eq!(result.data.len(), 10);
Expand Down

0 comments on commit b0aea47

Please sign in to comment.