Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): use custom connector for udf client #16697

Merged
merged 12 commits into from
Oct 30, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ geos = { version = "8.3", features = ["static", "geo", "geo-types"] }
geozero = { version = "0.13.0", features = ["default", "with-wkb", "with-geos", "with-geojson"] }
hashbrown = { version = "0.14.3", default-features = false }
http = "1"
hyper_v0_14 = { package = "hyper", version = "0.14" }
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" }
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" }
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" }
Expand Down
2 changes: 1 addition & 1 deletion src/common/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ anyerror = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
hickory-resolver = "0.24"
hyper = "0.14.20"
hyper_v0_14 = { workspace = true }
jwt-simple = "0.11.0"
log = { workspace = true }
serde = { workspace = true }
Expand Down
14 changes: 7 additions & 7 deletions src/common/grpc/src/dns_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use databend_common_base::runtime;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use hickory_resolver::TokioAsyncResolver;
use hyper::client::connect::dns::Name;
use hyper::client::HttpConnector;
use hyper::service::Service;
use hyper::Uri;
use hyper_v0_14::client::connect::dns::Name;
use hyper_v0_14::client::HttpConnector;
use hyper_v0_14::service::Service;
use hyper_v0_14::Uri;
use log::info;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl DNSResolver {
}

#[derive(Clone)]
struct DNSService;
pub struct DNSService;

impl Service<Name> for DNSService {
type Response = DNSServiceAddrs;
Expand All @@ -109,11 +109,11 @@ impl Service<Name> for DNSService {
}
}

struct DNSServiceFuture {
pub struct DNSServiceFuture {
inner: JoinHandle<Result<DNSServiceAddrs>>,
}

struct DNSServiceAddrs {
pub struct DNSServiceAddrs {
inner: std::vec::IntoIter<IpAddr>,
}

Expand Down
1 change: 1 addition & 0 deletions src/common/grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use client_conf::RpcClientConf;
pub use client_conf::RpcClientTlsConfig;
pub use dns_resolver::ConnectionFactory;
pub use dns_resolver::DNSResolver;
pub use dns_resolver::DNSService;
pub use dns_resolver::GrpcConnectionError;
pub use grpc_token::GrpcClaim;
pub use grpc_token::GrpcToken;
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ databend-common-ast = { workspace = true }
databend-common-base = { workspace = true }
databend-common-datavalues = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-grpc = { workspace = true }
databend-common-hashtable = { workspace = true }
databend-common-io = { workspace = true }
educe = "0.4"
Expand All @@ -38,6 +39,7 @@ geo = { workspace = true }
geos = { workspace = true }
geozero = { workspace = true }
hex = "0.4.3"
hyper_v0_14 = { workspace = true }
itertools = { workspace = true }
jsonb = { workspace = true }
lexical-core = "0.8.5"
Expand Down
18 changes: 14 additions & 4 deletions src/query/expression/src/utils/udf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use databend_common_base::headers::HEADER_TENANT;
use databend_common_base::version::DATABEND_SEMVER;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_grpc::DNSService;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use hyper_v0_14::client::connect::HttpConnector;
use tonic::metadata::KeyAndValueRef;
use tonic::metadata::MetadataKey;
use tonic::metadata::MetadataMap;
Expand Down Expand Up @@ -78,16 +80,24 @@ impl UDFFlightClient {
.keep_alive_timeout(Duration::from_secs(UDF_KEEP_ALIVE_TIMEOUT_SEC))
.keep_alive_while_idle(true);

let inner = FlightServiceClient::connect(endpoint)
let mut connector = HttpConnector::new_with_resolver(DNSService);
connector.enforce_http(false);
connector.set_nodelay(true);
connector.set_keepalive(Some(Duration::from_secs(UDF_TCP_KEEP_ALIVE_SEC)));
connector.set_connect_timeout(Some(Duration::from_secs(conn_timeout)));
connector.set_reuse_address(true);

let channel = endpoint
.connect_with_connector(connector)
.await
.map_err(|err| {
ErrorCode::UDFServerConnectError(format!(
"Cannot connect to UDF Server {}: {:?}",
addr, err
))
})?
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);

})?;
let inner =
FlightServiceClient::new(channel).max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
Ok(UDFFlightClient {
inner,
batch_rows,
Expand Down
Loading