From 845bd92f604d6b382a70720fc3749226876fa16d Mon Sep 17 00:00:00 2001 From: everpcpc Date: Mon, 28 Oct 2024 12:09:24 +0800 Subject: [PATCH 1/8] fix(query): use custom connector for udf client --- Cargo.lock | 3 +++ src/common/grpc/src/dns_resolver.rs | 6 +++--- src/common/grpc/src/lib.rs | 1 + src/query/expression/Cargo.toml | 3 +++ src/query/expression/src/utils/udf_client.rs | 18 ++++++++++++++---- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80aeebcb0389b..45f14762c89b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3491,6 +3491,7 @@ dependencies = [ "databend-common-base", "databend-common-datavalues", "databend-common-exception", + "databend-common-grpc", "databend-common-hashtable", "databend-common-io", "educe 0.4.23", @@ -3502,6 +3503,8 @@ dependencies = [ "geozero 0.13.0", "goldenfile", "hex", + "hickory-resolver", + "hyper 0.14.30", "itertools 0.10.5", "jsonb", "lexical-core", diff --git a/src/common/grpc/src/dns_resolver.rs b/src/common/grpc/src/dns_resolver.rs index 8d1822dcfaaa7..af95b27c9d0d2 100644 --- a/src/common/grpc/src/dns_resolver.rs +++ b/src/common/grpc/src/dns_resolver.rs @@ -83,7 +83,7 @@ impl DNSResolver { } #[derive(Clone)] -struct DNSService; +pub struct DNSService; impl Service for DNSService { type Response = DNSServiceAddrs; @@ -109,11 +109,11 @@ impl Service for DNSService { } } -struct DNSServiceFuture { +pub struct DNSServiceFuture { inner: JoinHandle>, } -struct DNSServiceAddrs { +pub struct DNSServiceAddrs { inner: std::vec::IntoIter, } diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index 66d01de68c45e..e511837113d00 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -18,6 +18,7 @@ pub use client_conf::RpcClientConf; pub use client_conf::RpcClientTlsConfig; pub use dns_resolver::ConnectionFactory; pub use dns_resolver::DNSResolver; +pub use dns_resolver::DNSService; pub use dns_resolver::GrpcConnectionError; pub use grpc_token::GrpcClaim; pub use grpc_token::GrpcToken; diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index 08d4232286c45..275a5f73a74ac 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -28,6 +28,7 @@ databend-common-ast = { workspace = true } databend-common-base = { workspace = true } databend-common-datavalues = { workspace = true } databend-common-exception = { workspace = true } +databend-common-grpc = { workspace = true } databend-common-hashtable = { workspace = true } databend-common-io = { workspace = true } educe = "0.4" @@ -38,6 +39,8 @@ geo = { workspace = true } geos = { workspace = true } geozero = { workspace = true } hex = "0.4.3" +hickory-resolver = "0.24" +hyper = "0.14" itertools = { workspace = true } jsonb = { workspace = true } lexical-core = "0.8.5" diff --git a/src/query/expression/src/utils/udf_client.rs b/src/query/expression/src/utils/udf_client.rs index 24ce63aa9b17b..6796a3098bec9 100644 --- a/src/query/expression/src/utils/udf_client.rs +++ b/src/query/expression/src/utils/udf_client.rs @@ -27,9 +27,11 @@ use databend_common_base::headers::HEADER_TENANT; use databend_common_base::version::DATABEND_SEMVER; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_grpc::DNSService; use futures::stream; use futures::StreamExt; use futures::TryStreamExt; +use hyper::client::connect::HttpConnector; use tonic::metadata::KeyAndValueRef; use tonic::metadata::MetadataKey; use tonic::metadata::MetadataMap; @@ -78,16 +80,24 @@ impl UDFFlightClient { .keep_alive_timeout(Duration::from_secs(UDF_KEEP_ALIVE_TIMEOUT_SEC)) .keep_alive_while_idle(true); - let inner = FlightServiceClient::connect(endpoint) + let mut connector = HttpConnector::new_with_resolver(DNSService); + connector.enforce_http(false); + connector.set_nodelay(true); + connector.set_keepalive(Some(Duration::from_secs(UDF_TCP_KEEP_ALIVE_SEC))); + connector.set_connect_timeout(Some(Duration::from_secs(conn_timeout))); + connector.set_reuse_address(true); + + let channel = endpoint + .connect_with_connector(connector) .await .map_err(|err| { ErrorCode::UDFServerConnectError(format!( "Cannot connect to UDF Server {}: {:?}", addr, err )) - })? - .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE); - + })?; + let inner = + FlightServiceClient::new(channel).max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE); Ok(UDFFlightClient { inner, batch_rows, From 0e226f6345ae43eef6457243bc60add58d4ca0d0 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Mon, 28 Oct 2024 12:10:33 +0800 Subject: [PATCH 2/8] z --- Cargo.lock | 1 - src/query/expression/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45f14762c89b4..748e1d1541d00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3503,7 +3503,6 @@ dependencies = [ "geozero 0.13.0", "goldenfile", "hex", - "hickory-resolver", "hyper 0.14.30", "itertools 0.10.5", "jsonb", diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index 275a5f73a74ac..a4a1046496fc8 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -39,7 +39,6 @@ geo = { workspace = true } geos = { workspace = true } geozero = { workspace = true } hex = "0.4.3" -hickory-resolver = "0.24" hyper = "0.14" itertools = { workspace = true } jsonb = { workspace = true } From bfae51a33fd4c84d2e6699ece6053addba10c579 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Mon, 28 Oct 2024 13:36:18 +0800 Subject: [PATCH 3/8] z --- Cargo.toml | 1 + src/common/grpc/Cargo.toml | 2 +- src/common/grpc/src/dns_resolver.rs | 8 ++++---- src/query/expression/Cargo.toml | 2 +- src/query/expression/src/utils/udf_client.rs | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 679998b3c7e2f..3285abc627801 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -244,6 +244,7 @@ geos = { version = "8.3", features = ["static", "geo", "geo-types"] } geozero = { version = "0.13.0", features = ["default", "with-wkb", "with-geos", "with-geojson"] } hashbrown = { version = "0.14.3", default-features = false } http = "1" +hyper_v0_14 = { package = "hyper", version = "0.14" } iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 18e163352bcd8..de41223c1bbc9 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -15,7 +15,7 @@ anyerror = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } hickory-resolver = "0.24" -hyper = "0.14.20" +hyper_v0_14 = { workspace = true } jwt-simple = "0.11.0" log = { workspace = true } serde = { workspace = true } diff --git a/src/common/grpc/src/dns_resolver.rs b/src/common/grpc/src/dns_resolver.rs index af95b27c9d0d2..a3075a3159101 100644 --- a/src/common/grpc/src/dns_resolver.rs +++ b/src/common/grpc/src/dns_resolver.rs @@ -28,10 +28,10 @@ use databend_common_base::runtime; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use hickory_resolver::TokioAsyncResolver; -use hyper::client::connect::dns::Name; -use hyper::client::HttpConnector; -use hyper::service::Service; -use hyper::Uri; +use hyper_v0_14::client::connect::dns::Name; +use hyper_v0_14::client::HttpConnector; +use hyper_v0_14::service::Service; +use hyper_v0_14::Uri; use log::info; use serde::Deserialize; use serde::Serialize; diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index a4a1046496fc8..9489a3e5c1299 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -39,7 +39,7 @@ geo = { workspace = true } geos = { workspace = true } geozero = { workspace = true } hex = "0.4.3" -hyper = "0.14" +hyper_v0_14 = { workspace = true } itertools = { workspace = true } jsonb = { workspace = true } lexical-core = "0.8.5" diff --git a/src/query/expression/src/utils/udf_client.rs b/src/query/expression/src/utils/udf_client.rs index 6796a3098bec9..9ecc987fe4385 100644 --- a/src/query/expression/src/utils/udf_client.rs +++ b/src/query/expression/src/utils/udf_client.rs @@ -31,7 +31,7 @@ use databend_common_grpc::DNSService; use futures::stream; use futures::StreamExt; use futures::TryStreamExt; -use hyper::client::connect::HttpConnector; +use hyper_v0_14::client::connect::HttpConnector; use tonic::metadata::KeyAndValueRef; use tonic::metadata::MetadataKey; use tonic::metadata::MetadataMap; From 72dbbc3dcef609590041111baced795850772c90 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 29 Oct 2024 22:17:02 +0800 Subject: [PATCH 4/8] z --- Cargo.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ebb007b06474..e7d7e6a958572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -249,20 +249,12 @@ geozero = { version = "0.14.0", features = ["default", "with-wkb", "with-geos", hashbrown = { version = "0.15.0", default-features = false } hickory-resolver = "0.24" http = "1" -<<<<<<< HEAD -hyper_v0_14 = { package = "hyper", version = "0.14" } -iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } -iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } -iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "a0e74b1412ae7ce4b857c251bcbb3179b7713fb6" } -itertools = "0.10.5" -======= hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } itertools = "0.13.0" ->>>>>>> upstream/main jsonb = "0.4.3" jwt-simple = { version = "0.12.10", default-features = false, features = ["pure-rust"] } lenient_semver = "0.4.2" From a37ffe2746718f9133033f16313f6e7a84fdf4c2 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 29 Oct 2024 22:30:27 +0800 Subject: [PATCH 5/8] z --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e7d7e6a958572..3811afa143c54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -533,7 +533,6 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" } ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" } jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" } openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" } -orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" } From 4b217c0493411fa23976289d74dca708ace009de Mon Sep 17 00:00:00 2001 From: everpcpc Date: Tue, 29 Oct 2024 22:31:17 +0800 Subject: [PATCH 6/8] z --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 3811afa143c54..e7d7e6a958572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -533,6 +533,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" } ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" } jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" } openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" } +orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" } From 439a09543f9b5e111cba46268d26806bd6634020 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 30 Oct 2024 00:36:17 +0800 Subject: [PATCH 7/8] z --- Cargo.lock | 1 + src/query/expression/Cargo.toml | 1 + src/query/expression/src/utils/udf_client.rs | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index bb1e572827da0..2d3cb4fe980aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3394,6 +3394,7 @@ dependencies = [ "geozero 0.14.0", "goldenfile", "hex", + "hyper-util", "itertools 0.13.0", "jsonb", "lexical-core 1.0.2", diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index 268a6ad12913d..80e347176c2b9 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -39,6 +39,7 @@ geo = { workspace = true } geos = { workspace = true } geozero = { workspace = true } hex = { workspace = true } +hyper-util = { workspace = true } itertools = { workspace = true } jsonb = { workspace = true } lexical-core = { workspace = true } diff --git a/src/query/expression/src/utils/udf_client.rs b/src/query/expression/src/utils/udf_client.rs index 7efbad2426c16..be77dd37ef19f 100644 --- a/src/query/expression/src/utils/udf_client.rs +++ b/src/query/expression/src/utils/udf_client.rs @@ -31,7 +31,7 @@ use databend_common_grpc::DNSService; use futures::stream; use futures::StreamExt; use futures::TryStreamExt; -use hyper_v0_14::client::connect::HttpConnector; +use hyper_util::client::legacy::connect::HttpConnector; use tonic::metadata::KeyAndValueRef; use tonic::metadata::MetadataKey; use tonic::metadata::MetadataMap; From e878c2de7fb8dcd73b86723cd95c66ee067f8977 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 30 Oct 2024 08:51:59 +0800 Subject: [PATCH 8/8] z --- src/common/grpc/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 38d7ecfa443aa..aaf53cf0d04b0 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -11,9 +11,10 @@ doctest = false test = true [dependencies] -anyerror = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } + +anyerror = { workspace = true } hickory-resolver = { workspace = true } hyper = { workspace = true } hyper-util = { workspace = true }