Skip to content

Commit

Permalink
fix(cluster): make lint
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Mar 10, 2023
1 parent 449b753 commit cbd9830
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 58 deletions.
7 changes: 1 addition & 6 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,7 @@ impl DataExchangeManager {
targets_exchanges.insert(
(connection_info.target.id.clone(), *fragment),
flight_client
.do_exchange(
&packet.query_id,
source,
&connection_info.target.id,
*fragment,
)
.do_exchange(&packet.query_id, source, *fragment)
.await?,
);
}
Expand Down
41 changes: 3 additions & 38 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::convert::TryInto;
use std::error::Error;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
Expand All @@ -32,15 +31,13 @@ use common_exception::ErrorCode;
use common_exception::Result;
use futures_util::future::BoxFuture;
use futures_util::future::Either;
use futures_util::future::JoinAll;
use futures_util::StreamExt;
use parking_lot::Mutex;
use tonic::transport::channel::Channel;
use tonic::Request;
use tonic::Status;
use tonic::Streaming;
use tracing::error;
use tracing::info;

use crate::api::rpc::flight_actions::FlightAction;
use crate::api::rpc::packets::DataPacket;
Expand All @@ -67,9 +64,6 @@ impl FlightClient {
pub async fn request_server_exchange(&mut self, query_id: &str) -> Result<FlightExchange> {
let (tx, rx) = async_channel::bounded(8);
Ok(FlightExchange::from_client(
query_id.to_string(),
String::from("request_server_exchange"),
String::from("request_server_exchange"),
tx,
self.exchange_streaming(
RequestBuilder::create(Box::pin(rx))
Expand All @@ -85,14 +79,10 @@ impl FlightClient {
&mut self,
query_id: &str,
source: &str,
target: &str,
fragment_id: usize,
) -> Result<FlightExchange> {
let (tx, rx) = async_channel::bounded(8);
Ok(FlightExchange::from_client(
query_id.to_string(),
fragment_id.to_string(),
target.to_string(),
tx,
self.exchange_streaming(
RequestBuilder::create(Box::pin(rx))
Expand Down Expand Up @@ -147,24 +137,13 @@ pub enum FlightExchange {

impl FlightExchange {
pub fn from_server(
query_id: String,
fragment_id: String,
endpoint: String,
streaming: Request<Streaming<FlightData>>,
response_tx: Sender<Result<FlightData, Status>>,
) -> FlightExchange {
let streaming = streaming.into_inner();
let state = Arc::new(ChannelState::create());
let f = |x| Ok(FlightData::from(x));
let (tx, rx) = Self::listen_request::<true, _>(
query_id,
fragment_id,
endpoint,
state.clone(),
response_tx.clone(),
streaming,
f,
);
let (tx, rx) = Self::listen_request(state.clone(), response_tx.clone(), streaming, f);

FlightExchange::Server(ServerFlightExchange {
state,
Expand All @@ -175,23 +154,12 @@ impl FlightExchange {
}

pub fn from_client(
query_id: String,
fragment: String,
endpoint: String,
response_tx: Sender<FlightData>,
streaming: Streaming<FlightData>,
) -> FlightExchange {
let state = Arc::new(ChannelState::create());
let f = FlightData::from;
let (tx, rx) = Self::listen_request::<false, _>(
query_id,
fragment,
endpoint,
state.clone(),
response_tx.clone(),
streaming,
f,
);
let (tx, rx) = Self::listen_request(state.clone(), response_tx.clone(), streaming, f);

FlightExchange::Client(ClientFlightExchange {
state,
Expand All @@ -201,10 +169,7 @@ impl FlightExchange {
})
}

fn listen_request<const CLOSE_CONN: bool, ResponseT: Send + 'static>(
query_id: String,
fragment: String,
endpoint: String,
fn listen_request<ResponseT: Send + 'static>(
state: Arc<ChannelState>,
network_tx: Sender<ResponseT>,
mut streaming: Streaming<FlightData>,
Expand Down
16 changes: 2 additions & 14 deletions src/query/service/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,7 @@ impl FlightService for DatabendQueryFlightService {
"request_server_exchange" => {
let query_id = req.get_metadata("x-query-id")?;
let (tx, rx) = async_channel::bounded(8);
let exchange = FlightExchange::from_server(
query_id.clone(),
String::from("request_server_exchange"),
String::from("request_server_exchange"),
req,
tx,
);
let exchange = FlightExchange::from_server(req, tx);

DataExchangeManager::instance().handle_statistics_exchange(query_id, exchange)?;
Ok(RawResponse::new(Box::pin(rx)))
Expand All @@ -125,13 +119,7 @@ impl FlightService for DatabendQueryFlightService {
let fragment = req.get_metadata("x-fragment-id")?.parse::<usize>().unwrap();

let (tx, rx) = async_channel::bounded(8);
let exchange = FlightExchange::from_server(
query_id.clone(),
fragment.to_string(),
source.clone(),
req,
tx,
);
let exchange = FlightExchange::from_server(req, tx);

DataExchangeManager::instance()
.handle_exchange_fragment(query_id, source, fragment, exchange)?;
Expand Down

0 comments on commit cbd9830

Please sign in to comment.