Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: Global MetaGrpcClient cause dispatch drop error #4361

Merged
merged 10 commits into from
Mar 11, 2022
2 changes: 1 addition & 1 deletion query/benches/suites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod bench_sort_query_sql;

pub async fn select_executor(sql: &str) -> Result<()> {
let sessions = SessionManager::from_conf(Config::default()).await?;
let executor_session = sessions.create_session("Benches")?;
let executor_session = sessions.create_session("Benches").await?;
let ctx = executor_session.create_query_context().await?;

if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
Expand Down
2 changes: 1 addition & 1 deletion query/src/api/http/v1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn cluster_list_handler(
}

async fn list_nodes(sessions: &Arc<SessionManager>) -> Result<Vec<Arc<NodeInfo>>> {
let watch_cluster_session = sessions.create_session("WatchCluster")?;
let watch_cluster_session = sessions.create_session("WatchCluster").await?;
let watch_cluster_context = watch_cluster_session.create_query_context().await?;
Ok(watch_cluster_context.get_cluster().get_nodes())
}
2 changes: 1 addition & 1 deletion query/src/api/http/v1/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn logs_handler(
}

async fn select_table(sessions: &Arc<SessionManager>) -> Result<Body> {
let session = sessions.create_session("WatchLogs")?;
let session = sessions.create_session("WatchLogs").await?;
let query_context = session.create_query_context().await?;
let mut tracing_table_stream = execute_query(query_context).await?;

Expand Down
12 changes: 9 additions & 3 deletions query/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::CancelAction(action) => {
// We only destroy when session is exist
let session_id = action.query_id.clone();
if let Some(session) = self.sessions.get_session_by_id(&session_id) {
if let Some(session) = self.sessions.get_session_by_id(&session_id).await {
// TODO: remove streams
session.force_kill_session();
}
Expand All @@ -161,7 +161,10 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::BroadcastAction(action) => {
let session_id = action.query_id.clone();
let is_aborted = self.dispatcher.is_aborted();
let session = self.sessions.create_rpc_session(session_id, is_aborted)?;
let session = self
.sessions
.create_rpc_session(session_id, is_aborted)
.await?;

self.dispatcher
.broadcast_action(session, flight_action)
Expand All @@ -171,7 +174,10 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::PrepareShuffleAction(action) => {
let session_id = action.query_id.clone();
let is_aborted = self.dispatcher.is_aborted();
let session = self.sessions.create_rpc_session(session_id, is_aborted)?;
let session = self
.sessions
.create_rpc_session(session_id, is_aborted)
.await?;

self.dispatcher
.shuffle_action(session, flight_action)
Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
match self.ctx.get_session_by_id(id) {
match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
Expand Down
32 changes: 16 additions & 16 deletions query/src/servers/clickhouse/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,27 @@ impl ClickHouseHandler {
})
}

fn reject_connection(stream: TcpStream, executor: Arc<Runtime>, error: ErrorCode) {
executor.spawn(async move {
if let Err(error) = RejectCHConnection::reject(stream, error).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
});
async fn reject_connection(stream: TcpStream, error: ErrorCode) {
if let Err(error) = RejectCHConnection::reject(stream, error).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
}

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
match sessions.create_session("ClickHouseSession") {
Err(error) => Self::reject_connection(socket, executor, error),
Ok(session) => {
tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr());
if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
executor.spawn(async move {
match sessions.create_session("ClickHouseSession").await {
Err(error) => Self::reject_connection(socket, error).await,
Ok(session) => {
tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr());
if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
}
}
}
}
});
}
}

Expand Down
7 changes: 1 addition & 6 deletions query/src/servers/clickhouse/interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,7 @@ impl ClickHouseSession for InteractiveWorker {
password: Some(password.to_owned()),
hostname: Some(client_ip.to_string()),
};
let user_info_auth = self
.session
.get_session_manager()
.get_auth_manager()
.auth(&credential)
.await;
let user_info_auth = self.session.get_auth_manager().auth(&credential).await;
match user_info_auth {
Ok(user_info) => {
self.session.set_current_user(user_info);
Expand Down
1 change: 1 addition & 0 deletions query/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub async fn streaming_load(
let session_manager = sessions_extension.0;
let session = session_manager
.create_session("Streaming load")
.await
.map_err(InternalServerError)?;

// TODO: list user's grant list and check client address
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ExecuteState {
) -> Result<(Arc<RwLock<Executor>>, DataSchemaRef)> {
let sql = &request.sql;
let start_time = Instant::now();
let session = session_manager.create_session("http-statement")?;
let session = session_manager.create_session("http-statement").await?;
let ctx = session.create_query_context().await?;
if let Some(db) = &request.session.database {
ctx.set_current_database(db.clone()).await?;
Expand Down
44 changes: 21 additions & 23 deletions query/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,31 @@ impl MySQLHandler {
}

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
match sessions.create_session("MySQL") {
Err(error) => Self::reject_session(socket, executor, error),
Ok(session) => {
tracing::info!("MySQL connection coming: {:?}", socket.peer_addr());
if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
};
executor.spawn(async move {
match sessions.create_session("MySQL").await {
Err(error) => Self::reject_session(socket, error).await,
Ok(session) => {
tracing::info!("MySQL connection coming: {:?}", socket.peer_addr());
if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
tracing::error!("Unexpected error occurred during query: {:?}", error);
};
}
}
}
});
}

fn reject_session(stream: TcpStream, executor: Arc<Runtime>, error: ErrorCode) {
executor.spawn(async move {
let (kind, message) = match error.code() {
41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()),
_ => (ErrorKind::ER_INTERNAL_ERROR, error.message()),
};
async fn reject_session(stream: TcpStream, error: ErrorCode) {
let (kind, message) = match error.code() {
41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()),
_ => (ErrorKind::ER_INTERNAL_ERROR, error.message()),
};

if let Err(error) =
RejectConnection::reject_mysql_connection(stream, kind, message).await
{
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
});
if let Err(error) = RejectConnection::reject_mysql_connection(stream, kind, message).await {
tracing::error!(
"Unexpected error occurred during reject connection: {:?}",
error
);
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl QueryContext {

// Get user manager api.
pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.shared.session.get_session_manager().get_user_manager()
self.shared.session.get_user_manager()
}

// Get the current session.
Expand All @@ -295,16 +295,21 @@ impl QueryContext {
}

// Get one session by session id.
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
self.shared
.session
.get_session_manager()
.get_session_by_id(id)
.await
}

// Get all the processes list info.
pub fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared.session.get_session_manager().processes_info()
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared
.session
.get_session_manager()
.processes_info()
.await
}

/// Get the data accessor metrics.
Expand Down
32 changes: 27 additions & 5 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::sessions::QueryContextShared;
use crate::sessions::SessionContext;
use crate::sessions::SessionManager;
use crate::sessions::Settings;
use crate::users::auth::auth_mgr::AuthMgr;
use crate::users::RoleCacheMgr;
use crate::users::UserApiProvider;

#[derive(Clone, MallocSizeOf)]
Expand All @@ -46,19 +48,28 @@ pub struct Session {
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) user_manager: Arc<UserApiProvider>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) auth_manager: Arc<AuthMgr>,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) role_cache_manager: Arc<RoleCacheMgr>,
#[ignore_malloc_size_of = "insignificant"]
session_settings: Settings,
}

impl Session {
pub fn try_create(
pub async fn try_create(
conf: Config,
id: String,
typ: String,
session_mgr: Arc<SessionManager>,
) -> Result<Arc<Session>> {
let user_api = session_mgr.get_user_manager();
let user_manager = UserApiProvider::create_global(conf.clone()).await?;
let auth_manager = Arc::new(AuthMgr::create(conf.clone(), user_manager.clone()).await?);
let role_cache_manager = Arc::new(RoleCacheMgr::new(user_manager.clone()));
let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?);
let session_settings = Settings::try_create(&conf, session_ctx.clone(), user_api)?;
let session_settings =
Settings::try_create(&conf, session_ctx.clone(), user_manager.clone())?;
let ref_count = Arc::new(AtomicUsize::new(0));

Ok(Arc::new(Session {
Expand All @@ -68,6 +79,9 @@ impl Session {
session_mgr,
ref_count,
session_ctx,
user_manager,
ariesdevil marked this conversation as resolved.
Show resolved Hide resolved
auth_manager,
role_cache_manager,
session_settings,
}))
}
Expand Down Expand Up @@ -192,7 +206,7 @@ impl Session {
}

let tenant = self.get_current_tenant();
let role_cache = self.session_mgr.get_role_cache_manager();
let role_cache = self.get_role_cache_manager();
let role_verified = role_cache
.verify_privilege(&tenant, &current_user.grants.roles(), object, privilege)
.await?;
Expand All @@ -219,7 +233,15 @@ impl Session {
}

pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.session_mgr.get_user_manager()
self.user_manager.clone()
}

pub fn get_auth_manager(self: &Arc<Self>) -> Arc<AuthMgr> {
self.auth_manager.clone()
}

pub fn get_role_cache_manager(self: &Arc<Self>) -> Arc<RoleCacheMgr> {
self.role_cache_manager.clone()
}

pub fn get_memory_usage(self: &Arc<Self>) -> usize {
Expand Down
Loading