diff --git a/Cargo.lock b/Cargo.lock index 43a0ee0447..8d89eeaf88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5317,6 +5317,7 @@ dependencies = [ "logger", "macros", "meta_client", + "notifier", "paste 1.0.12", "prom-remote-api", "prometheus 0.12.0", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index b0f3775557..85643e50aa 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -47,6 +47,7 @@ lazy_static = { workspace = true } logger = { workspace = true } macros = { workspace = true } meta_client = { workspace = true } +notifier = { workspace = true } paste = { workspace = true } prom-remote-api = { workspace = true, features = ["warp"] } prometheus = { workspace = true } diff --git a/proxy/src/dedup_requests.rs b/proxy/src/dedup_requests.rs deleted file mode 100644 index dea3381d95..0000000000 --- a/proxy/src/dedup_requests.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2023 The CeresDB Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{collections::HashMap, hash::Hash, sync::RwLock}; - -#[derive(Debug)] -struct Notifiers { - notifiers: RwLock>, -} - -impl Notifiers { - pub fn new(notifier: T) -> Self { - let notifiers = vec![notifier]; - Self { - notifiers: RwLock::new(notifiers), - } - } - - pub fn add_notifier(&self, notifier: T) { - self.notifiers.write().unwrap().push(notifier); - } -} - -#[derive(Debug)] -pub struct RequestNotifiers -where - K: PartialEq + Eq + Hash, -{ - notifiers_by_key: RwLock>>, -} - -impl Default for RequestNotifiers -where - K: PartialEq + Eq + Hash, -{ - fn default() -> Self { - Self { - notifiers_by_key: RwLock::new(HashMap::new()), - } - } -} - -impl RequestNotifiers -where - K: PartialEq + Eq + Hash, -{ - /// Insert a notifier for the given key. - pub fn insert_notifier(&self, key: K, notifier: T) -> RequestResult { - // First try to read the notifiers, if the key exists, add the notifier to the - // notifiers. - let notifiers_by_key = self.notifiers_by_key.read().unwrap(); - if let Some(notifiers) = notifiers_by_key.get(&key) { - notifiers.add_notifier(notifier); - return RequestResult::Wait; - } - drop(notifiers_by_key); - - // If the key does not exist, try to write the notifiers. - let mut notifiers_by_key = self.notifiers_by_key.write().unwrap(); - // double check, if the key exists, add the notifier to the notifiers. - if let Some(notifiers) = notifiers_by_key.get(&key) { - notifiers.add_notifier(notifier); - return RequestResult::Wait; - } - - //the key is not existed, insert the key and the notifier. - notifiers_by_key.insert(key, Notifiers::new(notifier)); - RequestResult::First - } - - /// Take the notifiers for the given key, and remove the key from the map. - pub fn take_notifiers(&self, key: &K) -> Option> { - self.notifiers_by_key - .write() - .unwrap() - .remove(key) - .map(|notifiers| notifiers.notifiers.into_inner().unwrap()) - } -} - -pub enum RequestResult { - // The first request for this key, need to handle this request. - First, - // There are other requests for this key, just wait for the result. - Wait, -} - -pub struct ExecutionGuard { - f: F, - cancelled: bool, -} - -impl ExecutionGuard { - pub fn new(f: F) -> Self { - Self { - f, - cancelled: false, - } - } - - pub fn cancel(&mut self) { - self.cancelled = true; - } -} - -impl Drop for ExecutionGuard { - fn drop(&mut self) { - if !self.cancelled { - (self.f)() - } - } -} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 4d4fe87379..97cd847999 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -18,7 +18,6 @@ #![feature(trait_alias)] pub mod context; -pub mod dedup_requests; pub mod error; mod error_util; pub mod forward; @@ -82,16 +81,14 @@ use table_engine::{ PARTITION_TABLE_ENGINE_TYPE, }; use time_ext::{current_time_millis, parse_duration}; -use tokio::sync::mpsc::Sender; use tonic::{transport::Channel, IntoRequest}; use crate::{ - dedup_requests::RequestNotifiers, error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, hotspot::HotspotRecorder, instance::InstanceRef, - read::SqlResponse, + read::ReadRequestNotifiers, schema_config_provider::SchemaConfigProviderRef, }; @@ -125,7 +122,7 @@ pub struct Proxy { engine_runtimes: Arc, cluster_with_meta: bool, sub_table_access_perm: SubTableAccessPerm, - request_notifiers: Option>>>>, + request_notifiers: Option, } impl Proxy { @@ -142,7 +139,7 @@ impl Proxy { engine_runtimes: Arc, cluster_with_meta: bool, sub_table_access_perm: SubTableAccessPerm, - request_notifiers: Option>>>>, + request_notifiers: Option, ) -> Self { let forwarder = Arc::new(Forwarder::new( forward_config, diff --git a/proxy/src/read.rs b/proxy/src/read.rs index 711f3975d8..f92433d0c0 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -24,6 +24,7 @@ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; use logger::{error, info, warn, SlowTimer}; +use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult}; use query_frontend::{ frontend, frontend::{Context as SqlContext, Frontend}, @@ -35,13 +36,15 @@ use tokio::sync::mpsc::{self, Sender}; use tonic::{transport::Channel, IntoRequest}; use crate::{ - dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult}, error::{ErrNoCause, ErrWithCause, Error, Internal, InternalNoCause, Result}, forward::{ForwardRequest, ForwardResult}, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy, }; +const DEDUP_READ_CHANNEL_LEN: usize = 1; +pub type ReadRequestNotifiers = Arc>>>; + pub enum SqlResponse { Forwarded(SqlQueryResponse), Local(Output), @@ -77,10 +80,10 @@ impl Proxy { ctx: &Context, schema: &str, sql: &str, - request_notifiers: Arc>>>, + request_notifiers: ReadRequestNotifiers, enable_partition_table_access: bool, ) -> Result { - let (tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel(DEDUP_READ_CHANNEL_LEN); let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) { RequestResult::First => ExecutionGuard::new(|| { request_notifiers.take_notifiers(&sql.to_string()); diff --git a/server/src/server.rs b/server/src/server.rs index 5547b808ca..4c3fabad7b 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -23,9 +23,9 @@ use df_operator::registry::FunctionRegistryRef; use interpreters::table_manipulator::TableManipulatorRef; use logger::{info, warn, RuntimeLevel}; use macros::define_result; +use notifier::notifier::RequestNotifiers; use partition_table_engine::PartitionTableEngine; use proxy::{ - dedup_requests::RequestNotifiers, hotspot::HotspotRecorder, instance::{DynamicConfig, Instance, InstanceRef}, limiter::Limiter, @@ -440,11 +440,11 @@ impl Builder { timeout: self.server_config.timeout.map(|v| v.0), }; - let request_notifiers = if self.server_config.query_dedup.enable { - Some(Arc::new(RequestNotifiers::default())) - } else { - None - }; + let request_notifiers = self + .server_config + .query_dedup + .enable + .then(|| Arc::new(RequestNotifiers::default())); let proxy = Arc::new(Proxy::new( router.clone(),