Skip to content

Commit

Permalink
Merge pull request #4361 from ariesdevil/dev
Browse files Browse the repository at this point in the history
bugfix: Global MetaGrpcClient cause dispatch drop error
  • Loading branch information
BohuTANG authored Mar 11, 2022
2 parents eb55cba + 157ff97 commit ccb7f4c
Show file tree
Hide file tree
Showing 105 changed files with 287 additions and 238 deletions.
2 changes: 1 addition & 1 deletion query/benches/suites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod bench_sort_query_sql;

pub async fn select_executor(sql: &str) -> Result<()> {
let sessions = SessionManager::from_conf(Config::default()).await?;
let executor_session = sessions.create_session("Benches")?;
let executor_session = sessions.create_session("Benches").await?;
let ctx = executor_session.create_query_context().await?;

if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
Expand Down
2 changes: 1 addition & 1 deletion query/src/api/http/v1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn cluster_list_handler(
}

async fn list_nodes(sessions: &Arc<SessionManager>) -> Result<Vec<Arc<NodeInfo>>> {
let watch_cluster_session = sessions.create_session("WatchCluster")?;
let watch_cluster_session = sessions.create_session("WatchCluster").await?;
let watch_cluster_context = watch_cluster_session.create_query_context().await?;
Ok(watch_cluster_context.get_cluster().get_nodes())
}
2 changes: 1 addition & 1 deletion query/src/api/http/v1/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn logs_handler(
}

async fn select_table(sessions: &Arc<SessionManager>) -> Result<Body> {
let session = sessions.create_session("WatchLogs")?;
let session = sessions.create_session("WatchLogs").await?;
let query_context = session.create_query_context().await?;
let mut tracing_table_stream = execute_query(query_context).await?;

Expand Down
12 changes: 9 additions & 3 deletions query/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::CancelAction(action) => {
// We only destroy when session is exist
let session_id = action.query_id.clone();
if let Some(session) = self.sessions.get_session_by_id(&session_id) {
if let Some(session) = self.sessions.get_session_by_id(&session_id).await {
// TODO: remove streams
session.force_kill_session();
}
Expand All @@ -161,7 +161,10 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::BroadcastAction(action) => {
let session_id = action.query_id.clone();
let is_aborted = self.dispatcher.is_aborted();
let session = self.sessions.create_rpc_session(session_id, is_aborted)?;
let session = self
.sessions
.create_rpc_session(session_id, is_aborted)
.await?;

self.dispatcher
.broadcast_action(session, flight_action)
Expand All @@ -171,7 +174,10 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::PrepareShuffleAction(action) => {
let session_id = action.query_id.clone();
let is_aborted = self.dispatcher.is_aborted();
let session = self.sessions.create_rpc_session(session_id, is_aborted)?;
let session = self
.sessions
.create_rpc_session(session_id, is_aborted)
.await?;

self.dispatcher
.shuffle_action(session, flight_action)
Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
match self.ctx.get_session_by_id(id) {
match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
Expand Down
32 changes: 16 additions & 16 deletions query/src/servers/clickhouse/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,27 @@ impl ClickHouseHandler {
})
}

fn reject_connection(stream: TcpStream, executor: Arc<Runtime>, error: ErrorCode) {
executor.spawn(async move {
if let Err(error) = RejectCHConnection::reject(stream, error).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
});
async fn reject_connection(stream: TcpStream, error: ErrorCode) {
if let Err(error) = RejectCHConnection::reject(stream, error).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
}

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
match sessions.create_session("ClickHouseSession") {
Err(error) => Self::reject_connection(socket, executor, error),
Ok(session) => {
tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr());
if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
executor.spawn(async move {
match sessions.create_session("ClickHouseSession").await {
Err(error) => Self::reject_connection(socket, error).await,
Ok(session) => {
tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr());
if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
}
}
}
}
});
}
}

Expand Down
7 changes: 1 addition & 6 deletions query/src/servers/clickhouse/interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,7 @@ impl ClickHouseSession for InteractiveWorker {
password: Some(password.to_owned()),
hostname: Some(client_ip.to_string()),
};
let user_info_auth = self
.session
.get_session_manager()
.get_auth_manager()
.auth(&credential)
.await;
let user_info_auth = self.session.get_auth_manager().auth(&credential).await;
match user_info_auth {
Ok(user_info) => {
self.session.set_current_user(user_info);
Expand Down
1 change: 1 addition & 0 deletions query/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub async fn streaming_load(
let session_manager = sessions_extension.0;
let session = session_manager
.create_session("Streaming load")
.await
.map_err(InternalServerError)?;

// TODO: list user's grant list and check client address
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ExecuteState {
) -> Result<(Arc<RwLock<Executor>>, DataSchemaRef)> {
let sql = &request.sql;
let start_time = Instant::now();
let session = session_manager.create_session("http-statement")?;
let session = session_manager.create_session("http-statement").await?;
let ctx = session.create_query_context().await?;
if let Some(db) = &request.session.database {
ctx.set_current_database(db.clone()).await?;
Expand Down
44 changes: 21 additions & 23 deletions query/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,31 @@ impl MySQLHandler {
}

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
match sessions.create_session("MySQL") {
Err(error) => Self::reject_session(socket, executor, error),
Ok(session) => {
tracing::info!("MySQL connection coming: {:?}", socket.peer_addr());
if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
};
executor.spawn(async move {
match sessions.create_session("MySQL").await {
Err(error) => Self::reject_session(socket, error).await,
Ok(session) => {
tracing::info!("MySQL connection coming: {:?}", socket.peer_addr());
if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
};
}
}
}
});
}

fn reject_session(stream: TcpStream, executor: Arc<Runtime>, error: ErrorCode) {
executor.spawn(async move {
let (kind, message) = match error.code() {
41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()),
_ => (ErrorKind::ER_INTERNAL_ERROR, error.message()),
};
async fn reject_session(stream: TcpStream, error: ErrorCode) {
let (kind, message) = match error.code() {
41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()),
_ => (ErrorKind::ER_INTERNAL_ERROR, error.message()),
};

if let Err(error) =
RejectConnection::reject_mysql_connection(stream, kind, message).await
{
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
});
if let Err(error) = RejectConnection::reject_mysql_connection(stream, kind, message).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl QueryContext {

// Get user manager api.
pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.shared.session.get_session_manager().get_user_manager()
self.shared.session.get_user_manager()
}

// Get the current session.
Expand All @@ -295,16 +295,21 @@ impl QueryContext {
}

// Get one session by session id.
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
self.shared
.session
.get_session_manager()
.get_session_by_id(id)
.await
}

// Get all the processes list info.
pub fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared.session.get_session_manager().processes_info()
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared
.session
.get_session_manager()
.processes_info()
.await
}

/// Get the data accessor metrics.
Expand Down
32 changes: 27 additions & 5 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::sessions::QueryContextShared;
use crate::sessions::SessionContext;
use crate::sessions::SessionManager;
use crate::sessions::Settings;
use crate::users::auth::auth_mgr::AuthMgr;
use crate::users::RoleCacheMgr;
use crate::users::UserApiProvider;

#[derive(Clone, MallocSizeOf)]
Expand All @@ -46,19 +48,28 @@ pub struct Session {
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) user_manager: Arc<UserApiProvider>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) auth_manager: Arc<AuthMgr>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) role_cache_manager: Arc<RoleCacheMgr>,
#[ignore_malloc_size_of = "insignificant"]
session_settings: Settings,
}

impl Session {
pub fn try_create(
pub async fn try_create(
conf: Config,
id: String,
typ: String,
session_mgr: Arc<SessionManager>,
) -> Result<Arc<Session>> {
let user_api = session_mgr.get_user_manager();
let user_manager = UserApiProvider::create_global(conf.clone()).await?;
let auth_manager = Arc::new(AuthMgr::create(conf.clone(), user_manager.clone()).await?);
let role_cache_manager = Arc::new(RoleCacheMgr::new(user_manager.clone()));
let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?);
let session_settings = Settings::try_create(&conf, session_ctx.clone(), user_api)?;
let session_settings =
Settings::try_create(&conf, session_ctx.clone(), user_manager.clone())?;
let ref_count = Arc::new(AtomicUsize::new(0));

Ok(Arc::new(Session {
Expand All @@ -68,6 +79,9 @@ impl Session {
session_mgr,
ref_count,
session_ctx,
user_manager,
auth_manager,
role_cache_manager,
session_settings,
}))
}
Expand Down Expand Up @@ -192,7 +206,7 @@ impl Session {
}

let tenant = self.get_current_tenant();
let role_cache = self.session_mgr.get_role_cache_manager();
let role_cache = self.get_role_cache_manager();
let role_verified = role_cache
.verify_privilege(&tenant, &current_user.grants.roles(), object, privilege)
.await?;
Expand All @@ -219,7 +233,15 @@ impl Session {
}

pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.session_mgr.get_user_manager()
self.user_manager.clone()
}

pub fn get_auth_manager(self: &Arc<Self>) -> Arc<AuthMgr> {
self.auth_manager.clone()
}

pub fn get_role_cache_manager(self: &Arc<Self>) -> Arc<RoleCacheMgr> {
self.role_cache_manager.clone()
}

pub fn get_memory_usage(self: &Arc<Self>) -> usize {
Expand Down
Loading

0 comments on commit ccb7f4c

Please sign in to comment.