diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index 521222fec1fa8..fe06c1f496e65 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -125,31 +125,37 @@ impl SessionManager { } pub async fn create_session(self: &Arc, typ: impl Into) -> Result { - let mut sessions = self.active_sessions.write().await; + { + let sessions = self.active_sessions.read().await; + if sessions.len() == self.max_sessions { + return Err(ErrorCode::TooManyUserConnections( + "The current accept connection has exceeded mysql_handler_thread_num config", + )); + } + } + let session = Session::try_create( + self.conf.clone(), + uuid::Uuid::new_v4().to_string(), + typ.into(), + self.clone(), + ) + .await?; - match sessions.len() == self.max_sessions { - true => Err(ErrorCode::TooManyUserConnections( + let mut sessions = self.active_sessions.write().await; + if sessions.len() < self.max_sessions { + label_counter( + super::metrics::METRIC_SESSION_CONNECT_NUMBERS, + &self.conf.query.tenant_id, + &self.conf.query.cluster_id, + ); + + sessions.insert(session.get_id(), session.clone()); + + Ok(SessionRef::create(session)) + } else { + Err(ErrorCode::TooManyUserConnections( "The current accept connection has exceeded mysql_handler_thread_num config", - )), - false => { - let session = Session::try_create( - self.conf.clone(), - uuid::Uuid::new_v4().to_string(), - typ.into(), - self.clone(), - ) - .await?; - - label_counter( - super::metrics::METRIC_SESSION_CONNECT_NUMBERS, - &self.conf.query.tenant_id, - &self.conf.query.cluster_id, - ); - - sessions.insert(session.get_id(), session.clone()); - - Ok(SessionRef::create(session)) - } + )) } }