Skip to content

Commit

Permalink
fix(cluster): update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Mar 10, 2023
1 parent cbd9830 commit 5922bf9
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl FlightExchange {
common_base::base::tokio::spawn({
let response_tx = response_tx.clone();
async move {
let mut send_closing_input = false;
let mut send_closing_output = false;
let mut futures = Vec::<BoxFuture<'static, _>>::new();

Expand Down Expand Up @@ -211,7 +212,10 @@ impl FlightExchange {
Ok(message) if DataPacket::is_closing_output(&message) => {
if !tx.is_closed() {
tx.close();
}

if !send_closing_input {
send_closing_input = true;
// create new future send packet to remote for avoid blocking recv data
futures.push(Box::pin(common_base::base::tokio::spawn({
let f = f.clone();
Expand Down Expand Up @@ -248,7 +252,7 @@ impl FlightExchange {
}
};

if tx.is_closed() && send_closing_output {
if send_closing_input && send_closing_output {
break 'worker;
}
}
Expand Down

0 comments on commit 5922bf9

Please sign in to comment.