Skip to content

Commit

Permalink
refactor: use notifier::RequestNotifiers instead of `dedup_requests…
Browse files Browse the repository at this point in the history
…::RequestNotifiers` (#1249)

## Rationale
Now, there are same code of RequestNotifiers in notifier and
dedup_requests, so we can use `notifier::RequestNotifiers` instead of
`dedup_requests::RequestNotifiers`

## Detailed Changes
- use `notifier::RequestNotifiers` instead of
`dedup_requests::RequestNotifiers`
- remove `dedup_requests` file

## Test Plan
Existing tests
  • Loading branch information
baojinri authored Oct 20, 2023
1 parent 1df984c commit 2241cb9
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 138 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lazy_static = { workspace = true }
logger = { workspace = true }
macros = { workspace = true }
meta_client = { workspace = true }
notifier = { workspace = true }
paste = { workspace = true }
prom-remote-api = { workspace = true, features = ["warp"] }
prometheus = { workspace = true }
Expand Down
123 changes: 0 additions & 123 deletions proxy/src/dedup_requests.rs

This file was deleted.

9 changes: 3 additions & 6 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#![feature(trait_alias)]

pub mod context;
pub mod dedup_requests;
pub mod error;
mod error_util;
pub mod forward;
Expand Down Expand Up @@ -82,16 +81,14 @@ use table_engine::{
PARTITION_TABLE_ENGINE_TYPE,
};
use time_ext::{current_time_millis, parse_duration};
use tokio::sync::mpsc::Sender;
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::RequestNotifiers,
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
hotspot::HotspotRecorder,
instance::InstanceRef,
read::SqlResponse,
read::ReadRequestNotifiers,
schema_config_provider::SchemaConfigProviderRef,
};

Expand Down Expand Up @@ -125,7 +122,7 @@ pub struct Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
request_notifiers: Option<ReadRequestNotifiers>,
}

impl Proxy {
Expand All @@ -142,7 +139,7 @@ impl Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
request_notifiers: Option<ReadRequestNotifiers>,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand Down
9 changes: 6 additions & 3 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use logger::{error, info, warn, SlowTimer};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use query_frontend::{
frontend,
frontend::{Context as SqlContext, Frontend},
Expand All @@ -35,13 +36,15 @@ use tokio::sync::mpsc::{self, Sender};
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult},
error::{ErrNoCause, ErrWithCause, Error, Internal, InternalNoCause, Result},
forward::{ForwardRequest, ForwardResult},
metrics::GRPC_HANDLER_COUNTER_VEC,
Context, Proxy,
};

const DEDUP_READ_CHANNEL_LEN: usize = 1;
pub type ReadRequestNotifiers = Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>;

pub enum SqlResponse {
Forwarded(SqlQueryResponse),
Local(Output),
Expand Down Expand Up @@ -77,10 +80,10 @@ impl Proxy {
ctx: &Context,
schema: &str,
sql: &str,
request_notifiers: Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>,
request_notifiers: ReadRequestNotifiers,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
let (tx, mut rx) = mpsc::channel(1);
let (tx, mut rx) = mpsc::channel(DEDUP_READ_CHANNEL_LEN);
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
RequestResult::First => ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&sql.to_string());
Expand Down
12 changes: 6 additions & 6 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use df_operator::registry::FunctionRegistryRef;
use interpreters::table_manipulator::TableManipulatorRef;
use logger::{info, warn, RuntimeLevel};
use macros::define_result;
use notifier::notifier::RequestNotifiers;
use partition_table_engine::PartitionTableEngine;
use proxy::{
dedup_requests::RequestNotifiers,
hotspot::HotspotRecorder,
instance::{DynamicConfig, Instance, InstanceRef},
limiter::Limiter,
Expand Down Expand Up @@ -440,11 +440,11 @@ impl Builder {
timeout: self.server_config.timeout.map(|v| v.0),
};

let request_notifiers = if self.server_config.query_dedup.enable {
Some(Arc::new(RequestNotifiers::default()))
} else {
None
};
let request_notifiers = self
.server_config
.query_dedup
.enable
.then(|| Arc::new(RequestNotifiers::default()));

let proxy = Arc::new(Proxy::new(
router.clone(),
Expand Down

0 comments on commit 2241cb9

Please sign in to comment.