From c5b4c617467d454ca604c285e60b4454448b649c Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Tue, 23 Jul 2024 14:52:39 +0800 Subject: [PATCH 1/2] fix: select from JOB_INFO should always in online mode Fix error when user set default `execute_mode` to offline: ```sql set global execute_mode = 'offline'; select 1; ``` --- .../_4paradigm/openmldb/taskmanager/JobInfoManager.scala | 8 ++++---- src/sdk/sql_cluster_router.cc | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala index 47f1afb4d7b..7f6cd9c49b8 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala @@ -73,7 +73,7 @@ object JobInfoManager { } def getAllJobs(): List[JobInfo] = { - val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME" + val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')" val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql) // TODO: Reorder in output, use orderby desc if SQL supported resultSetToJobs(rs).sortWith(_.getId > _.getId) @@ -82,7 +82,7 @@ object JobInfoManager { def getUnfinishedJobs(): List[JobInfo] = { // TODO: Now we can not add index for `state` and run sql with // s"SELECT * FROM $tableName WHERE state NOT IN (${JobInfo.FINAL_STATE.mkString(",")})" - val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME" + val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')" val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql) val jobs = mutable.ArrayBuffer[JobInfo]() @@ -99,7 +99,7 @@ object JobInfoManager { } def stopJob(jobId: Int): JobInfo = { - val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId" + val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')" val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql) val jobInfo = if (rs.getFetchSize == 0) { @@ -131,7 +131,7 @@ object JobInfoManager { def getJob(jobId: Int): Option[JobInfo] = { // TODO: Require to get only one row, https://github.com/4paradigm/OpenMLDB/issues/704 - val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId" + val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')" val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql) if (rs.getFetchSize == 0) { diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 0a77681668d..ca96e0b7db8 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -5143,7 +5143,7 @@ void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::ma std::shared_ptr SQLClusterRouter::GetJobResultSet(int job_id, ::hybridse::sdk::Status* status) { std::string db = openmldb::nameserver::INTERNAL_DB; - std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id); + std::string sql = absl::Substitute("SELECT * FROM JOB_INFO WHERE id = $0 CONFIG (execute_mode = 'online')", job_id); auto rs = ExecuteSQLParameterized(db, sql, {}, status); if (!status->IsOK()) { @@ -5164,7 +5164,7 @@ std::shared_ptr SQLClusterRouter::GetJobResultSet(int std::shared_ptr SQLClusterRouter::GetJobResultSet(::hybridse::sdk::Status* status) { std::string db = openmldb::nameserver::INTERNAL_DB; - std::string sql = "SELECT * FROM JOB_INFO"; + std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online')"; auto rs = ExecuteSQLParameterized(db, sql, std::shared_ptr(), status); if (!status->IsOK()) { return {}; @@ -5187,7 +5187,7 @@ std::shared_ptr SQLClusterRouter::GetTaskManagerJobRes return this->GetJobResultSet(job_id, status); } std::string db = openmldb::nameserver::INTERNAL_DB; - std::string sql = "SELECT * FROM JOB_INFO;"; + std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online');"; auto rs = ExecuteSQLParameterized(db, sql, {}, status); if (!status->IsOK()) { return {}; From ea939f8174d78d08e5df156f9b99748dee8f8080 Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Tue, 23 Jul 2024 15:02:31 +0800 Subject: [PATCH 2/2] fix: query mode on user & pre_agg tables --- src/sdk/sql_cluster_router.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index ca96e0b7db8..607dd1c85b7 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -885,7 +885,7 @@ bool SQLClusterRouter::DropTable(const std::string& db, const std::string& table std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME; std::string select_aggr_info = absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '", - table_info->name(), "' and base_db='", table_info->db(), "';"); + table_info->name(), "' and base_db='", table_info->db(), "' CONFIG (execute_mode = 'online');"); auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status); WARN_NOT_OK_AND_RET(status, "get aggr info failed", false); if (rs->Size() > 0) { @@ -5226,7 +5226,7 @@ std::shared_ptr SQLClusterRouter::GetNameServerJobResu } absl::StatusOr SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) { - std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME); + std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME, " CONFIG (execute_mode = 'online')"); hybridse::sdk::Status status; auto rs = ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr(), &status);