Skip to content

chore(query): support local semaphore #17827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value = "8")]
pub max_running_queries: u64,

#[clap(long, value_name = "VALUE", default_value = "true")]
pub global_statement_queue: bool,

/// The max total memory in bytes that can be used by this process.
#[clap(long, value_name = "VALUE", default_value = "0")]
pub max_server_memory_usage: u64,
Expand Down Expand Up @@ -1788,6 +1791,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
mysql_tls_server_key: self.mysql_tls_server_key,
max_active_sessions: self.max_active_sessions,
max_running_queries: self.max_running_queries,
global_statement_queue: self.global_statement_queue,
max_server_memory_usage: self.max_server_memory_usage,
max_memory_limit_enabled: self.max_memory_limit_enabled,
clickhouse_http_handler_host: self.clickhouse_http_handler_host,
Expand Down Expand Up @@ -1880,6 +1884,7 @@ impl From<InnerQueryConfig> for QueryConfig {
mysql_tls_server_key: inner.mysql_tls_server_key,
max_active_sessions: inner.max_active_sessions,
max_running_queries: inner.max_running_queries,
global_statement_queue: inner.global_statement_queue,
max_server_memory_usage: inner.max_server_memory_usage,
max_memory_limit_enabled: inner.max_memory_limit_enabled,

Expand Down
2 changes: 2 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ pub struct QueryConfig {
pub mysql_tls_server_key: String,
pub max_active_sessions: u64,
pub max_running_queries: u64,
pub global_statement_queue: bool,
pub max_server_memory_usage: u64,
pub max_memory_limit_enabled: bool,
pub clickhouse_http_handler_host: String,
Expand Down Expand Up @@ -271,6 +272,7 @@ impl Default for QueryConfig {
mysql_tls_server_key: "".to_string(),
max_active_sessions: 256,
max_running_queries: 8,
global_statement_queue: true,
max_server_memory_usage: 0,
max_memory_limit_enabled: false,
clickhouse_http_handler_host: "127.0.0.1".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async fn plan_sql(
if !acquire_queue {
// If queue guard is not required, plan the statement directly.
let plan = planner.plan_stmt(&extras.statement).await?;
return Ok((plan, extras, AcquireQueueGuard::create(None)));
return Ok((plan, extras, AcquireQueueGuard::create_global(None)));
}

let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement);
Expand Down
167 changes: 105 additions & 62 deletions src/query/service/src/sessions/queue_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ use databend_common_sql::PlanExtras;
use log::info;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use tokio::sync::AcquireError as TokioAcquireError;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
use tokio::time::error::Elapsed;

use crate::sessions::QueryContext;
Expand Down Expand Up @@ -87,6 +90,8 @@ pub(crate) struct Inner<Data: QueueData> {
/// Admission-control manager for the statement/query queue.
///
/// Two queueing modes are supported, selected by `global_statement_queue`:
/// a cluster-wide distributed semaphore backed by the meta store, or a
/// process-local tokio semaphore (see the acquire path below).
pub struct QueueManager<Data: QueueData> {
    // Maximum number of entries allowed to run concurrently.
    permits: usize,
    // Used to create the distributed (cluster-wide) semaphore.
    meta_store: MetaStore,
    // Process-local semaphore, used when global_statement_queue is false.
    semaphore: Arc<Semaphore>,
    // true => queue via meta-store semaphore; false => queue via local semaphore.
    global_statement_queue: bool,
    // Entries currently waiting in the queue, keyed by Data::Key.
    queue: Mutex<HashMap<Data::Key, Inner<Data>>>,
}

Expand All @@ -101,23 +106,33 @@ impl<Data: QueueData> QueueManager<Data> {
};

info!("queue manager permits: {:?}", permits);
GlobalInstance::set(Self::create(permits, metastore));
GlobalInstance::set(Self::create(
permits,
metastore,
conf.query.global_statement_queue,
));
Ok(())
}

/// Returns the process-wide singleton previously registered by `init`.
pub fn instance() -> Arc<Self> {
    GlobalInstance::get::<Arc<Self>>()
}

pub fn create(mut permits: usize, meta_store: MetaStore) -> Arc<QueueManager<Data>> {
pub fn create(
mut permits: usize,
meta_store: MetaStore,
global_statement_queue: bool,
) -> Arc<QueueManager<Data>> {
if permits == 0 {
permits = usize::MAX >> 4;
}

Arc::new(QueueManager {
permits,
meta_store,
global_statement_queue,
queue: Mutex::new(HashMap::new()),
semaphore: Arc::new(Semaphore::new(permits)),
})
}

Expand Down Expand Up @@ -156,21 +171,35 @@ impl<Data: QueueData> QueueManager<Data> {
);

let timeout = data.timeout();
let semaphore_acquire = self.meta_store.new_acquired(
data.get_lock_key(),
self.permits as u64,
data.get_key(), // ID of this acquirer
data.lock_ttl(),
);

let future = AcquireQueueFuture::create(
Arc::new(data),
tokio::time::timeout(timeout, semaphore_acquire),
self.clone(),
);
let start_time = SystemTime::now();
let acquire_res = match self.global_statement_queue {
true => {
let semaphore_acquire = self.meta_store.new_acquired(
data.get_lock_key(),
self.permits as u64,
data.get_key(), // ID of this acquirer
data.lock_ttl(),
);

return match future.await {
AcquireQueueFuture::create(
Arc::new(data),
tokio::time::timeout(timeout, semaphore_acquire),
self.clone(),
)
.await
}
false => {
AcquireQueueFuture::create(
Arc::new(data),
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
self.clone(),
)
.await
}
};

return match acquire_res {
Ok(v) => {
info!("finished acquiring from queue, length: {}", self.length());

Expand All @@ -197,7 +226,7 @@ impl<Data: QueueData> QueueManager<Data> {
};
}

Ok(AcquireQueueGuard::create(None))
Ok(AcquireQueueGuard::create_global(None))
}

pub(crate) fn add_entity(&self, inner: Inner<Data>) -> Data::Key {
Expand Down Expand Up @@ -231,28 +260,35 @@ impl<Data: QueueData> QueueManager<Data> {
}
}

pub struct AcquireQueueGuard {
#[allow(dead_code)]
permit: Option<Permit>,
pub enum AcquireQueueGuard {
Global(Option<Permit>),
Local(Option<OwnedSemaphorePermit>),
}

impl Drop for AcquireQueueGuard {
fn drop(&mut self) {
if self.permit.is_some() {
dec_session_running_acquired_queries();
match self {
AcquireQueueGuard::Local(Some(_)) | AcquireQueueGuard::Global(Some(_)) => {
dec_session_running_acquired_queries();
}
_ => {}
}
}
}

impl AcquireQueueGuard {
pub fn create(permit: Option<Permit>) -> Self {
AcquireQueueGuard { permit }
pub fn create_global(permit: Option<Permit>) -> Self {
AcquireQueueGuard::Global(permit)
}

pub fn create_local(permit: Option<OwnedSemaphorePermit>) -> Self {
AcquireQueueGuard::Local(permit)
}
}

pin_project! {
pub struct AcquireQueueFuture<Data: QueueData, T>
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
pub struct AcquireQueueFuture<Data: QueueData, T, Permit, E>
where T: Future<Output = std::result::Result<std::result::Result<Permit, E>, Elapsed>>
{
#[pin]
inner: T,
Expand All @@ -266,8 +302,8 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquir
}
}

impl<Data: QueueData, T> AcquireQueueFuture<Data, T>
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
impl<Data: QueueData, T, Permit, E> AcquireQueueFuture<Data, T, Permit, E>
where T: Future<Output = std::result::Result<std::result::Result<Permit, E>, Elapsed>>
{
pub fn create(data: Arc<Data>, inner: T, mgr: Arc<QueueManager<Data>>) -> Self {
AcquireQueueFuture {
Expand All @@ -281,53 +317,60 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquire
}
}

impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T>
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
{
type Output = Result<AcquireQueueGuard>;
macro_rules! impl_acquire_queue_future {
($Permit:ty, $fn_name:ident, $Error:ty) => {
impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T, $Permit, $Error>
where T: Future<Output = std::result::Result<std::result::Result<$Permit, $Error>, Elapsed>>
{
type Output = Result<AcquireQueueGuard>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if this.is_abort.load(Ordering::SeqCst) {
return Poll::Ready(Err(Data::remove_error_message(this.key.take())));
}
if this.is_abort.load(Ordering::SeqCst) {
return Poll::Ready(Err(Data::remove_error_message(this.key.take())));
}

match this.inner.poll(cx) {
Poll::Ready(res) => {
if let Some(key) = this.key.take() {
if this.manager.remove_entity(&key).is_none() {
return Poll::Ready(Err(Data::remove_error_message(Some(key))));
}
}

match this.inner.poll(cx) {
Poll::Ready(res) => {
if let Some(key) = this.key.take() {
if this.manager.remove_entity(&key).is_none() {
return Poll::Ready(Err(Data::remove_error_message(Some(key))));
Poll::Ready(match res {
Ok(Ok(v)) => Ok(AcquireQueueGuard::$fn_name(Some(v))),
Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")),
Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")),
})
}
}
Poll::Pending => {
if !*this.has_pending {
*this.has_pending = true;
}

Poll::Ready(match res {
Ok(Ok(v)) => Ok(AcquireQueueGuard::create(Some(v))),
Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")),
Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")),
})
}
Poll::Pending => {
if !*this.has_pending {
*this.has_pending = true;
}
if let Some(data) = this.data.take() {
let waker = cx.waker().clone();
*this.key = Some(this.manager.add_entity(Inner {
data,
waker,
instant: Instant::now(),
is_abort: this.is_abort.clone(),
}));
}

if let Some(data) = this.data.take() {
let waker = cx.waker().clone();
*this.key = Some(this.manager.add_entity(Inner {
data,
waker,
instant: Instant::now(),
is_abort: this.is_abort.clone(),
}));
Poll::Pending
}
}

Poll::Pending
}
}
}
};
}

impl_acquire_queue_future!(Permit, create_global, AcquireError);
impl_acquire_queue_future!(OwnedSemaphorePermit, create_local, TokioAcquireError);

pub struct QueryEntry {
ctx: Arc<QueryContext>,
pub query_id: String,
Expand Down
Loading
Loading