diff --git a/query/benches/suites/mod.rs b/query/benches/suites/mod.rs index 1529331a306f6..146310482aa29 100644 --- a/query/benches/suites/mod.rs +++ b/query/benches/suites/mod.rs @@ -29,7 +29,7 @@ pub mod bench_sort_query_sql; pub async fn select_executor(sql: &str) -> Result<()> { let sessions = SessionManager::from_conf(Config::default()).await?; - let executor_session = sessions.create_session("Benches")?; + let executor_session = sessions.create_session("Benches").await?; let ctx = executor_session.create_query_context().await?; if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? { diff --git a/query/src/api/http/v1/cluster.rs b/query/src/api/http/v1/cluster.rs index bfa5d105e9eff..f6089b581f7fb 100644 --- a/query/src/api/http/v1/cluster.rs +++ b/query/src/api/http/v1/cluster.rs @@ -42,7 +42,7 @@ pub async fn cluster_list_handler( } async fn list_nodes(sessions: &Arc) -> Result>> { - let watch_cluster_session = sessions.create_session("WatchCluster")?; + let watch_cluster_session = sessions.create_session("WatchCluster").await?; let watch_cluster_context = watch_cluster_session.create_query_context().await?; Ok(watch_cluster_context.get_cluster().get_nodes()) } diff --git a/query/src/api/http/v1/logs.rs b/query/src/api/http/v1/logs.rs index dd76a9f456079..2fa1cd74c52b3 100644 --- a/query/src/api/http/v1/logs.rs +++ b/query/src/api/http/v1/logs.rs @@ -42,7 +42,7 @@ pub async fn logs_handler( } async fn select_table(sessions: &Arc) -> Result { - let session = sessions.create_session("WatchLogs")?; + let session = sessions.create_session("WatchLogs").await?; let query_context = session.create_query_context().await?; let mut tracing_table_stream = execute_query(query_context).await?; diff --git a/query/src/api/rpc/flight_service.rs b/query/src/api/rpc/flight_service.rs index ee45e72f5d1fc..88c2b04ca4647 100644 --- a/query/src/api/rpc/flight_service.rs +++ b/query/src/api/rpc/flight_service.rs @@ -161,7 +161,10 @@ impl FlightService for DatabendQueryFlightService { FlightAction::BroadcastAction(action) => { let session_id = action.query_id.clone(); let is_aborted = self.dispatcher.is_aborted(); - let session = self.sessions.create_rpc_session(session_id, is_aborted)?; + let session = self + .sessions + .create_rpc_session(session_id, is_aborted) + .await?; self.dispatcher .broadcast_action(session, flight_action) @@ -171,7 +174,10 @@ impl FlightService for DatabendQueryFlightService { FlightAction::PrepareShuffleAction(action) => { let session_id = action.query_id.clone(); let is_aborted = self.dispatcher.is_aborted(); - let session = self.sessions.create_rpc_session(session_id, is_aborted)?; + let session = self + .sessions + .create_rpc_session(session_id, is_aborted) + .await?; self.dispatcher .shuffle_action(session, flight_action) diff --git a/query/src/servers/clickhouse/clickhouse_handler.rs b/query/src/servers/clickhouse/clickhouse_handler.rs index 5dabd42f944e3..b7babbe82fe84 100644 --- a/query/src/servers/clickhouse/clickhouse_handler.rs +++ b/query/src/servers/clickhouse/clickhouse_handler.rs @@ -77,27 +77,27 @@ impl ClickHouseHandler { }) } - fn reject_connection(stream: TcpStream, executor: Arc, error: ErrorCode) { - executor.spawn(async move { - if let Err(error) = RejectCHConnection::reject(stream, error).await { - tracing::error!( - "Unexpected error occurred during reject connection: {:?}", - error - ); - } - }); + async fn reject_connection(stream: TcpStream, error: ErrorCode) { + if let Err(error) = RejectCHConnection::reject(stream, error).await { + tracing::error!( + "Unexpected error occurred during reject connection: {:?}", + error + ); + } } fn accept_socket(sessions: Arc, executor: Arc, socket: TcpStream) { - match sessions.create_session("ClickHouseSession") { - Err(error) => Self::reject_connection(socket, executor, error), - Ok(session) => { - tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr()); - if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) { - tracing::error!("Unexpected error occurred during query: {:?}", error); + executor.spawn(async move { + match sessions.create_session("ClickHouseSession").await { + Err(error) => Self::reject_connection(socket, error).await, + Ok(session) => { + tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr()); + if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) { + tracing::error!("Unexpected error occurred during query: {:?}", error); + } } } - } + }); } } diff --git a/query/src/servers/http/v1/load.rs b/query/src/servers/http/v1/load.rs index acf79809d8db2..7f63e684bda85 100644 --- a/query/src/servers/http/v1/load.rs +++ b/query/src/servers/http/v1/load.rs @@ -56,6 +56,7 @@ pub async fn streaming_load( let session_manager = sessions_extension.0; let session = session_manager .create_session("Streaming load") + .await .map_err(InternalServerError)?; // TODO: list user's grant list and check client address diff --git a/query/src/servers/http/v1/query/execute_state.rs b/query/src/servers/http/v1/query/execute_state.rs index ca3a1a4d8214d..ea4dfcbda9ccf 100644 --- a/query/src/servers/http/v1/query/execute_state.rs +++ b/query/src/servers/http/v1/query/execute_state.rs @@ -141,7 +141,7 @@ impl ExecuteState { ) -> Result<(Arc>, DataSchemaRef)> { let sql = &request.sql; let start_time = Instant::now(); - let session = session_manager.create_session("http-statement")?; + let session = session_manager.create_session("http-statement").await?; let ctx = session.create_query_context().await?; if let Some(db) = &request.session.database { ctx.set_current_database(db.clone()).await?; diff --git a/query/src/servers/mysql/mysql_handler.rs b/query/src/servers/mysql/mysql_handler.rs index f62d077521b69..941c4887d996b 100644 --- a/query/src/servers/mysql/mysql_handler.rs +++ b/query/src/servers/mysql/mysql_handler.rs @@ -80,33 +80,31 @@ impl MySQLHandler { } fn accept_socket(sessions: Arc, executor: Arc, socket: TcpStream) { - match sessions.create_session("MySQL") { - Err(error) => Self::reject_session(socket, executor, error), - Ok(session) => { - tracing::info!("MySQL connection coming: {:?}", socket.peer_addr()); - if let Err(error) = MySQLConnection::run_on_stream(session, socket) { - tracing::error!("Unexpected error occurred during query: {:?}", error); - }; + executor.spawn(async move { + match sessions.create_session("MySQL").await { + Err(error) => Self::reject_session(socket, error).await, + Ok(session) => { + tracing::info!("MySQL connection coming: {:?}", socket.peer_addr()); + if let Err(error) = MySQLConnection::run_on_stream(session, socket) { + tracing::error!("Unexpected error occurred during query: {:?}", error); + }; + } } - } + }); } - fn reject_session(stream: TcpStream, executor: Arc, error: ErrorCode) { - executor.spawn(async move { - let (kind, message) = match error.code() { - 41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()), - _ => (ErrorKind::ER_INTERNAL_ERROR, error.message()), - }; + async fn reject_session(stream: TcpStream, error: ErrorCode) { + let (kind, message) = match error.code() { + 41 => (ErrorKind::ER_TOO_MANY_USER_CONNECTIONS, error.message()), + _ => (ErrorKind::ER_INTERNAL_ERROR, error.message()), + }; - if let Err(error) = - RejectConnection::reject_mysql_connection(stream, kind, message).await - { - tracing::error!( - "Unexpected error occurred during reject connection: {:?}", - error - ); - } - }); + if let Err(error) = RejectConnection::reject_mysql_connection(stream, kind, message).await { + tracing::error!( + "Unexpected error occurred during reject connection: {:?}", + error + ); + } } } diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index c1ccda9b81dc4..469672561fc57 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -58,18 +58,14 @@ pub struct Session { } impl Session { - pub fn try_create( + pub async fn try_create( conf: Config, id: String, typ: String, session_mgr: Arc, ) -> Result> { - let user_manager = - futures::executor::block_on(UserApiProvider::create_global(conf.clone()))?; - let auth_manager = Arc::new(futures::executor::block_on(AuthMgr::create( - conf.clone(), - user_manager.clone(), - ))?); + let user_manager = UserApiProvider::create_global(conf.clone()).await?; + let auth_manager = Arc::new(AuthMgr::create(conf.clone(), user_manager.clone()).await?); let role_cache_manager = Arc::new(RoleCacheMgr::new(user_manager.clone())); let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?); let session_settings = diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index 2f61cb1e3f6bc..c99f31eaf4df0 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -21,10 +21,10 @@ use std::sync::Arc; use std::time::Duration; use common_base::tokio; +use common_base::tokio::sync::RwLock; use common_base::SignalStream; use common_exception::ErrorCode; use common_exception::Result; -use common_infallible::RwLock; use common_metrics::label_counter; use common_tracing::tracing; use futures::future::Either; @@ -124,9 +124,12 @@ impl SessionManager { self.storage_cache_manager.as_ref() } - pub fn create_session(self: &Arc, typ: impl Into) -> Result { - let mut sessions = self.active_sessions.write(); - match sessions.len() == self.max_sessions { + pub async fn create_session(self: &Arc, typ: impl Into) -> Result { + let total_sessions = { + let sessions = self.active_sessions.read().await; + sessions.len() + }; + match total_sessions == self.max_sessions { true => Err(ErrorCode::TooManyUserConnections( "The current accept connection has exceeded mysql_handler_thread_num config", )), @@ -136,7 +139,8 @@ impl SessionManager { uuid::Uuid::new_v4().to_string(), typ.into(), self.clone(), - )?; + ) + .await?; label_counter( super::metrics::METRIC_SESSION_CONNECT_NUMBERS, @@ -144,14 +148,21 @@ impl SessionManager { &self.conf.query.cluster_id, ); - sessions.insert(session.get_id(), session.clone()); + { + let mut sessions = self.active_sessions.write().await; + sessions.insert(session.get_id(), session.clone()); + } Ok(SessionRef::create(session)) } } } - pub fn create_rpc_session(self: &Arc, id: String, aborted: bool) -> Result { - let mut sessions = self.active_sessions.write(); + pub async fn create_rpc_session( + self: &Arc, + id: String, + aborted: bool, + ) -> Result { + let mut sessions = self.active_sessions.write().await; let session = match sessions.entry(id) { Occupied(entry) => entry.get().clone(), @@ -162,7 +173,8 @@ impl SessionManager { entry.key().clone(), String::from("RPCSession"), self.clone(), - )?; + ) + .await?; label_counter( super::metrics::METRIC_SESSION_CONNECT_NUMBERS, @@ -179,7 +191,7 @@ impl SessionManager { #[allow(clippy::ptr_arg)] pub fn get_session_by_id(self: &Arc, id: &str) -> Option { - let sessions = self.active_sessions.read(); + let sessions = futures::executor::block_on(self.active_sessions.read()); sessions .get(id) .map(|session| SessionRef::create(session.clone())) @@ -193,7 +205,8 @@ impl SessionManager { &self.conf.query.cluster_id, ); - self.active_sessions.write().remove(session_id); + let mut sessions = futures::executor::block_on(self.active_sessions.write()); + sessions.remove(session_id); } pub fn graceful_shutdown( @@ -224,14 +237,15 @@ impl SessionManager { tracing::info!("Will shutdown forcefully."); active_sessions .read() + .await .values() .for_each(Session::force_kill_session); } } pub fn processes_info(self: &Arc) -> Vec { - self.active_sessions - .read() + let sessions = futures::executor::block_on(self.active_sessions.read()); + sessions .values() .map(Session::process_info) .collect::>() @@ -240,7 +254,7 @@ impl SessionManager { fn destroy_idle_sessions(sessions: &Arc>>>) -> bool { // Read lock does not support reentrant // https://github.com/Amanieu/parking_lot/blob/lock_api-0.4.4/lock_api/src/rwlock.rs#L422 - let active_sessions_read_guard = sessions.read(); + let active_sessions_read_guard = futures::executor::block_on(sessions.read()); // First try to kill the idle session active_sessions_read_guard.values().for_each(Session::kill); diff --git a/query/tests/it/api/rpc/flight_actions.rs b/query/tests/it/api/rpc/flight_actions.rs index 8a3d5e3411e51..00f4af127f450 100644 --- a/query/tests/it/api/rpc/flight_actions.rs +++ b/query/tests/it/api/rpc/flight_actions.rs @@ -27,7 +27,7 @@ use crate::tests::create_query_context; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_shuffle_action_try_into() -> Result<()> { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let shuffle_action = ShuffleAction { query_id: String::from("query_id"), diff --git a/query/tests/it/api/rpc/flight_dispatcher.rs b/query/tests/it/api/rpc/flight_dispatcher.rs index 4bc888506d758..4c6e421d251f5 100644 --- a/query/tests/it/api/rpc/flight_dispatcher.rs +++ b/query/tests/it/api/rpc/flight_dispatcher.rs @@ -49,11 +49,11 @@ async fn test_get_stream_with_non_exists_stream() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_run_shuffle_action_with_no_scatters() -> Result<()> { if let (Some(query_id), Some(stage_id), Some(stream_id)) = generate_uuids(3) { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let flight_dispatcher = DatabendQueryFlightDispatcher::create(); let sessions = SessionManagerBuilder::create().build()?; - let rpc_session = sessions.create_rpc_session(query_id.clone(), false)?; + let rpc_session = sessions.create_rpc_session(query_id.clone(), false).await?; flight_dispatcher .shuffle_action( @@ -94,11 +94,11 @@ async fn test_run_shuffle_action_with_no_scatters() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_run_shuffle_action_with_scatter() -> Result<()> { if let (Some(query_id), Some(stage_id), None) = generate_uuids(2) { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let flight_dispatcher = DatabendQueryFlightDispatcher::create(); let sessions = SessionManagerBuilder::create().build()?; - let rpc_session = sessions.create_rpc_session(query_id.clone(), false)?; + let rpc_session = sessions.create_rpc_session(query_id.clone(), false).await?; flight_dispatcher .shuffle_action( diff --git a/query/tests/it/api/rpc/flight_service.rs b/query/tests/it/api/rpc/flight_service.rs index 5b65d976c36c0..f9e38d136927f 100644 --- a/query/tests/it/api/rpc/flight_service.rs +++ b/query/tests/it/api/rpc/flight_service.rs @@ -162,7 +162,7 @@ fn do_get_request(query_id: &str, stage_id: &str) -> Result> { } async fn do_action_request(query_id: &str, stage_id: &str) -> Result> { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let flight_action = FlightAction::PrepareShuffleAction(ShuffleAction { query_id: String::from(query_id), stage_id: String::from(stage_id), diff --git a/query/tests/it/functions/context_function.rs b/query/tests/it/functions/context_function.rs index 2177c935728d0..3c15bb1f25f60 100644 --- a/query/tests/it/functions/context_function.rs +++ b/query/tests/it/functions/context_function.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::tokio; use common_exception::Result; use databend_query::functions::ContextFunction; -#[test] -fn test_context_function_build_arg_from_ctx() -> Result<()> { +#[tokio::test] +async fn test_context_function_build_arg_from_ctx() -> Result<()> { use pretty_assertions::assert_eq; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Ok. { diff --git a/query/tests/it/interpreters/access/management_mode_access.rs b/query/tests/it/interpreters/access/management_mode_access.rs index 9aff2c0882242..73e76c45fbd5a 100644 --- a/query/tests/it/interpreters/access/management_mode_access.rs +++ b/query/tests/it/interpreters/access/management_mode_access.rs @@ -151,7 +151,7 @@ async fn test_management_mode_access() -> Result<()> { let conf = crate::tests::ConfigBuilder::create() .with_management_mode() .config(); - let ctx = crate::tests::create_query_context_with_config(conf.clone())?; + let ctx = crate::tests::create_query_context_with_config(conf.clone()).await?; // First to set tenant. { let plan = PlanParser::parse(ctx.clone(), "SUDO USE TENANT 'test'").await?; diff --git a/query/tests/it/interpreters/interpreter_admin_use_tenant.rs b/query/tests/it/interpreters/interpreter_admin_use_tenant.rs index f2dce40d899aa..b20839292bd58 100644 --- a/query/tests/it/interpreters/interpreter_admin_use_tenant.rs +++ b/query/tests/it/interpreters/interpreter_admin_use_tenant.rs @@ -24,7 +24,7 @@ async fn test_use_tenant_interpreter() -> Result<()> { let conf = crate::tests::ConfigBuilder::create() .with_management_mode() .config(); - let ctx = crate::tests::create_query_context_with_config(conf.clone())?; + let ctx = crate::tests::create_query_context_with_config(conf.clone()).await?; let plan = PlanParser::parse(ctx.clone(), "SUDO USE TENANT 't1'").await?; let interpreter = InterpreterFactory::get(ctx.clone(), plan)?; @@ -41,7 +41,7 @@ async fn test_use_tenant_interpreter() -> Result<()> { #[tokio::test] async fn test_use_tenant_interpreter_error() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "SUDO USE TENANT 't1'").await?; let interpreter = InterpreterFactory::get(ctx, plan)?; diff --git a/query/tests/it/interpreters/interpreter_call.rs b/query/tests/it/interpreters/interpreter_call.rs index d452aa60160e4..b102b6d214480 100644 --- a/query/tests/it/interpreters/interpreter_call.rs +++ b/query/tests/it/interpreters/interpreter_call.rs @@ -23,7 +23,7 @@ use pretty_assertions::assert_eq; async fn test_call_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "call system$test()").await?; let executor = InterpreterFactory::get(ctx, plan.clone())?; @@ -41,7 +41,7 @@ async fn test_call_interpreter() -> Result<()> { async fn test_call_fuse_history_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // NumberArgumentsNotMatch { diff --git a/query/tests/it/interpreters/interpreter_database_create.rs b/query/tests/it/interpreters/interpreter_database_create.rs index d822b51b048fc..af13ad7d24d84 100644 --- a/query/tests/it/interpreters/interpreter_database_create.rs +++ b/query/tests/it/interpreters/interpreter_database_create.rs @@ -23,7 +23,7 @@ use pretty_assertions::assert_eq; async fn test_create_database_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "create database db1").await?; let executor = InterpreterFactory::get(ctx, plan.clone())?; diff --git a/query/tests/it/interpreters/interpreter_database_drop.rs b/query/tests/it/interpreters/interpreter_database_drop.rs index 266d1f031db66..21f727cce86e1 100644 --- a/query/tests/it/interpreters/interpreter_database_drop.rs +++ b/query/tests/it/interpreters/interpreter_database_drop.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn test_drop_database_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "drop database default").await?; let executor = InterpreterFactory::get(ctx, plan.clone())?; diff --git a/query/tests/it/interpreters/interpreter_database_show_create.rs b/query/tests/it/interpreters/interpreter_database_show_create.rs index a2c297ec9341c..e84e0a5fcc7e7 100644 --- a/query/tests/it/interpreters/interpreter_database_show_create.rs +++ b/query/tests/it/interpreters/interpreter_database_show_create.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_create_database_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show create database default { diff --git a/query/tests/it/interpreters/interpreter_explain.rs b/query/tests/it/interpreters/interpreter_explain.rs index 0fc1381ee163f..aa29f030c344b 100644 --- a/query/tests/it/interpreters/interpreter_explain.rs +++ b/query/tests/it/interpreters/interpreter_explain.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_explain_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let query = "\ EXPLAIN SELECT number FROM numbers_mt(10) \ diff --git a/query/tests/it/interpreters/interpreter_factory_interceptor.rs b/query/tests/it/interpreters/interpreter_factory_interceptor.rs index a9ca91827c430..d642c2b5b709d 100644 --- a/query/tests/it/interpreters/interpreter_factory_interceptor.rs +++ b/query/tests/it/interpreters/interpreter_factory_interceptor.rs @@ -22,7 +22,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_interpreter_interceptor() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; { let query = "select number from numbers_mt(100) where number > 90"; ctx.attach_query_str(query); @@ -80,7 +80,7 @@ async fn test_interpreter_interceptor() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_interpreter_interceptor_for_insert() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; { let query = "create table t as select number from numbers_mt(1)"; ctx.attach_query_str(query); diff --git a/query/tests/it/interpreters/interpreter_insert.rs b/query/tests/it/interpreters/interpreter_insert.rs index 12efeac74bc5a..1aa4de8b6d9bf 100644 --- a/query/tests/it/interpreters/interpreter_insert.rs +++ b/query/tests/it/interpreters/interpreter_insert.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test] async fn test_insert_into_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Create default value table. { diff --git a/query/tests/it/interpreters/interpreter_select.rs b/query/tests/it/interpreters/interpreter_select.rs index a7b6117066e54..0ae9a1b94407d 100644 --- a/query/tests/it/interpreters/interpreter_select.rs +++ b/query/tests/it/interpreters/interpreter_select.rs @@ -22,7 +22,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_select_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; { let query = "select number from numbers_mt(10)"; diff --git a/query/tests/it/interpreters/interpreter_setting.rs b/query/tests/it/interpreters/interpreter_setting.rs index 2b9237c74ae90..c16f34e89a211 100644 --- a/query/tests/it/interpreters/interpreter_setting.rs +++ b/query/tests/it/interpreters/interpreter_setting.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_setting_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "SET max_block_size=1").await?; let executor = InterpreterFactory::get(ctx.clone(), plan)?; @@ -35,7 +35,7 @@ async fn test_setting_interpreter() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_setting_interpreter_error() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "SET max_block_size=1").await?; let executor = InterpreterFactory::get(ctx.clone(), plan)?; diff --git a/query/tests/it/interpreters/interpreter_show_databases.rs b/query/tests/it/interpreters/interpreter_show_databases.rs index f6373b1d34926..aa19e9572ddfe 100644 --- a/query/tests/it/interpreters/interpreter_show_databases.rs +++ b/query/tests/it/interpreters/interpreter_show_databases.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_databases_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show databases. { diff --git a/query/tests/it/interpreters/interpreter_show_engines.rs b/query/tests/it/interpreters/interpreter_show_engines.rs index 96088f540fd0b..3b86060576031 100644 --- a/query/tests/it/interpreters/interpreter_show_engines.rs +++ b/query/tests/it/interpreters/interpreter_show_engines.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_engines_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show engines. { diff --git a/query/tests/it/interpreters/interpreter_show_functions.rs b/query/tests/it/interpreters/interpreter_show_functions.rs index 32084cbfa6fa0..d8b9db3eb79ab 100644 --- a/query/tests/it/interpreters/interpreter_show_functions.rs +++ b/query/tests/it/interpreters/interpreter_show_functions.rs @@ -19,7 +19,7 @@ use databend_query::sql::PlanParser; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_functions_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show functions. { diff --git a/query/tests/it/interpreters/interpreter_show_metrics.rs b/query/tests/it/interpreters/interpreter_show_metrics.rs index d84e71bc80899..469f82fc7bb53 100644 --- a/query/tests/it/interpreters/interpreter_show_metrics.rs +++ b/query/tests/it/interpreters/interpreter_show_metrics.rs @@ -21,7 +21,7 @@ use databend_query::sql::PlanParser; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_metrics_interpreter() -> Result<()> { init_default_metrics_recorder(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show metrics. { diff --git a/query/tests/it/interpreters/interpreter_show_processlist.rs b/query/tests/it/interpreters/interpreter_show_processlist.rs index 71d6a762d2d92..3d8b301572b61 100644 --- a/query/tests/it/interpreters/interpreter_show_processlist.rs +++ b/query/tests/it/interpreters/interpreter_show_processlist.rs @@ -19,7 +19,7 @@ use databend_query::sql::PlanParser; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_processlist_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show processlist. { diff --git a/query/tests/it/interpreters/interpreter_show_settings.rs b/query/tests/it/interpreters/interpreter_show_settings.rs index bc27fc27e1975..438e980e627f4 100644 --- a/query/tests/it/interpreters/interpreter_show_settings.rs +++ b/query/tests/it/interpreters/interpreter_show_settings.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_settings_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // show settings. { diff --git a/query/tests/it/interpreters/interpreter_show_tables.rs b/query/tests/it/interpreters/interpreter_show_tables.rs index b04808ce66562..7cb790596d5c5 100644 --- a/query/tests/it/interpreters/interpreter_show_tables.rs +++ b/query/tests/it/interpreters/interpreter_show_tables.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_tables_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Setup. { diff --git a/query/tests/it/interpreters/interpreter_show_users.rs b/query/tests/it/interpreters/interpreter_show_users.rs index 0fa8f45fb7a2a..dc7107a03e2f6 100644 --- a/query/tests/it/interpreters/interpreter_show_users.rs +++ b/query/tests/it/interpreters/interpreter_show_users.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_show_users_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; { let query = "CREATE USER 'test'@'localhost' IDENTIFIED BY 'password'"; diff --git a/query/tests/it/interpreters/interpreter_table_create.rs b/query/tests/it/interpreters/interpreter_table_create.rs index d8455830b1493..6401ce34f2769 100644 --- a/query/tests/it/interpreters/interpreter_table_create.rs +++ b/query/tests/it/interpreters/interpreter_table_create.rs @@ -20,7 +20,7 @@ use futures::stream::StreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_create_table_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; { let query = "\ diff --git a/query/tests/it/interpreters/interpreter_table_describe.rs b/query/tests/it/interpreters/interpreter_table_describe.rs index d491c6ffbf05a..469736dbe8882 100644 --- a/query/tests/it/interpreters/interpreter_table_describe.rs +++ b/query/tests/it/interpreters/interpreter_table_describe.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn interpreter_describe_table_test() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Create table. { diff --git a/query/tests/it/interpreters/interpreter_table_drop.rs b/query/tests/it/interpreters/interpreter_table_drop.rs index d141c2386ea79..e44ebeecacf2d 100644 --- a/query/tests/it/interpreters/interpreter_table_drop.rs +++ b/query/tests/it/interpreters/interpreter_table_drop.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn test_drop_table_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Create table. { diff --git a/query/tests/it/interpreters/interpreter_table_show_create.rs b/query/tests/it/interpreters/interpreter_table_show_create.rs index 8aa5be960af7d..b32a649d0b8a1 100644 --- a/query/tests/it/interpreters/interpreter_table_show_create.rs +++ b/query/tests/it/interpreters/interpreter_table_show_create.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn interpreter_show_create_table_test() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Create table. { diff --git a/query/tests/it/interpreters/interpreter_table_truncate.rs b/query/tests/it/interpreters/interpreter_table_truncate.rs index f4a9e57fd1e61..a8b94056aa436 100644 --- a/query/tests/it/interpreters/interpreter_table_truncate.rs +++ b/query/tests/it/interpreters/interpreter_table_truncate.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn test_truncate_table_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; // Create table. { diff --git a/query/tests/it/interpreters/interpreter_use_database.rs b/query/tests/it/interpreters/interpreter_use_database.rs index 4e0a7cf378c32..76f97e0a19457 100644 --- a/query/tests/it/interpreters/interpreter_use_database.rs +++ b/query/tests/it/interpreters/interpreter_use_database.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; #[tokio::test] async fn test_use_interpreter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "USE default").await?; let interpreter = InterpreterFactory::get(ctx, plan)?; @@ -35,7 +35,7 @@ async fn test_use_interpreter() -> Result<()> { #[tokio::test] async fn test_use_database_interpreter_error() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), "USE xx").await?; let interpreter = InterpreterFactory::get(ctx, plan)?; diff --git a/query/tests/it/interpreters/interpreter_user_alter.rs b/query/tests/it/interpreters/interpreter_user_alter.rs index 1df59155c9174..87c2ff59ec389 100644 --- a/query/tests/it/interpreters/interpreter_user_alter.rs +++ b/query/tests/it/interpreters/interpreter_user_alter.rs @@ -26,7 +26,7 @@ use pretty_assertions::assert_eq; async fn test_alter_user_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = "test"; let name = "test"; let hostname = "localhost"; diff --git a/query/tests/it/interpreters/interpreter_user_create.rs b/query/tests/it/interpreters/interpreter_user_create.rs index 1b756a40aa812..af3530c85db3d 100644 --- a/query/tests/it/interpreters/interpreter_user_create.rs +++ b/query/tests/it/interpreters/interpreter_user_create.rs @@ -23,7 +23,7 @@ use pretty_assertions::assert_eq; async fn test_create_user_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let query = "CREATE USER 'test'@'localhost' IDENTIFIED BY 'password'"; let plan = PlanParser::parse(ctx.clone(), query).await?; diff --git a/query/tests/it/interpreters/interpreter_user_drop.rs b/query/tests/it/interpreters/interpreter_user_drop.rs index 44a05210e83f5..0baa6a3e0033e 100644 --- a/query/tests/it/interpreters/interpreter_user_drop.rs +++ b/query/tests/it/interpreters/interpreter_user_drop.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; async fn test_drop_user_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); { diff --git a/query/tests/it/interpreters/interpreter_user_previlege_revoke.rs b/query/tests/it/interpreters/interpreter_user_previlege_revoke.rs index 53463c56d4764..cca50749f79f6 100644 --- a/query/tests/it/interpreters/interpreter_user_previlege_revoke.rs +++ b/query/tests/it/interpreters/interpreter_user_previlege_revoke.rs @@ -32,7 +32,7 @@ use pretty_assertions::assert_eq; async fn test_revoke_privilege_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant().to_string(); let name = "test"; @@ -62,7 +62,7 @@ async fn test_revoke_privilege_interpreter() -> Result<()> { async fn test_revoke_privilege_interpreter_on_role() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant().to_string(); let mut role_info = RoleInfo::new("role1".to_string(), "%".to_string()); diff --git a/query/tests/it/interpreters/interpreter_user_privilege_grant.rs b/query/tests/it/interpreters/interpreter_user_privilege_grant.rs index 319f3a3f5ef03..8b88b068b5cc6 100644 --- a/query/tests/it/interpreters/interpreter_user_privilege_grant.rs +++ b/query/tests/it/interpreters/interpreter_user_privilege_grant.rs @@ -31,7 +31,7 @@ use pretty_assertions::assert_eq; async fn test_grant_privilege_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); let name = "test"; diff --git a/query/tests/it/interpreters/interpreter_user_udf_alter.rs b/query/tests/it/interpreters/interpreter_user_udf_alter.rs index 5f25510de3aca..e54fe353e3660 100644 --- a/query/tests/it/interpreters/interpreter_user_udf_alter.rs +++ b/query/tests/it/interpreters/interpreter_user_udf_alter.rs @@ -23,7 +23,7 @@ use pretty_assertions::assert_eq; async fn test_alter_udf_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); { diff --git a/query/tests/it/interpreters/interpreter_user_udf_create.rs b/query/tests/it/interpreters/interpreter_user_udf_create.rs index 64808b9fc9d15..8523b5f854ad1 100644 --- a/query/tests/it/interpreters/interpreter_user_udf_create.rs +++ b/query/tests/it/interpreters/interpreter_user_udf_create.rs @@ -24,7 +24,7 @@ use pretty_assertions::assert_eq; async fn test_create_udf_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); let query = diff --git a/query/tests/it/interpreters/interpreter_user_udf_drop.rs b/query/tests/it/interpreters/interpreter_user_udf_drop.rs index 9e0976471ee3b..4b81df65f9e37 100644 --- a/query/tests/it/interpreters/interpreter_user_udf_drop.rs +++ b/query/tests/it/interpreters/interpreter_user_udf_drop.rs @@ -23,7 +23,7 @@ use pretty_assertions::assert_eq; async fn test_drop_udf_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); static CREATE_UDF: &str = diff --git a/query/tests/it/interpreters/plan_schedulers/plan_scheduler.rs b/query/tests/it/interpreters/plan_schedulers/plan_scheduler.rs index 368f4aee7a245..efa30cae183c2 100644 --- a/query/tests/it/interpreters/plan_schedulers/plan_scheduler.rs +++ b/query/tests/it/interpreters/plan_schedulers/plan_scheduler.rs @@ -331,4 +331,5 @@ async fn create_env() -> Result> { .with_node("dummy", "github.com:9090") .with_local_id("dummy_local"), ) + .await } diff --git a/query/tests/it/optimizers/optimizer.rs b/query/tests/it/optimizers/optimizer.rs index 7fa28ba3579f6..238f05223d54b 100644 --- a/query/tests/it/optimizers/optimizer.rs +++ b/query/tests/it/optimizers/optimizer.rs @@ -68,7 +68,7 @@ use databend_query::sql::PlanParser; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_literal_false_filter() -> Result<()> { let query = "select * from numbers_mt(10) where 1 + 2 = 2"; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), query).await?; let mut optimizer = Optimizers::without_scatters(ctx); @@ -122,7 +122,7 @@ async fn test_skip_read_data_source() -> Result<()> { ]; for test in tests { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let mut optimizer = Optimizers::without_scatters(ctx); diff --git a/query/tests/it/optimizers/optimizer_constant_folding.rs b/query/tests/it/optimizers/optimizer_constant_folding.rs index 90eab79b8b160..bd5aefd648858 100644 --- a/query/tests/it/optimizers/optimizer_constant_folding.rs +++ b/query/tests/it/optimizers/optimizer_constant_folding.rs @@ -101,7 +101,7 @@ async fn test_constant_folding_optimizer() -> Result<()> { ]; for test in tests { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let mut optimizer = ConstantFoldingOptimizer::create(ctx); diff --git a/query/tests/it/optimizers/optimizer_expression_transform.rs b/query/tests/it/optimizers/optimizer_expression_transform.rs index a861536672738..109529313b214 100644 --- a/query/tests/it/optimizers/optimizer_expression_transform.rs +++ b/query/tests/it/optimizers/optimizer_expression_transform.rs @@ -198,7 +198,7 @@ async fn test_expression_transform_optimizer() -> Result<()> { ]; for test in tests { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let mut optimizer = ExprTransformOptimizer::create(ctx); diff --git a/query/tests/it/optimizers/optimizer_scatters.rs b/query/tests/it/optimizers/optimizer_scatters.rs index 5fe209142f59f..91892c85182be 100644 --- a/query/tests/it/optimizers/optimizer_scatters.rs +++ b/query/tests/it/optimizers/optimizer_scatters.rs @@ -208,7 +208,8 @@ async fn test_scatter_optimizer() -> Result<()> { .with_node("Github", "www.github.com:9090") .with_node("dummy_local", "127.0.0.1:9090") .with_local_id("dummy_local"), - )?; + ) + .await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let mut optimizer = ScattersOptimizer::create(ctx); diff --git a/query/tests/it/optimizers/optimizer_statistics_exact.rs b/query/tests/it/optimizers/optimizer_statistics_exact.rs index f5135c032bb17..96133b38428ea 100644 --- a/query/tests/it/optimizers/optimizer_statistics_exact.rs +++ b/query/tests/it/optimizers/optimizer_statistics_exact.rs @@ -14,6 +14,7 @@ use std::mem::size_of; +use common_base::tokio; use common_datavalues::*; use common_exception::Result; use common_meta_types::TableInfo; @@ -23,9 +24,9 @@ use pretty_assertions::assert_eq; use crate::optimizers::optimizer::*; -#[test] -fn test_statistics_exact_optimizer() -> Result<()> { - let ctx = crate::tests::create_query_context()?; +#[tokio::test] +async fn test_statistics_exact_optimizer() -> Result<()> { + let ctx = crate::tests::create_query_context().await?; let total = ctx.get_settings().get_max_block_size()? as u64; let statistics = Statistics::new_exact( diff --git a/query/tests/it/optimizers/optimizer_top_n_push_down.rs b/query/tests/it/optimizers/optimizer_top_n_push_down.rs index 4d7da89c19f8c..073db84f433ed 100644 --- a/query/tests/it/optimizers/optimizer_top_n_push_down.rs +++ b/query/tests/it/optimizers/optimizer_top_n_push_down.rs @@ -20,7 +20,7 @@ use databend_query::sql::PlanParser; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_simple() -> Result<()> { let query = "select number from numbers(1000) order by number limit 10;"; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), query).await?; @@ -41,7 +41,7 @@ async fn test_simple() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_simple_with_offset() -> Result<()> { let query = "select number from numbers(1000) order by number limit 10 offset 5;"; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), query).await?; @@ -63,7 +63,7 @@ async fn test_simple_with_offset() -> Result<()> { async fn test_nested_projection() -> Result<()> { let query = "select number from (select * from numbers(1000) order by number limit 11) limit 10;"; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), query).await?; @@ -87,7 +87,7 @@ async fn test_nested_projection() -> Result<()> { async fn test_aggregate() -> Result<()> { let query = "select sum(number) FROM numbers(1000) group by number % 10 order by sum(number) limit 5;"; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), query).await?; @@ -141,7 +141,7 @@ async fn test_monotonic_function() -> Result<()> { ]; for test in tests { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let mut optimizer = Optimizers::without_scatters(ctx); diff --git a/query/tests/it/pipelines/processors/pipeline_builder.rs b/query/tests/it/pipelines/processors/pipeline_builder.rs index 93b4c00134ad1..57f28edb3a9a8 100644 --- a/query/tests/it/pipelines/processors/pipeline_builder.rs +++ b/query/tests/it/pipelines/processors/pipeline_builder.rs @@ -171,7 +171,7 @@ async fn test_local_pipeline_builds() -> Result<()> { }, ]; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; for test in tests { // Plan build check. let plan = PlanParser::parse(ctx.clone(), test.query).await?; diff --git a/query/tests/it/pipelines/processors/pipeline_display.rs b/query/tests/it/pipelines/processors/pipeline_display.rs index ebec0fc34aeb3..c7dc0e8af0c03 100644 --- a/query/tests/it/pipelines/processors/pipeline_display.rs +++ b/query/tests/it/pipelines/processors/pipeline_display.rs @@ -20,7 +20,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_pipeline_display() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let query = "\ EXPLAIN PIPELINE SELECT \ diff --git a/query/tests/it/pipelines/processors/pipeline_walker.rs b/query/tests/it/pipelines/processors/pipeline_walker.rs index cc51f249de0d0..4d64eea902a3c 100644 --- a/query/tests/it/pipelines/processors/pipeline_walker.rs +++ b/query/tests/it/pipelines/processors/pipeline_walker.rs @@ -20,7 +20,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_pipeline_walker() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let query = "\ SELECT sum(number + 1) + 2 AS sumx \ diff --git a/query/tests/it/pipelines/processors/processor_merge.rs b/query/tests/it/pipelines/processors/processor_merge.rs index 26aff641c7e56..aec1c4dc2079b 100644 --- a/query/tests/it/pipelines/processors/processor_merge.rs +++ b/query/tests/it/pipelines/processors/processor_merge.rs @@ -24,7 +24,7 @@ use crate::tests; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_processor_merge() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); diff --git a/query/tests/it/pipelines/processors/processor_mixed.rs b/query/tests/it/pipelines/processors/processor_mixed.rs index 7dcfbcdcd00cd..8a3ab71849d9b 100644 --- a/query/tests/it/pipelines/processors/processor_mixed.rs +++ b/query/tests/it/pipelines/processors/processor_mixed.rs @@ -24,7 +24,7 @@ use crate::tests; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_processor_mixed() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); @@ -60,7 +60,7 @@ async fn test_processor_mixed() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_processor_mixed2() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = tests::NumberTestData::create(ctx.clone()); let m = 5; diff --git a/query/tests/it/pipelines/transforms/transform_aggregator_final.rs b/query/tests/it/pipelines/transforms/transform_aggregator_final.rs index e200726cf641a..5418417f1e9f9 100644 --- a/query/tests/it/pipelines/transforms/transform_aggregator_final.rs +++ b/query/tests/it/pipelines/transforms/transform_aggregator_final.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_final_aggregator() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); // sum(number)+1, avg(number) diff --git a/query/tests/it/pipelines/transforms/transform_aggregator_partial.rs b/query/tests/it/pipelines/transforms/transform_aggregator_partial.rs index b0923674c3b38..2c7f49ffd41a4 100644 --- a/query/tests/it/pipelines/transforms/transform_aggregator_partial.rs +++ b/query/tests/it/pipelines/transforms/transform_aggregator_partial.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_partial_aggregator() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); // sum(number), avg(number) diff --git a/query/tests/it/pipelines/transforms/transform_expression.rs b/query/tests/it/pipelines/transforms/transform_expression.rs index 239b8274f05a5..fe016d3d7305c 100644 --- a/query/tests/it/pipelines/transforms/transform_expression.rs +++ b/query/tests/it/pipelines/transforms/transform_expression.rs @@ -24,7 +24,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_expression() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); @@ -73,7 +73,7 @@ async fn test_transform_expression() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_expression_error() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx); @@ -95,7 +95,7 @@ async fn test_transform_expression_error() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_expression_issue2857() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); diff --git a/query/tests/it/pipelines/transforms/transform_filter.rs b/query/tests/it/pipelines/transforms/transform_filter.rs index 1a38eefa35014..fb52fbc61cbb4 100644 --- a/query/tests/it/pipelines/transforms/transform_filter.rs +++ b/query/tests/it/pipelines/transforms/transform_filter.rs @@ -24,7 +24,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_filter() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); @@ -64,7 +64,7 @@ async fn test_transform_filter() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_filter_error() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx); diff --git a/query/tests/it/pipelines/transforms/transform_group_by_final.rs b/query/tests/it/pipelines/transforms/transform_group_by_final.rs index c5bd4f2071fc3..4c57dccbdea1f 100644 --- a/query/tests/it/pipelines/transforms/transform_group_by_final.rs +++ b/query/tests/it/pipelines/transforms/transform_group_by_final.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_final_group_by() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); // sum(number), avg(number) diff --git a/query/tests/it/pipelines/transforms/transform_group_by_partial.rs b/query/tests/it/pipelines/transforms/transform_group_by_partial.rs index f4d4024bf2f6d..7c742c07be18a 100644 --- a/query/tests/it/pipelines/transforms/transform_group_by_partial.rs +++ b/query/tests/it/pipelines/transforms/transform_group_by_partial.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_partial_group_by() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); // sum(number), avg(number) diff --git a/query/tests/it/pipelines/transforms/transform_limit.rs b/query/tests/it/pipelines/transforms/transform_limit.rs index c9c23ea0e3996..389b230d2e80d 100644 --- a/query/tests/it/pipelines/transforms/transform_limit.rs +++ b/query/tests/it/pipelines/transforms/transform_limit.rs @@ -54,7 +54,7 @@ async fn test_transform_limit() -> Result<()> { ]; for ((limit, offset), expected) in testcases { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); diff --git a/query/tests/it/pipelines/transforms/transform_limit_by.rs b/query/tests/it/pipelines/transforms/transform_limit_by.rs index 4073aa199d531..40128b41f92e6 100644 --- a/query/tests/it/pipelines/transforms/transform_limit_by.rs +++ b/query/tests/it/pipelines/transforms/transform_limit_by.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_limit_by() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); diff --git a/query/tests/it/pipelines/transforms/transform_projection.rs b/query/tests/it/pipelines/transforms/transform_projection.rs index 5c6b8bb4a1792..5726c4cc2a28c 100644 --- a/query/tests/it/pipelines/transforms/transform_projection.rs +++ b/query/tests/it/pipelines/transforms/transform_projection.rs @@ -24,7 +24,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_projection() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx.clone()); diff --git a/query/tests/it/pipelines/transforms/transform_sort.rs b/query/tests/it/pipelines/transforms/transform_sort.rs index 4222ec9ccb4d7..60cc1b6cb4996 100644 --- a/query/tests/it/pipelines/transforms/transform_sort.rs +++ b/query/tests/it/pipelines/transforms/transform_sort.rs @@ -25,7 +25,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_transform_sort() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); // Pipeline. diff --git a/query/tests/it/pipelines/transforms/transform_source.rs b/query/tests/it/pipelines/transforms/transform_source.rs index 69353fbfa4099..d18ee87608780 100644 --- a/query/tests/it/pipelines/transforms/transform_source.rs +++ b/query/tests/it/pipelines/transforms/transform_source.rs @@ -22,7 +22,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn transform_source_test() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let test_source = crate::tests::NumberTestData::create(ctx.clone()); let mut pipeline = Pipeline::create(ctx); diff --git a/query/tests/it/sessions/query_ctx.rs b/query/tests/it/sessions/query_ctx.rs index 6991b1172865e..8824c28f45f20 100644 --- a/query/tests/it/sessions/query_ctx.rs +++ b/query/tests/it/sessions/query_ctx.rs @@ -33,7 +33,7 @@ async fn test_get_storage_accessor_s3() -> Result<()> { root: "".to_string(), }; - let qctx = crate::tests::create_query_context_with_config(conf)?; + let qctx = crate::tests::create_query_context_with_config(conf).await?; let _ = qctx.get_storage_operator().await?; @@ -50,7 +50,7 @@ async fn test_get_storage_accessor_fs() -> Result<()> { temp_data_path: "/tmp".to_string(), }; - let qctx = crate::tests::create_query_context_with_config(conf)?; + let qctx = crate::tests::create_query_context_with_config(conf).await?; let _ = qctx.get_storage_operator().await?; diff --git a/query/tests/it/sessions/session.rs b/query/tests/it/sessions/session.rs index 9a6f491c70a9d..40889af2c1570 100644 --- a/query/tests/it/sessions/session.rs +++ b/query/tests/it/sessions/session.rs @@ -29,7 +29,8 @@ async fn test_session() -> Result<()> { String::from("test-001"), String::from("test-type"), session_manager, - )?; + ) + .await?; // Tenant. { @@ -73,7 +74,8 @@ async fn test_session_in_management_mode() -> Result<()> { String::from("test-001"), String::from("test-type"), session_manager, - )?; + ) + .await?; // Tenant. { diff --git a/query/tests/it/sessions/session_context.rs b/query/tests/it/sessions/session_context.rs index b8eb5df23ffc8..a655a918d9fce 100644 --- a/query/tests/it/sessions/session_context.rs +++ b/query/tests/it/sessions/session_context.rs @@ -16,6 +16,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use common_base::tokio; use common_exception::Result; use common_meta_types::UserInfo; use databend_query::clusters::Cluster; @@ -25,8 +26,8 @@ use databend_query::sessions::SessionContext; use crate::tests::SessionManagerBuilder; -#[test] -fn test_session_context() -> Result<()> { +#[tokio::test] +async fn test_session_context() -> Result<()> { let conf = Config::load_from_args(); let session_ctx = SessionContext::try_create(conf)?; @@ -78,7 +79,7 @@ fn test_session_context() -> Result<()> { // context shared. { let sessions = SessionManagerBuilder::create().build()?; - let dummy_session = sessions.create_session("TestSession")?; + let dummy_session = sessions.create_session("TestSession").await?; let shared = QueryContextShared::try_create( sessions.get_conf().clone(), Arc::new(dummy_session.as_ref().clone()), diff --git a/query/tests/it/sessions/session_setting.rs b/query/tests/it/sessions/session_setting.rs index 07ad4f28deacc..ba8ce04133906 100644 --- a/query/tests/it/sessions/session_setting.rs +++ b/query/tests/it/sessions/session_setting.rs @@ -28,7 +28,8 @@ async fn test_session_setting() -> Result<()> { String::from("test-001"), String::from("test-type"), session_manager, - )?; + ) + .await?; // Settings. { diff --git a/query/tests/it/sql/plan_parser.rs b/query/tests/it/sql/plan_parser.rs index f09e1be03bcca..a9493fe4235cb 100644 --- a/query/tests/it/sql/plan_parser.rs +++ b/query/tests/it/sql/plan_parser.rs @@ -245,7 +245,7 @@ async fn test_plan_parser() -> Result<()> { } ]; - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; for t in tests { match PlanParser::parse(ctx.clone(), t.sql).await { Ok(v) => { diff --git a/query/tests/it/sql/statements/query/query_normalizer.rs b/query/tests/it/sql/statements/query/query_normalizer.rs index ddec6f11ca9f6..edd51f7013624 100644 --- a/query/tests/it/sql/statements/query/query_normalizer.rs +++ b/query/tests/it/sql/statements/query/query_normalizer.rs @@ -118,7 +118,7 @@ async fn test_query_normalizer() -> Result<()> { ]; for test_case in &tests { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let (mut statements, _) = DfParser::parse_sql(test_case.query)?; match statements.remove(0) { diff --git a/query/tests/it/sql/statements/query/query_qualified_rewriter.rs b/query/tests/it/sql/statements/query/query_qualified_rewriter.rs index a4e9242555994..fda8e5147710b 100644 --- a/query/tests/it/sql/statements/query/query_qualified_rewriter.rs +++ b/query/tests/it/sql/statements/query/query_qualified_rewriter.rs @@ -100,7 +100,7 @@ async fn test_query_qualified_rewriter() -> Result<()> { ]; for test_case in &tests { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let (mut statements, _) = DfParser::parse_sql(test_case.query)?; match statements.remove(0) { diff --git a/query/tests/it/sql/statements/query/query_schema_joined_analyzer.rs b/query/tests/it/sql/statements/query/query_schema_joined_analyzer.rs index 0deb8676e9754..d16772aed0697 100644 --- a/query/tests/it/sql/statements/query/query_schema_joined_analyzer.rs +++ b/query/tests/it/sql/statements/query/query_schema_joined_analyzer.rs @@ -52,7 +52,7 @@ async fn test_joined_schema_analyzer() -> Result<()> { ]; for test_case in &tests { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let (mut statements, _) = DfParser::parse_sql(test_case.query)?; match statements.remove(0) { diff --git a/query/tests/it/sql/statements/statement_copy.rs b/query/tests/it/sql/statements/statement_copy.rs index b545d141d6d65..8996ee3061841 100644 --- a/query/tests/it/sql/statements/statement_copy.rs +++ b/query/tests/it/sql/statements/statement_copy.rs @@ -155,7 +155,7 @@ async fn test_statement_copy() -> Result<()> { ]; for test in &tests { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let (mut statements, _) = DfParser::parse_sql(test.query)?; let statement = statements.remove(0); if test.err.is_empty() { diff --git a/query/tests/it/sql/statements/statement_select.rs b/query/tests/it/sql/statements/statement_select.rs index 905de7b9f5f01..0bbdf3ba26e82 100644 --- a/query/tests/it/sql/statements/statement_select.rs +++ b/query/tests/it/sql/statements/statement_select.rs @@ -124,7 +124,7 @@ async fn test_statement_select_analyze() -> Result<()> { ]; for test_case in &tests { - let ctx = create_query_context()?; + let ctx = create_query_context().await?; let (mut statements, _) = DfParser::parse_sql(test_case.query)?; match statements.remove(0) { diff --git a/query/tests/it/storages/fuse/table_test_fixture.rs b/query/tests/it/storages/fuse/table_test_fixture.rs index a2620808cee2a..e6bfe13d2d224 100644 --- a/query/tests/it/storages/fuse/table_test_fixture.rs +++ b/query/tests/it/storages/fuse/table_test_fixture.rs @@ -61,7 +61,9 @@ impl TestFixture { // use `TempDir` as root path (auto clean) conf.storage.disk.data_path = tmp_dir.path().to_str().unwrap().to_string(); conf.storage.disk.temp_data_path = tmp_dir.path().to_str().unwrap().to_string(); - let ctx = crate::tests::create_query_context_with_config(conf).unwrap(); + let ctx = crate::tests::create_query_context_with_config(conf) + .await + .unwrap(); let tenant = ctx.get_tenant(); let random_prefix: String = Uuid::new_v4().to_simple().to_string(); @@ -212,7 +214,7 @@ pub async fn test_drive( } pub async fn test_drive_with_args(tbl_args: TableArgs) -> Result { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; test_drive_with_args_and_ctx(tbl_args, ctx).await } diff --git a/query/tests/it/storages/memory.rs b/query/tests/it/storages/memory.rs index 73d5e7bd51683..15c4afbde9185 100644 --- a/query/tests/it/storages/memory.rs +++ b/query/tests/it/storages/memory.rs @@ -27,7 +27,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_memorytable() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let schema = DataSchemaRefExt::create(vec![ DataField::new("a", u32::to_data_type()), DataField::new("b", u64::to_data_type()), diff --git a/query/tests/it/storages/null.rs b/query/tests/it/storages/null.rs index 4337bb3e8fbf6..1879595c6ca32 100644 --- a/query/tests/it/storages/null.rs +++ b/query/tests/it/storages/null.rs @@ -26,7 +26,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_null_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let schema = DataSchemaRefExt::create(vec![ DataField::new("a", u64::to_data_type()), DataField::new("b", u64::to_data_type()), diff --git a/query/tests/it/storages/system/clusters_table.rs b/query/tests/it/storages/system/clusters_table.rs index e1cdc696c634e..180efd4561c63 100644 --- a/query/tests/it/storages/system/clusters_table.rs +++ b/query/tests/it/storages/system/clusters_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_clusters_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = ClustersTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/columns_table.rs b/query/tests/it/storages/system/columns_table.rs index c39b5337b4c48..54603224d6fab 100644 --- a/query/tests/it/storages/system/columns_table.rs +++ b/query/tests/it/storages/system/columns_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_columns_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = ColumnsTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/configs_table.rs b/query/tests/it/storages/system/configs_table.rs index 6bb33116cf503..006a80e10947b 100644 --- a/query/tests/it/storages/system/configs_table.rs +++ b/query/tests/it/storages/system/configs_table.rs @@ -22,7 +22,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_configs_table() -> Result<()> { let conf = crate::tests::ConfigBuilder::create().config(); - let ctx = crate::tests::create_query_context_with_config(conf)?; + let ctx = crate::tests::create_query_context_with_config(conf).await?; ctx.get_settings().set_max_threads(8)?; let table = ConfigsTable::create(1); diff --git a/query/tests/it/storages/system/contributors_table.rs b/query/tests/it/storages/system/contributors_table.rs index 37cd93803c318..9058b15ed0b38 100644 --- a/query/tests/it/storages/system/contributors_table.rs +++ b/query/tests/it/storages/system/contributors_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_contributors_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = ContributorsTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/credits_table.rs b/query/tests/it/storages/system/credits_table.rs index 3e93d826c0dc9..27b50a8041556 100644 --- a/query/tests/it/storages/system/credits_table.rs +++ b/query/tests/it/storages/system/credits_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_credits_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = CreditsTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/databases_table.rs b/query/tests/it/storages/system/databases_table.rs index acb416111719a..88d3191dbfe42 100644 --- a/query/tests/it/storages/system/databases_table.rs +++ b/query/tests/it/storages/system/databases_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_tables_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = DatabasesTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/engines_table.rs b/query/tests/it/storages/system/engines_table.rs index 87c63610b2b2a..115a7a8351528 100644 --- a/query/tests/it/storages/system/engines_table.rs +++ b/query/tests/it/storages/system/engines_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_engines_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = EnginesTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/functions_table.rs b/query/tests/it/storages/system/functions_table.rs index 7175a1e0e0b28..20360c595a5f9 100644 --- a/query/tests/it/storages/system/functions_table.rs +++ b/query/tests/it/storages/system/functions_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_functions_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = FunctionsTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/metrics_table.rs b/query/tests/it/storages/system/metrics_table.rs index 5df3a1b4e7fc7..ee2b74bd77a1e 100644 --- a/query/tests/it/storages/system/metrics_table.rs +++ b/query/tests/it/storages/system/metrics_table.rs @@ -23,7 +23,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_metrics_table() -> Result<()> { init_default_metrics_recorder(); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = MetricsTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/query_log_table.rs b/query/tests/it/storages/system/query_log_table.rs index b3a9e99202e70..9eeec9133d55e 100644 --- a/query/tests/it/storages/system/query_log_table.rs +++ b/query/tests/it/storages/system/query_log_table.rs @@ -27,7 +27,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_query_log_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; ctx.get_settings().set_max_threads(2)?; let mut query_log = QueryLogTable::create(0); diff --git a/query/tests/it/storages/system/settings_table.rs b/query/tests/it/storages/system/settings_table.rs index 1cdfb6343eda8..3555bda50d85b 100644 --- a/query/tests/it/storages/system/settings_table.rs +++ b/query/tests/it/storages/system/settings_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_settings_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; ctx.get_settings().set_max_threads(2)?; let table = SettingsTable::create(1); diff --git a/query/tests/it/storages/system/tables_table.rs b/query/tests/it/storages/system/tables_table.rs index d142432812cf6..da4c20de5d7c2 100644 --- a/query/tests/it/storages/system/tables_table.rs +++ b/query/tests/it/storages/system/tables_table.rs @@ -20,7 +20,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_tables_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = TablesTable::create(1); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/tracing_table.rs b/query/tests/it/storages/system/tracing_table.rs index c07c406d0b500..096dcb8574fa1 100644 --- a/query/tests/it/storages/system/tracing_table.rs +++ b/query/tests/it/storages/system/tracing_table.rs @@ -23,7 +23,7 @@ use futures::TryStreamExt; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_tracing_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table: Arc = Arc::new(TracingTable::create(1)); let source_plan = table.read_plan(ctx.clone(), None).await?; diff --git a/query/tests/it/storages/system/users_table.rs b/query/tests/it/storages/system/users_table.rs index 600ebfabb0052..5a2e198d0328c 100644 --- a/query/tests/it/storages/system/users_table.rs +++ b/query/tests/it/storages/system/users_table.rs @@ -27,7 +27,7 @@ use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_users_table() -> Result<()> { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let tenant = ctx.get_tenant(); ctx.get_settings().set_max_threads(2)?; let auth_data = AuthInfo::None; diff --git a/query/tests/it/table_functions/numbers_table.rs b/query/tests/it/table_functions/numbers_table.rs index df9296d28aee4..187671b0f20bc 100644 --- a/query/tests/it/table_functions/numbers_table.rs +++ b/query/tests/it/table_functions/numbers_table.rs @@ -26,7 +26,7 @@ use futures::TryStreamExt; #[tokio::test] async fn test_number_table() -> Result<()> { let tbl_args = Some(vec![Expression::create_literal(DataValue::UInt64(8))]); - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let table = NumbersTable::create("system", "numbers_mt", 1, tbl_args)?; let source_plan = table @@ -107,7 +107,7 @@ async fn test_limit_push_down() -> Result<()> { ]; for test in tests { - let ctx = crate::tests::create_query_context()?; + let ctx = crate::tests::create_query_context().await?; let plan = PlanParser::parse(ctx.clone(), test.query).await?; let actual = format!("{:?}", plan); assert_eq!(test.expect, actual, "{:#?}", test.name); diff --git a/query/tests/it/tests/context.rs b/query/tests/it/tests/context.rs index 5caaa48373cdb..a0ee36c19b164 100644 --- a/query/tests/it/tests/context.rs +++ b/query/tests/it/tests/context.rs @@ -33,9 +33,9 @@ use databend_query::storages::StorageFactory; use crate::tests::SessionManagerBuilder; -pub fn create_query_context() -> Result> { +pub async fn create_query_context() -> Result> { let sessions = SessionManagerBuilder::create().build()?; - let dummy_session = sessions.create_session("TestSession")?; + let dummy_session = sessions.create_session("TestSession").await?; // Set user with all privileges let mut user_info = UserInfo::new( @@ -65,9 +65,9 @@ pub fn create_query_context() -> Result> { Ok(context) } -pub fn create_query_context_with_config(config: Config) -> Result> { +pub async fn create_query_context_with_config(config: Config) -> Result> { let sessions = SessionManagerBuilder::create_with_conf(config.clone()).build()?; - let dummy_session = sessions.create_session("TestSession")?; + let dummy_session = sessions.create_session("TestSession").await?; let mut user_info = UserInfo::new( "root".to_string(), @@ -155,9 +155,11 @@ impl Default for ClusterDescriptor { } } -pub fn create_query_context_with_cluster(desc: ClusterDescriptor) -> Result> { +pub async fn create_query_context_with_cluster( + desc: ClusterDescriptor, +) -> Result> { let sessions = SessionManagerBuilder::create().build()?; - let dummy_session = sessions.create_session("TestSession")?; + let dummy_session = sessions.create_session("TestSession").await?; let local_id = desc.local_node_id; let nodes = desc.cluster_nodes_list;