diff --git a/dbm-services/mysql/db-simulation/app/keyworld/mysql56.go b/dbm-services/mysql/db-simulation/app/keyworld/mysql56.go index dd65daa628..3dbac3738f 100644 --- a/dbm-services/mysql/db-simulation/app/keyworld/mysql56.go +++ b/dbm-services/mysql/db-simulation/app/keyworld/mysql56.go @@ -21,7 +21,6 @@ var Keywords56 = []Keyword{ {"ALGORITHM", false}, {"ALL", false}, {"ALTER", false}, - {"ANALYSE", false}, {"ANALYZE", false}, {"AND", false}, {"ANY", false}, diff --git a/dbm-services/mysql/db-simulation/app/keyworld/mysql57.go b/dbm-services/mysql/db-simulation/app/keyworld/mysql57.go index 7e7a47051b..6c9837637d 100644 --- a/dbm-services/mysql/db-simulation/app/keyworld/mysql57.go +++ b/dbm-services/mysql/db-simulation/app/keyworld/mysql57.go @@ -23,7 +23,6 @@ var Keywords57 = []Keyword{ {"ALL", false}, {"ALTER", true}, {"ALWAYS", false}, - {"ANALYSE", false}, {"ANALYZE", false}, {"AND", false}, {"ANY", false}, diff --git a/dbm-services/mysql/db-simulation/app/keyworld/mysql80.go b/dbm-services/mysql/db-simulation/app/keyworld/mysql80.go index f1a4804a2e..e9b1d7ba28 100644 --- a/dbm-services/mysql/db-simulation/app/keyworld/mysql80.go +++ b/dbm-services/mysql/db-simulation/app/keyworld/mysql80.go @@ -25,7 +25,6 @@ var Keywords80 = []Keyword{ {"ALL", false}, {"ALTER", true}, {"ALWAYS", false}, - {"ANALYSE", false}, {"ANALYZE", false}, {"AND", false}, {"ANY", false}, diff --git a/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go new file mode 100644 index 0000000000..d5abdf7b7a --- /dev/null +++ b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go @@ -0,0 +1,173 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package syntax + +import ( + "bufio" + "encoding/json" + "errors" + "io" + "os" + "runtime/debug" + "slices" + "sync" + + "github.com/samber/lo" + + "dbm-services/common/go-pubpkg/logger" +) + +// DoParseRelationDbs parse relation db from sql file +func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relationDbs []string, dumpAll bool, + err error) { + logger.Info("doing....") + tf.result = make(map[string]*CheckInfo) + tf.tmpWorkdir = tf.BaseWorkdir + tf.mu = sync.Mutex{} + + if !tf.IsLocalFile { + if err = tf.Init(); err != nil { + logger.Error("Do init failed %s", err.Error()) + return nil, nil, false, err + } + if err = tf.Downloadfile(); err != nil { + logger.Error("failed to download sql file from the product library %s", err.Error()) + return nil, nil, false, err + } + } + // 最后删除临时目录,不会返回错误 + defer tf.delTempDir() + + errChan := make(chan error, 1) + alreadExecutedSqlfileChan := make(chan string, len(tf.Param.FileNames)) + + go func() { + if err = tf.Execute(alreadExecutedSqlfileChan, version); err != nil { + logger.Error("failed to execute tmysqlparse: %s", err.Error()) + errChan <- err + } + close(alreadExecutedSqlfileChan) + }() + + logger.Info("start to analyze the parsing result") + createDbs, relationDbs, dumpAll, err = tf.doParseInchan(alreadExecutedSqlfileChan, version) + if err != nil { + logger.Error("failed to analyze the parsing result:%s", err.Error()) + return nil, nil, false, err + } + logger.Info("createDbs:%v,relationDbs:%v,dumpAll:%v,err:%v", createDbs, relationDbs, dumpAll, err) + return createDbs, relationDbs, dumpAll, nil +} + +// doParseInchan RelationDbs do parse relation db +func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string, + mysqlVersion string) (createDbs []string, relationDbs []string, dumpAll bool, err error) { + var errs []error + c := make(chan struct{}, 10) + errChan := make(chan error, 5) + wg := &sync.WaitGroup{} + stopChan := make(chan struct{}) + + for sqlfile := range alreadExecutedSqlfileCh { + wg.Add(1) + c <- struct{}{} + go func(fileName string) { + defer wg.Done() + cdbs, dbs, dumpAllDbs, err := t.analyzeRelationDbs(fileName, mysqlVersion) + logger.Info("createDbs:%v,dbs:%v,dumpAllDbs:%v,err:%v", cdbs, dbs, dumpAllDbs, err) + if err != nil { + errChan <- err + } + if dumpAllDbs { + dumpAll = true + stopChan <- struct{}{} + } + t.mu.Lock() + relationDbs = append(relationDbs, dbs...) + createDbs = append(createDbs, cdbs...) + t.mu.Unlock() + <-c + }(sqlfile) + } + + go func() { + wg.Wait() + close(errChan) + stopChan <- struct{}{} + }() + + for { + select { + case err := <-errChan: + errs = append(errs, err) + case <-stopChan: + return createDbs, relationDbs, dumpAll, errors.Join(errs...) + } + } +} + +// analyzeRelationDbs 分析变更sql相关的db +func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( + createDbs []string, + relationDbs []string, + dumpAll bool, + err error) { + defer func() { + if r := recover(); r != nil { + logger.Error("panic error:%v,stack:%s", r, string(debug.Stack())) + } + }() + t.result[inputfileName] = &CheckInfo{} + f, err := os.Open(t.getAbsoutputfilePath(inputfileName, mysqlVersion)) + if err != nil { + logger.Error("open file failed %s", err.Error()) + return nil, nil, false, err + } + defer f.Close() + reader := bufio.NewReader(f) + for { + line, errx := reader.ReadBytes(byte('\n')) + if errx != nil { + if errx == io.EOF { + break + } + logger.Error("read Line Error %s", errx.Error()) + return nil, nil, false, errx + } + if len(line) == 1 && line[0] == byte('\n') { + continue + } + var res ParseLineQueryBase + if err = json.Unmarshal(line, &res); err != nil { + logger.Error("json unmasrshal line:%s failed %s", string(line), err.Error()) + return nil, nil, false, err + } + // 判断是否有语法错误 + if res.ErrorCode != 0 { + return nil, nil, false, err + } + if slices.Contains([]string{SQLTypeCreateProcedure, SQLTypeCreateFunction, SQLTypeCreateView, SQLTypeCreateTrigger}, + res.Command) { + return nil, nil, true, nil + } + if lo.IsEmpty(res.DbName) { + continue + } + // create db not need dump db + if slices.Contains([]string{SQLTypeCreateDb}, res.Command) { + createDbs = append(createDbs, res.DbName) + continue + } + relationDbs = append(relationDbs, res.DbName) + + } + return createDbs, relationDbs, false, nil +} diff --git a/dbm-services/mysql/db-simulation/app/syntax/syntax.go b/dbm-services/mysql/db-simulation/app/syntax/syntax.go index ff699e68b2..f0ad05fb22 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/syntax.go +++ b/dbm-services/mysql/db-simulation/app/syntax/syntax.go @@ -301,7 +301,10 @@ func (tf *TmysqlParseFile) UploadDdlTblMapFile() (err error) { } func getSQLParseResultFile(fileName, version string) string { - return fmt.Sprintf("%s-%s.json", version, fileName) + if lo.IsNotEmpty(version) { + return fmt.Sprintf("%s-%s.json", version, fileName) + } + return fileName + ".json" } // getCommand generates the command string for running TmysqlParse diff --git a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go index 64032c102c..d48da5ab2b 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go +++ b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go @@ -43,6 +43,8 @@ const ( SQLTypeDelete = "delete" // SQLTypeUpdate is update sql SQLTypeUpdate = "update" + // SQLTypeUseDb is use db sql + SQLTypeUseDb = "change_db" ) // ColDef mysql column definition diff --git a/dbm-services/mysql/db-simulation/handler/dbsimulation.go b/dbm-services/mysql/db-simulation/handler/dbsimulation.go index 22e6bd5a3b..e2bdf50777 100644 --- a/dbm-services/mysql/db-simulation/handler/dbsimulation.go +++ b/dbm-services/mysql/db-simulation/handler/dbsimulation.go @@ -12,6 +12,7 @@ package handler import ( "fmt" + "net/http" "strings" "github.com/gin-gonic/gin" @@ -22,6 +23,125 @@ import ( "dbm-services/mysql/db-simulation/model" ) +// SimulationHandler TODO +type SimulationHandler struct { + BaseHandler +} + +// RegisterRouter 注册路由信息 +func (s *SimulationHandler) RegisterRouter(engine *gin.Engine) { + t := engine.Group("/simulation") + { + // query simulation task status info + t.POST("/task/file", s.QuerySimulationFileResult) + t.POST("/task", s.QueryTask) + } + // mysql + g := engine.Group("/mysql") + { + g.POST("/simulation", s.TendbSimulation) + g.POST("/task", s.QueryTask) + } + // spider + sp := engine.Group("/spider") + { + sp.POST("/simulation", s.TendbClusterSimulation) + sp.POST("/create", s.CreateTmpSpiderPodCluster) + } +} + +// CreateClusterParam 创建临时的spider的集群参数 +type CreateClusterParam struct { + Pwd string `json:"pwd"` + PodName string `json:"podname"` + SpiderVersion string `json:"spider_version"` + BackendVersion string `json:"backend_version"` +} + +// CreateTmpSpiderPodCluster 创建临时的spider的集群,多用于测试,debug +func (s *SimulationHandler) CreateTmpSpiderPodCluster(r *gin.Context) { + var param CreateClusterParam + if err := s.Prepare(r, ¶m); err != nil { + logger.Error("ShouldBind failed %s", err) + return + } + ps := service.NewDbPodSets() + ps.BaseInfo = &service.MySQLPodBaseInfo{ + PodName: param.PodName, + RootPwd: param.Pwd, + Charset: "utf8mb4", + } + var err error + ps.DbImage, err = service.GetImgFromMySQLVersion(param.BackendVersion) + if err != nil { + logger.Error(err.Error()) + return + } + ps.SpiderImage, ps.TdbCtlImage = service.GetSpiderAndTdbctlImg(param.SpiderVersion, service.LatestVersion) + if err := ps.CreateClusterPod(""); err != nil { + logger.Error(err.Error()) + return + } + s.SendResponse(r, nil, "ok") +} + +func replaceUnderSource(str string) string { + return strings.ReplaceAll(str, "_", "-") +} + +// T 请求查询模拟执行整体任务的执行状态参数 +type T struct { + TaskID string `json:"task_id"` +} + +// QueryTask 查询模拟执行整体任务的执行状态 +func (s *SimulationHandler) QueryTask(c *gin.Context) { + var param T + if err := s.Prepare(c, ¶m); err != nil { + logger.Error("ShouldBind failed %s", err) + return + } + logger.Info("get task_id is %s", param.TaskID) + var tasks []model.TbSimulationTask + if err := model.DB.Where(&model.TbSimulationTask{TaskId: param.TaskID}).Find(&tasks).Error; err != nil { + logger.Error("query task failed %s", err.Error()) + s.SendResponse(c, err, map[string]interface{}{"stderr": err.Error()}) + return + } + allSuccessful := false + for _, task := range tasks { + if task.Phase != model.PhaseDone { + c.JSON(http.StatusOK, Response{ + Code: 2, + Message: fmt.Sprintf("task current phase is %s", task.Phase), + Data: "", + }) + return + } + switch task.Status { + case model.TaskFailed: + allSuccessful = false + s.SendResponse(c, fmt.Errorf("%s", task.SysErrMsg), map[string]interface{}{ + "simulation_version": task.MySQLVersion, + "stdout": task.Stdout, + "stderr": task.Stderr, + "errmsg": fmt.Sprintf("the program has been run with abnormal status:%s", task.Status)}) + + case model.TaskSuccess: + allSuccessful = true + default: + allSuccessful = false + s.SendResponse(c, fmt.Errorf("unknown transition state"), map[string]interface{}{ + "stdout": task.Stdout, + "stderr": task.Stderr, + "errmsg": fmt.Sprintf("the program has been run with abnormal status:%s", task.Status)}) + } + } + if allSuccessful { + s.SendResponse(c, nil, map[string]interface{}{"stdout": "all ok", "stderr": "all ok"}) + } +} + // QueryFileResultParam 获取模拟执行文件的结果 type QueryFileResultParam struct { RootID string `json:"root_id" binding:"required" ` @@ -29,11 +149,10 @@ type QueryFileResultParam struct { } // QuerySimulationFileResult 查询模拟执行每个文件的执行结果 -func QuerySimulationFileResult(r *gin.Context) { +func (s *SimulationHandler) QuerySimulationFileResult(r *gin.Context) { var param QueryFileResultParam - if err := r.ShouldBindJSON(¶m); err != nil { + if err := s.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, "failed to deserialize parameters", "") return } task_id := fmt.Sprintf("%s_%s", param.RootID, param.VersionID) @@ -41,39 +160,37 @@ func QuerySimulationFileResult(r *gin.Context) { err := model.DB.Where("task_id = ? ", task_id).Find(&data).Error if err != nil { logger.Error("query file task result failed %v", err) - SendResponse(r, err, err.Error(), "") + s.SendResponse(r, err, err.Error()) return } - SendResponse(r, nil, data, "") + s.SendResponse(r, nil, data) } // TendbSimulation Tendb simulation handler -func TendbSimulation(r *gin.Context) { +func (s *SimulationHandler) TendbSimulation(r *gin.Context) { var param service.BaseParam - requestID := r.GetString("request_id") - if err := r.ShouldBindJSON(¶m); err != nil { + if err := s.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, "failed to deserialize parameters", requestID) return } - if requestID == "" { - SendResponse(r, fmt.Errorf("create request id failed"), nil, requestID) + if s.RequestId == "" { + s.SendResponse(r, fmt.Errorf("create request id failed"), nil) return } version := param.MySQLVersion img, err := service.GetImgFromMySQLVersion(version) if err != nil { logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error()) - SendResponse(r, err, nil, requestID) + s.SendResponse(r, err, nil) return } - if err := model.CreateTask(param.TaskId, requestID, version, param.Uid); err != nil { + if err := model.CreateTask(param.TaskId, s.RequestId, version, param.Uid); err != nil { logger.Error("create task db record error %s", err.Error()) - SendResponse(r, err, nil, requestID) + s.SendResponse(r, err, nil) return } tsk := service.SimulationTask{ - RequestId: requestID, + RequestId: s.RequestId, DbPodSets: service.NewDbPodSets(), BaseParam: ¶m, Version: version, @@ -83,40 +200,38 @@ func TendbSimulation(r *gin.Context) { PodName: fmt.Sprintf("tendb-%s-%s", strings.ToLower(version), replaceUnderSource(param.TaskId)), Lables: map[string]string{"task_id": replaceUnderSource(param.TaskId), - "request_id": requestID}, + "request_id": s.RequestId}, RootPwd: param.TaskId, Args: param.BuildStartArgs(), Charset: param.MySQLCharSet, } service.TaskChan <- tsk - SendResponse(r, nil, "request successful", requestID) + s.SendResponse(r, nil, "request successful") } // TendbClusterSimulation TendbCluster simulation handler -func TendbClusterSimulation(r *gin.Context) { +func (s *SimulationHandler) TendbClusterSimulation(r *gin.Context) { var param service.SpiderSimulationExecParam - RequestID := r.GetString("request_id") - if err := r.ShouldBindJSON(¶m); err != nil { + if err := s.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, "failed to deserialize parameters", RequestID) return } version := param.MySQLVersion img, err := service.GetImgFromMySQLVersion(version) if err != nil { logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error()) - SendResponse(r, err, nil, RequestID) + s.SendResponse(r, err, nil) return } - if err := model.CreateTask(param.TaskId, RequestID, version, param.Uid); err != nil { + if err := model.CreateTask(param.TaskId, s.RequestId, version, param.Uid); err != nil { logger.Error("create task db record error %s", err.Error()) - SendResponse(r, err, nil, RequestID) + s.SendResponse(r, err, nil) return } tsk := service.SimulationTask{ - RequestId: RequestID, + RequestId: s.RequestId, DbPodSets: service.NewDbPodSets(), BaseParam: ¶m.BaseParam, Version: version, @@ -131,10 +246,10 @@ func TendbClusterSimulation(r *gin.Context) { PodName: fmt.Sprintf("spider-%s-%s", strings.ToLower(version), replaceUnderSource(param.TaskId)), Lables: map[string]string{"task_id": replaceUnderSource(param.TaskId), - "request_id": RequestID}, + "request_id": s.RequestId}, RootPwd: rootPwd, Charset: param.MySQLCharSet, } service.SpiderTaskChan <- tsk - SendResponse(r, nil, "request successful", RequestID) + s.SendResponse(r, nil, "request successful") } diff --git a/dbm-services/mysql/db-simulation/handler/handler.go b/dbm-services/mysql/db-simulation/handler/handler.go index c085d377b0..ee5225c8ab 100644 --- a/dbm-services/mysql/db-simulation/handler/handler.go +++ b/dbm-services/mysql/db-simulation/handler/handler.go @@ -14,13 +14,12 @@ package handler import ( "fmt" "net/http" - "strings" "github.com/gin-gonic/gin" + "dbm-services/common/go-pubpkg/cmutil" + "dbm-services/common/go-pubpkg/errno" "dbm-services/common/go-pubpkg/logger" - "dbm-services/mysql/db-simulation/app/service" - "dbm-services/mysql/db-simulation/model" ) // Response response data define @@ -31,117 +30,36 @@ type Response struct { Code int `json:"code"` } -// CreateClusterParam 创建临时的spider的集群参数 -type CreateClusterParam struct { - Pwd string `json:"pwd"` - PodName string `json:"podname"` - SpiderVersion string `json:"spider_version"` - BackendVersion string `json:"backend_version"` +// BaseHandler base handler +type BaseHandler struct { + RequestId string } -// CreateTmpSpiderPodCluster 创建临时的spider的集群,多用于测试,debug -func CreateTmpSpiderPodCluster(r *gin.Context) { - var param CreateClusterParam - if err := r.ShouldBindJSON(¶m); err != nil { - logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, "failed to deserialize parameters", "") - return - } - ps := service.NewDbPodSets() - ps.BaseInfo = &service.MySQLPodBaseInfo{ - PodName: param.PodName, - RootPwd: param.Pwd, - Charset: "utf8mb4", - } - var err error - ps.DbImage, err = service.GetImgFromMySQLVersion(param.BackendVersion) - if err != nil { - logger.Error(err.Error()) - return - } - ps.SpiderImage, ps.TdbCtlImage = service.GetSpiderAndTdbctlImg(param.SpiderVersion, service.LatestVersion) - if err := ps.CreateClusterPod(""); err != nil { - logger.Error(err.Error()) - return - } - SendResponse(r, nil, "ok", "") -} - -func replaceUnderSource(str string) string { - return strings.ReplaceAll(str, "_", "-") -} - -// T 请求查询模拟执行整体任务的执行状态参数 -type T struct { - TaskID string `json:"task_id"` +// SendResponse retrnurns a response +func (c *BaseHandler) SendResponse(r *gin.Context, err error, data interface{}) { + code, message := errno.DecodeErr(err) + r.JSON(http.StatusOK, Response{ + Code: code, + Message: message, + Data: data, + RequestID: c.RequestId, + }) } -// QueryTask 查询模拟执行整体任务的执行状态 -func QueryTask(c *gin.Context) { - var param T - if err := c.ShouldBindJSON(¶m); err != nil { - logger.Error("ShouldBind failed %s", err) - SendResponse(c, err, map[string]interface{}{"stderr": "failed to deserialize parameters"}, "") - return - } - logger.Info("get task_id is %s", param.TaskID) - var tasks []model.TbSimulationTask - if err := model.DB.Where(&model.TbSimulationTask{TaskId: param.TaskID}).Find(&tasks).Error; err != nil { - logger.Error("query task failed %s", err.Error()) - SendResponse(c, err, map[string]interface{}{"stderr": err.Error()}, "") - return - } - allSuccessful := false - for _, task := range tasks { - if task.Phase != model.PhaseDone { - c.JSON(http.StatusOK, Response{ - Code: 2, - Message: fmt.Sprintf("task current phase is %s", task.Phase), - Data: "", - }) - return - } - switch task.Status { - case model.TaskFailed: - allSuccessful = false - SendResponse(c, fmt.Errorf("%s", task.SysErrMsg), map[string]interface{}{ - "simulation_version": task.MySQLVersion, - "stdout": task.Stdout, - "stderr": task.Stderr, - "errmsg": fmt.Sprintf("the program has been run with abnormal status:%s", task.Status)}, - "") - - case model.TaskSuccess: - allSuccessful = true - default: - allSuccessful = false - SendResponse(c, fmt.Errorf("unknown transition state"), map[string]interface{}{ - "stdout": task.Stdout, - "stderr": task.Stderr, - "errmsg": fmt.Sprintf("the program has been run with abnormal status:%s", task.Status)}, - "") - } - } - if allSuccessful { - SendResponse(c, nil, map[string]interface{}{"stdout": "all ok", "stderr": "all ok"}, "") +// Prepare before request prepared +func (c *BaseHandler) Prepare(r *gin.Context, schema interface{}) error { + requestId := r.GetString("request_id") + if cmutil.IsEmpty(requestId) { + err := fmt.Errorf("get request id error ~") + c.SendResponse(r, err, nil) + return err } -} - -// SendResponse return response data to http client -func SendResponse(r *gin.Context, err error, data interface{}, requestid string) { - if err != nil { - r.JSON(http.StatusOK, Response{ - Code: 1, - Message: err.Error(), - Data: data, - RequestID: requestid, - }) - return + c.RequestId = requestId + if err := r.ShouldBind(&schema); err != nil { + logger.Error("ShouldBind Failed %s", err.Error()) + c.SendResponse(r, err, nil) + return err } - r.JSON(http.StatusOK, Response{ - Code: 0, - Message: "successfully", - Data: data, - RequestID: requestid, - }) + logger.Info("param is %v", schema) + return nil } diff --git a/dbm-services/mysql/db-simulation/handler/syntax_check.go b/dbm-services/mysql/db-simulation/handler/syntax_check.go index ebe129aafa..3f45298890 100644 --- a/dbm-services/mysql/db-simulation/handler/syntax_check.go +++ b/dbm-services/mysql/db-simulation/handler/syntax_check.go @@ -28,6 +28,9 @@ import ( var tmysqlParserBin string var workdir string +// ForceDumpAll 是否强制 dump 所有库表 +var ForceDumpAll bool + func init() { tmysqlParserBin = strings.TrimSpace(viper.GetString("tmysqlparser_bin")) // 容器环境会把 tmysqlparse 打包进来 @@ -44,10 +47,27 @@ func init() { } workdir = "/" } + ForceDumpAll = false } // SyntaxHandler 语法检查 handler -type SyntaxHandler struct{} +type SyntaxHandler struct { + BaseHandler +} + +// RegisterRouter 注册路由信息 +func (s *SyntaxHandler) RegisterRouter(engine *gin.Engine) { + r := engine.Group("/syntax") + { + // syntax + r.POST("/check/file", s.SyntaxCheckFile) + r.POST("/check/sql", s.SyntaxCheckSQL) + r.POST("/upload/ddl/tbls", s.CreateAndUploadDDLTblListFile) + r.POST("/parse/file/relation/db", s.ParseSQLFileRelationDb) + r.POST("/parse/sql/relation/db", s.ParseSQLRelationDb) + r.POST("/parse/set/dumpall", s.SetDumpAll) + } +} // CheckSQLStringParam sql string 语法检查参数 type CheckSQLStringParam struct { @@ -56,16 +76,20 @@ type CheckSQLStringParam struct { Sqls []string `json:"sqls" binding:"gt=0,dive,required"` } +// SetDumpAll set dump all +func (s *SyntaxHandler) SetDumpAll(r *gin.Context) { + ForceDumpAll = !ForceDumpAll + logger.Info("ForceDumpAll is: %v", ForceDumpAll) +} + // SyntaxCheckSQL 语法检查入参SQL string -func SyntaxCheckSQL(r *gin.Context) { - requestID := r.GetString("request_id") +func (s *SyntaxHandler) SyntaxCheckSQL(r *gin.Context) { var param CheckSQLStringParam var data map[string]*syntax.CheckInfo var versions []string // 将request中的数据按照json格式直接解析到结构体中 - if err := r.ShouldBindJSON(¶m); err != nil { - logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, nil, requestID) + if err := s.Prepare(r, ¶m); err != nil { + logger.Error("Preare Error %s", err.Error()) return } @@ -78,22 +102,22 @@ func SyntaxCheckSQL(r *gin.Context) { sqlContext := strings.Join(param.Sqls, "\n") fileName := "ce_" + cmutil.RandStr(10) + ".sql" - workdir = path.Join(workdir, time.Now().Format("20060102150405")) - if err := os.MkdirAll(workdir, 0755); err != nil { - SendResponse(r, err, err.Error(), requestID) + tpWorkdir := path.Join(workdir, time.Now().Format("20060102150405")) + if err := os.MkdirAll(tpWorkdir, 0755); err != nil { + s.SendResponse(r, err, err.Error()) return } - f := path.Join(workdir, fileName) + f := path.Join(tpWorkdir, fileName) err := os.WriteFile(f, []byte(sqlContext), 0600) if err != nil { - SendResponse(r, err, err.Error(), requestID) + s.SendResponse(r, err, err.Error()) return } check := &syntax.TmysqlParseFile{ TmysqlParse: syntax.TmysqlParse{ TmysqlParseBinPath: tmysqlParserBin, - BaseWorkdir: workdir, + BaseWorkdir: tpWorkdir, }, IsLocalFile: true, Param: syntax.CheckSQLFileParam{ @@ -114,10 +138,10 @@ func SyntaxCheckSQL(r *gin.Context) { } if err != nil { - SendResponse(r, err, data, requestID) + s.SendResponse(r, err, data) return } - SendResponse(r, nil, data, requestID) + s.SendResponse(r, nil, data) } // CheckFileParam 语法检查请求参数 @@ -129,16 +153,14 @@ type CheckFileParam struct { } // SyntaxCheckFile 运行语法检查 -func SyntaxCheckFile(r *gin.Context) { - requestID := r.GetString("request_id") +func (s *SyntaxHandler) SyntaxCheckFile(r *gin.Context) { var param CheckFileParam var data map[string]*syntax.CheckInfo var err error var versions []string // 将request中的数据按照json格式直接解析到结构体中 - if err = r.ShouldBindJSON(¶m); err != nil { + if err = s.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, nil, requestID) return } @@ -170,20 +192,18 @@ func SyntaxCheckFile(r *gin.Context) { } if err != nil { - SendResponse(r, err, data, requestID) + s.SendResponse(r, err, data) return } - SendResponse(r, nil, data, requestID) + s.SendResponse(r, nil, data) } // CreateAndUploadDDLTblListFile 分析变更SQL DDL操作的表,并将文件上传到制品库 -func CreateAndUploadDDLTblListFile(r *gin.Context) { - requestID := r.GetString("request_id") +func (s *SyntaxHandler) CreateAndUploadDDLTblListFile(r *gin.Context) { var param CheckFileParam // 将request中的数据按照json格式直接解析到结构体中 - if err := r.ShouldBindJSON(¶m); err != nil { + if err := s.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, nil, requestID) return } check := &syntax.TmysqlParseFile{ @@ -197,10 +217,97 @@ func CreateAndUploadDDLTblListFile(r *gin.Context) { }, } if err := check.CreateAndUploadDDLTblFile(); err != nil { - SendResponse(r, err, nil, requestID) + s.SendResponse(r, err, nil) + return + } + s.SendResponse(r, nil, "ok") +} + +// ParseSQLFileRelationDb 解析SQL文件中涉及到需要变更的数据库 +func (s SyntaxHandler) ParseSQLFileRelationDb(r *gin.Context) { + if ForceDumpAll { + s.SendResponse(r, nil, gin.H{ + "create_dbs": []string{}, + "dbs": []string{}, + "dump_all": true, + "timestamp": time.Now().Unix(), + "desc": "force dump all", + }) + return + } + var param CheckFileParam + // 将request中的数据按照json格式直接解析到结构体中 + if err := s.Prepare(r, ¶m); err != nil { + logger.Error("ShouldBind failed %s", err) + return + } + p := &syntax.TmysqlParseFile{ + TmysqlParse: syntax.TmysqlParse{ + TmysqlParseBinPath: tmysqlParserBin, + BaseWorkdir: workdir, + }, + Param: syntax.CheckSQLFileParam{ + BkRepoBasePath: param.Path, + FileNames: param.Files, + }, + } + createDbs, dbs, dumpall, err := p.DoParseRelationDbs("") + if err != nil { + s.SendResponse(r, err, nil) + return + } + s.SendResponse(r, nil, gin.H{ + "create_dbs": createDbs, + "dbs": dbs, + "dump_all": dumpall, + "timestamp": time.Now().Unix(), + }) +} + +// ParseSQLRelationDb 语法检查入参SQL string +func (s *SyntaxHandler) ParseSQLRelationDb(r *gin.Context) { + var param CheckSQLStringParam + // 将request中的数据按照json格式直接解析到结构体中 + if err := s.Prepare(r, ¶m); err != nil { + logger.Error("Preare Error %s", err.Error()) + return + } + sqlContext := strings.Join(param.Sqls, "\n") + fileName := "ce_" + cmutil.RandStr(10) + ".sql" + tmpWorkdir := path.Join(workdir, time.Now().Format("20060102150405")) + if err := os.MkdirAll(tmpWorkdir, 0755); err != nil { + s.SendResponse(r, err, err.Error()) + return + } + f := path.Join(tmpWorkdir, fileName) + err := os.WriteFile(f, []byte(sqlContext), 0600) + if err != nil { + s.SendResponse(r, err, err.Error()) + return + } + + p := &syntax.TmysqlParseFile{ + TmysqlParse: syntax.TmysqlParse{ + TmysqlParseBinPath: tmysqlParserBin, + BaseWorkdir: tmpWorkdir, + }, + IsLocalFile: true, + Param: syntax.CheckSQLFileParam{ + BkRepoBasePath: "", + FileNames: []string{fileName}, + }, + } + createDbs, dbs, dumpall, err := p.DoParseRelationDbs("") + if err != nil { + s.SendResponse(r, err, nil) return } - SendResponse(r, nil, "ok", requestID) + s.SendResponse(r, nil, gin.H{ + "create_dbs": createDbs, + "dbs": dbs, + "dump_all": dumpall, + "timestamp": time.Now().Unix(), + }) } // rebuildVersion tmysql 需要指定特殊的version diff --git a/dbm-services/mysql/db-simulation/handler/syntax_rule.go b/dbm-services/mysql/db-simulation/handler/syntax_rule.go index bd161e68a5..a5a13fc6c0 100644 --- a/dbm-services/mysql/db-simulation/handler/syntax_rule.go +++ b/dbm-services/mysql/db-simulation/handler/syntax_rule.go @@ -21,6 +21,22 @@ import ( "dbm-services/mysql/db-simulation/model" ) +// ManageRuleHandler manage rule handler +type ManageRuleHandler struct { + BaseHandler +} + +// RegisterRouter 注册路由信息 +func (m *ManageRuleHandler) RegisterRouter(engine *gin.Engine) { + r := engine.Group("/rule") + { + r.POST("/manage", m.ManageRule) + r.GET("/getall", m.GetAllRule) + r.POST("/update", m.UpdateRule) + r.POST("/reload", m.ReloadRule) + } +} + // OptRuleParam 语法规则管理参数 type OptRuleParam struct { RuleID int `json:"rule_id" binding:"required"` @@ -28,32 +44,31 @@ type OptRuleParam struct { } // ManageRule 语法规则管理 -func ManageRule(c *gin.Context) { +func (m *ManageRuleHandler) ManageRule(c *gin.Context) { var param OptRuleParam - if err := c.ShouldBindJSON(¶m); err != nil { + if err := m.Prepare(c, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(c, err, "failed to deserialize parameters", "") return } result := model.DB.Model(&model.TbSyntaxRule{}).Where(&model.TbSyntaxRule{ID: param.RuleID}).Update("status", param.Status).Limit(1) if result.Error != nil { logger.Error("update rule status failed %s,affect rows %d", result.Error.Error(), result.RowsAffected) - SendResponse(c, result.Error, result.Error, "") + m.SendResponse(c, result.Error, result.Error) return } - SendResponse(c, nil, "ok", "") + m.SendResponse(c, nil, "ok") } // GetAllRule 获取所有权限规则 -func GetAllRule(c *gin.Context) { +func (m *ManageRuleHandler) GetAllRule(c *gin.Context) { var rs []model.TbSyntaxRule if err := model.DB.Find(&rs).Error; err != nil { logger.Error("query rules failed %s", err.Error()) - SendResponse(c, err, err.Error(), "") + m.SendResponse(c, err, err.Error()) return } - SendResponse(c, nil, rs, "") + m.SendResponse(c, nil, rs) } // UpdateRuleParam 更新语法规则参数 @@ -63,13 +78,12 @@ type UpdateRuleParam struct { } // UpdateRule update syntax rule -func UpdateRule(r *gin.Context) { +func (m *ManageRuleHandler) UpdateRule(r *gin.Context) { logger.Info("UpdateRule...") var param UpdateRuleParam // 将request中的数据按照json格式直接解析到结构体中 - if err := r.ShouldBindJSON(¶m); err != nil { + if err := m.Prepare(r, ¶m); err != nil { logger.Error("ShouldBind failed %s", err) - SendResponse(r, err, nil, "") return } var tsr model.TbSyntaxRule @@ -81,63 +95,63 @@ func UpdateRule(r *gin.Context) { // 判断float64存的是整数 if v == float64(int64(v)) { if !(tsr.ItemType == "int") { - errReturn(r, &tsr) + m.errReturn(r, &tsr) return } updateTable(param.ID, int(v)) } else { err = errors.New("not int") logger.Error("Type of error: %s", err) - SendResponse(r, err, nil, "") + m.SendResponse(r, err, nil) return } case bool: if tsr.ItemType == "bool" { updateTable(param.ID, fmt.Sprintf("%t", v)) } else { - errReturn(r, &tsr) + m.errReturn(r, &tsr) return } case string: if tsr.ItemType == "string" { updateTable(param.ID, fmt.Sprintf("%+q", v)) } else { - errReturn(r, &tsr) + m.errReturn(r, &tsr) return } case []interface{}: if tsr.ItemType == "arry" { updateTable(param.ID, fmt.Sprintf("%+q", v)) } else { - errReturn(r, &tsr) + m.errReturn(r, &tsr) return } default: err = errors.New("illegal type") logger.Error("%s", err) - SendResponse(r, err, nil, "") + m.SendResponse(r, err, nil) return } - SendResponse(r, nil, "sucessed", "") + m.SendResponse(r, nil, "sucessed") } func updateTable(id int, item interface{}) { model.DB.Model(&model.TbSyntaxRule{}).Where("id", id).Update("item", item) } -func errReturn(r *gin.Context, tsr *model.TbSyntaxRule) { +func (m *ManageRuleHandler) errReturn(r *gin.Context, tsr *model.TbSyntaxRule) { err := fmt.Errorf("%s type required", tsr.ItemType) logger.Error("Item type error: %s", err) - SendResponse(r, err, nil, "") + m.SendResponse(r, err, nil) } // ReloadRule trigger reload rule -func ReloadRule(c *gin.Context) { +func (m *ManageRuleHandler) ReloadRule(c *gin.Context) { err := syntax.ReloadRuleFromDb() if err != nil { logger.Error("reload rule from db failed %s", err.Error()) - SendResponse(c, err, nil, "") + m.SendResponse(c, err, nil) return } - SendResponse(c, nil, "ok", "") + m.SendResponse(c, nil, "ok") } diff --git a/dbm-services/mysql/db-simulation/main.go b/dbm-services/mysql/db-simulation/main.go index 0a8986ff97..8604c17348 100644 --- a/dbm-services/mysql/db-simulation/main.go +++ b/dbm-services/mysql/db-simulation/main.go @@ -132,6 +132,7 @@ func (w bodyLogWriter) Write(b []byte) (int, error) { return w.ResponseWriter.Write(b) } +// returnRessponeMiddleware TODO // BodyLogMiddleware 记录返回的body func returnRessponeMiddleware(c *gin.Context) { blw := &bodyLogWriter{body: bytes.NewBufferString(""), ResponseWriter: c.Writer} diff --git a/dbm-services/mysql/db-simulation/router/router.go b/dbm-services/mysql/db-simulation/router/router.go index 51d5f3fc6d..c2c01a15cf 100644 --- a/dbm-services/mysql/db-simulation/router/router.go +++ b/dbm-services/mysql/db-simulation/router/router.go @@ -27,30 +27,16 @@ func RegisterRouter(engine *gin.Engine) { context.String(http.StatusOK, "pong") }) engine.POST("/app/debug", TurnOnDebug) - - // query simulation task status info - t := engine.Group("/simulation") - t.POST("/task/file", handler.QuerySimulationFileResult) - t.POST("/task", handler.QueryTask) - // mysql - g := engine.Group("/mysql") - g.POST("/simulation", handler.TendbSimulation) - g.POST("/task", handler.QueryTask) // syntax - s := engine.Group("/syntax") - s.POST("/check/file", handler.SyntaxCheckFile) - s.POST("/check/sql", handler.SyntaxCheckSQL) - s.POST("/upload/ddl/tbls", handler.CreateAndUploadDDLTblListFile) + syntaxHandler := handler.SyntaxHandler{} + syntaxHandler.RegisterRouter(engine) + // simulation + simulationHandler := handler.SimulationHandler{} + simulationHandler.RegisterRouter(engine) // rule - r := engine.Group("/rule") - r.POST("/manage", handler.ManageRule) - r.GET("/getall", handler.GetAllRule) - r.POST("/update", handler.UpdateRule) - r.POST("/reload", handler.ReloadRule) - // spider - sp := engine.Group("/spider") - sp.POST("/simulation", handler.TendbClusterSimulation) - sp.POST("/create", handler.CreateTmpSpiderPodCluster) + manageRuleHandler := handler.ManageRuleHandler{} + manageRuleHandler.RegisterRouter(engine) + } // TurnOnDebug turn on debug,not del simulation pod diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go index ec4ee037c8..2e2f01b886 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go @@ -19,6 +19,8 @@ import ( "regexp" "strings" + "github.com/samber/lo" + "dbm-services/common/go-pubpkg/bkrepo" "dbm-services/common/go-pubpkg/cmutil" "dbm-services/common/go-pubpkg/logger" @@ -48,6 +50,14 @@ type DumpSchemaParam struct { CharSet string `json:"charset" validate:"required,checkCharset"` // 备份文件名后缀,清理相关文件 BackupFileNameSuffix string `json:"backup_file_name_suffix" validate:"required"` + + DumpAll bool `json:"dump_all"` + ParseNeedDumpDbs []string `json:"parse_need_dump_dbs"` + // SQL 语句中解析出来的create database dbs + // 需要导出的原因是复现 create database 是否已经存在的错误 + ParseCreateDbs []string `json:"parse_create_dbs"` + ExecuteObjects []ExecuteSQLFileObj `json:"execute_objects"` + UploadBkRepoParam } @@ -153,28 +163,22 @@ func (c *SemanticDumpSchemaComp) Init() (err error) { if c.isSpider { c.dumpCmd = path.Join(cst.TdbctlInstallPath, "bin", "mysqldump") } - finaldbs := []string{} - reg := regexp.MustCompile(`^bak_cbs`) - newBackupDbreg := regexp.MustCompile(`^stage_truncate`) - ignoreDBs := computil.GetGcsSystemDatabasesIgnoreTest(version) + if c.isSpider { // test 库里面的这些表没有主键,导入中控会失败 c.ignoreTables = []string{"test.conn_log", "test.free_space"} } - for _, db := range util.FilterOutStringSlice(alldbs, ignoreDBs) { - if reg.MatchString(db) { - continue - } - if newBackupDbreg.MatchString(db) { - continue - } - finaldbs = append(finaldbs, db) - } + finaldbs, err := c.getDumpdbs(alldbs, version) + if err != nil { + logger.Error("calculate the dbs to dump failed:%s", err.Error()) + return err + } if len(finaldbs) == 0 { return fmt.Errorf("变更实例排除系统库后,再也没有可以变更的库") } - c.dbs = finaldbs + + c.dbs = lo.Uniq(finaldbs) c.charset = c.Params.CharSet if c.Params.CharSet == "default" { if c.charset, err = conn.ShowServerCharset(); err != nil { @@ -185,6 +189,65 @@ func (c *SemanticDumpSchemaComp) Init() (err error) { return err } +func (c *SemanticDumpSchemaComp) getDumpdbs(alldbs []string, version string) (realexcutedbs []string, err error) { + finaldbs := []string{} + dbsExcluesysdbs := util.FilterOutStringSlice(alldbs, computil.GetGcsSystemDatabasesIgnoreTest(version)) + if c.Params.DumpAll { + logger.Info("param is dump all") + reg := regexp.MustCompile(`^bak_cbs`) + newBackupDbreg := regexp.MustCompile(`^stage_truncate`) + for _, db := range dbsExcluesysdbs { + if reg.MatchString(db) { + continue + } + if newBackupDbreg.MatchString(db) { + continue + } + finaldbs = append(finaldbs, db) + } + } else { + if len(lo.Intersect(alldbs, c.Params.ParseCreateDbs)) > 0 { + err = fmt.Errorf("create dbs %v,已经存在目标实例中", c.Params.ParseCreateDbs) + return nil, err + } + for _, f := range c.Params.ExecuteObjects { + var realexcutedbs []string + // 获得目标库 因为是通配符 所以需要获取完整名称 + intentionDbs, err := match(dbsExcluesysdbs, f.parseDbParamRe()) + if err != nil { + return nil, err + } + // 获得忽略库 + ignoreDbs, err := match(dbsExcluesysdbs, f.parseIgnoreDbParamRe()) + if err != nil { + return nil, err + } + // 获取最终需要执行的库 + realexcutedbs = util.FilterOutStringSlice(intentionDbs, ignoreDbs) + finaldbs = append(finaldbs, realexcutedbs...) + } + finaldbs = append(finaldbs, c.Params.ParseNeedDumpDbs...) + } + logger.Info("dump dbs:%v", finaldbs) + return finaldbs, nil +} + +func match(dbsExculeSysdb, regularDbNames []string) (matched []string, err error) { + for _, regexpStr := range regularDbNames { + re, err := regexp.Compile(regexpStr) + if err != nil { + logger.Error(" regexp.Compile(%s) failed:%s", regexpStr, err.Error()) + return nil, err + } + for _, db := range dbsExculeSysdb { + if re.MatchString(db) { + matched = append(matched, db) + } + } + } + return +} + // Precheck 预检查 // // @receiver c diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go index 9c24b806d1..9118126d1c 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go @@ -189,6 +189,15 @@ func (m *MySQLDumperTogether) Dump() (err error) { if m.UseTMySQLDump { dumpOption = m.getTMySQLDumpOption() } + defer func() { + if err != nil { + errFileContext, e1 := osutil.ReadFileString(errFile) + if e1 != nil { + logger.Error("read errFile failed %s", e1.Error()) + } + logger.Error("errFile:%s", errFileContext) + } + }() dumpCmd := m.getDumpCmd(outputFile, errFile, dumpOption) logger.Info("mysqldump cmd:%s", mysqlcomm.ClearSensitiveInformation(dumpCmd)) output, err := osutil.StandardShellCommand(false, dumpCmd) diff --git a/dbm-ui/backend/components/sql_import/client.py b/dbm-ui/backend/components/sql_import/client.py index d2406d5466..535eccdc67 100644 --- a/dbm-ui/backend/components/sql_import/client.py +++ b/dbm-ui/backend/components/sql_import/client.py @@ -45,6 +45,11 @@ def __init__(self): url="/simulation/task/file", description=_("查询语义执行结果"), ) + self.query_relation_dbs_from_sqlfile = self.generate_data_api( + method="POST", + url="/syntax/parse/file/relation/db", + description=_("查询语义执行结果"), + ) SQLSimulationApi = _SQLSimulationApi() diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py index b8d083a47f..311723c1e0 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import itertools import logging.config import os from dataclasses import asdict @@ -31,7 +32,7 @@ from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload -from backend.flow.utils.mysql.mysql_commom_query import query_mysql_variables +from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile, query_mysql_variables from backend.ticket.constants import TicketType logger = logging.getLogger("flow") @@ -189,7 +190,19 @@ def sql_semantic_check_flow(self): ) ), ) + # parse db from sqlfile + sqlfile_list = itertools.chain(*[set(obj["sql_files"]) for obj in self.data["execute_objects"]]) + path = self.data["path"] + resp = parse_db_from_sqlfile(path=path, files=list(sqlfile_list)) + if resp is None: + logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id)) + else: + logger.info(f"resp: {resp}") + template_cluster["dump_all"] = resp.get("dump_all") + template_cluster["parse_need_dump_dbs"] = resp.get("dbs") + template_cluster["parse_create_dbs"] = resp.get("create_dbs") template_cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix + template_cluster["execute_objects"] = self.data["execute_objects"] semantic_check_pipeline.add_act( act_name=_("备份测试库表结构"), act_component_code=ExecuteDBActuatorScriptComponent.code, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py index ba48975509..fabfe66309 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import itertools import logging.config import os from dataclasses import asdict @@ -32,6 +33,7 @@ from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile from backend.flow.utils.mysql.mysql_version_parse import major_version_parse from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset from backend.ticket.constants import TicketType @@ -174,6 +176,18 @@ def sql_semantic_check_flow(self): ) ), ) + # parse db from sqlfile + sqlfile_list = itertools.chain(*[set(obj["sql_files"]) for obj in self.data["execute_objects"]]) + path = self.data["path"] + resp = parse_db_from_sqlfile(path=path, files=list(sqlfile_list)) + if resp is None: + logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id)) + else: + logger.info(f"resp: {resp}") + cluster["dump_all"] = resp.get("dump_all") + cluster["parse_need_dump_dbs"] = resp.get("dbs") + cluster["parse_create_dbs"] = resp.get("create_dbs") + cluster["execute_objects"] = self.data["execute_objects"] cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix semantic_check_pipeline.add_act( act_name=_("备份测试库表结构"), diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 0558f9bb1c..b632457acf 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -894,6 +894,10 @@ def get_semantic_dump_schema_payload(self, **kwargs): "host": kwargs["ip"], "port": self.cluster["port"], "charset": self.ticket_data["charset"], + "dump_all": self.cluster.get("dump_all", True), + "parse_need_dump_dbs": self.cluster.get("parse_need_dump_dbs", []), + "parse_create_dbs": self.cluster.get("parse_create_dbs", []), + "execute_objects": self.cluster.get("execute_objects", None), "backup_file_name": f"{self.cluster['semantic_dump_schema_file_name']}", "backup_file_name_suffix": f"{self.cluster['semantic_dump_schema_file_name_suffix']}", "backup_dir": BK_PKG_INSTALL_PATH, diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py index 0d8bdd37db..497424b437 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py @@ -14,6 +14,7 @@ from django.utils.translation import gettext as _ from backend.components.db_remote_service.client import DRSApi +from backend.components.sql_import.client import SQLSimulationApi from backend.constants import IP_PORT_DIVIDER from backend.db_meta.models import StorageInstance from backend.flow.utils.mysql.mysql_version_parse import mysql_version_parse @@ -136,3 +137,38 @@ def check_backend_in_proxy(proxys: List[str], bk_cloud_id: int): is_pass = False return is_pass + + +def parse_db_from_sqlfile(path: str, files: List[str]): + """ + 从变更sql文件中解析出变更相关的DB + respone data is : + { + "data": { + "create_dbs": [ + "xxx" + ], + "dbs": null, + "dump_all": false, + "timestamp": 1733734571 + }, + "request_id": "9faaf67f-1b09-4575-8974-472677b2db5b", + "msg": "", + "code": 0 + } + create_dbs: create database + dbs: need dump database + dump_all: 是否需要dump所有数据库 + """ + payload = {} + payload["path"] = path + payload["files"] = files + try: + resp = SQLSimulationApi.query_relation_dbs_from_sqlfile(payload, raw=True) + if resp["code"] != 0: + logger.error(_("从SQL文件解析变更相关DB失败: {}").format(resp)) + return None + return resp["data"] + except Exception as e: + logger.error(f"parse db from sqlfile failed: [{e}]") + return None