From 2495049881a0a211d53153fc70fdad55d07542a6 Mon Sep 17 00:00:00 2001
From: yuanruji <ymakedaq@163.com>
Date: Tue, 7 Jan 2025 18:13:35 +0800
Subject: [PATCH] =?UTF-8?q?feat(dbm-services):=20=E5=88=86=E6=9E=90?=
 =?UTF-8?q?=E8=AF=AD=E6=B3=95=E6=96=87=E4=BB=B6=E9=87=8C=E9=9D=A2=E5=85=B3?=
 =?UTF-8?q?=E8=81=94=E7=9A=84=E5=85=B7=E4=BD=93=E8=A1=A8,=E7=BB=86?=
 =?UTF-8?q?=E5=8C=96=E6=A8=A1=E6=8B=9F=E6=89=A7=E8=A1=8C=E7=9A=84=E8=8C=83?=
 =?UTF-8?q?=E5=9B=B4=20#8925?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../app/syntax/parse_relation_db.go           | 114 +++++++++++++++---
 .../mysql/db-simulation/app/syntax/syntax.go  |   7 +-
 .../app/syntax/tmysqlpase_schema.go           |  12 ++
 .../db-simulation/handler/syntax_check.go     |  52 +++++++-
 .../pkg/components/mysql/mysql_upgrade.go     |   2 +-
 .../components/mysql/semantic_dump_schema.go  |  58 +++++++--
 .../mysql_proxy/install_mysql_proxy.go        |   5 +-
 .../mysql_proxy/uninstall_mysql_proxy.go      |  16 +--
 .../mysql_proxy/upgrade_mysql_proxy.go        |   2 +-
 .../db-tools/dbactuator/pkg/core/cst/cst.go   |   2 +
 .../dbactuator/pkg/native/dbworker.go         |   1 +
 .../pkg/util/mysqlutil/mysql_dumper.go        | 108 ++++++++++++++++-
 .../bamboo/scene/mysql/import_sqlfile_flow.py |  11 +-
 .../bamboo/scene/mysql/mysql_proxy_upgrade.py |   4 +-
 .../scene/spider/import_sqlfile_flow.py       |   7 +-
 .../flow/utils/mysql/mysql_act_playload.py    |   2 +
 .../flow/utils/mysql/mysql_commom_query.py    |  14 +++
 17 files changed, 357 insertions(+), 60 deletions(-)

diff --git a/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go
index 584edcf31a..d3235e7acd 100644
--- a/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go
+++ b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go
@@ -14,6 +14,7 @@ import (
 	"bufio"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io"
 	"os"
 	"runtime/debug"
@@ -29,7 +30,8 @@ import (
 const AnalyzeConcurrency = 10
 
 // DoParseRelationDbs parse relation db from sql file
-func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relationDbs []string, dumpAll bool,
+func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relationDbs, allCommands []string,
+	dumpAll bool,
 	err error) {
 	logger.Info("doing....")
 	tf.result = make(map[string]*CheckInfo)
@@ -39,15 +41,13 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati
 	if !tf.IsLocalFile {
 		if err = tf.Init(); err != nil {
 			logger.Error("Do init failed %s", err.Error())
-			return nil, nil, false, err
+			return nil, nil, nil, false, err
 		}
 		if err = tf.Downloadfile(); err != nil {
 			logger.Error("failed to download sql file from the product library %s", err.Error())
-			return nil, nil, false, err
+			return nil, nil, nil, false, err
 		}
 	}
-	// 最后删除临时目录,不会返回错误
-	defer tf.delTempDir()
 	logger.Info("all sqlfiles download ok ~")
 	alreadExecutedSqlfileChan := make(chan string, len(tf.Param.FileNames))
 
@@ -59,12 +59,13 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati
 	}()
 
 	logger.Info("start to analyze the parsing result")
-	createDbs, relationDbs, dumpAll, err = tf.doParseInchan(alreadExecutedSqlfileChan, version)
+	createDbs, relationDbs, allCommands, dumpAll, err = tf.doParseInchan(alreadExecutedSqlfileChan, version)
 	if err != nil {
 		logger.Error("failed to analyze the parsing result:%s", err.Error())
-		return nil, nil, false, err
+		return nil, nil, nil, false, err
 	}
-	logger.Info("createDbs:%v,relationDbs:%v,dumpAll:%v,err:%v", createDbs, relationDbs, dumpAll, err)
+	logger.Info("createDbs:%v,relationDbs:%v,allcomands%v,dumpAll:%v,err:%v", createDbs, relationDbs, allCommands, dumpAll,
+		err)
 	dumpdbs := []string{}
 	for _, d := range relationDbs {
 		if slices.Contains(createDbs, d) {
@@ -72,12 +73,12 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati
 		}
 		dumpdbs = append(dumpdbs, d)
 	}
-	return lo.Uniq(createDbs), lo.Uniq(dumpdbs), dumpAll, nil
+	return lo.Uniq(createDbs), lo.Uniq(dumpdbs), lo.Uniq(allCommands), dumpAll, nil
 }
 
 // doParseInchan RelationDbs do parse relation db
 func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string,
-	mysqlVersion string) (createDbs []string, relationDbs []string, dumpAll bool, err error) {
+	mysqlVersion string) (createDbs []string, relationDbs []string, allCommands []string, dumpAll bool, err error) {
 	var errs []error
 	c := make(chan struct{}, AnalyzeConcurrency)
 	errChan := make(chan error)
@@ -90,21 +91,23 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string,
 		c <- struct{}{}
 		go func(fileName string) {
 			defer wg.Done()
-			cdbs, dbs, dumpAllDbs, err := t.analyzeRelationDbs(fileName, mysqlVersion)
+			cdbs, dbs, commands, dumpAllDbs, err := t.analyzeRelationDbs(fileName, mysqlVersion)
 			logger.Info("createDbs:%v,dbs:%v,dumpAllDbs:%v,err:%v", cdbs, dbs, dumpAllDbs, err)
 			if err != nil {
+				logger.Error("analyzeRelationDbs failed %s", err.Error())
 				errChan <- err
+				return
 			}
 			// 如果有dumpall 则直接返回退出,不在继续分析
 			if dumpAllDbs {
 				dumpAll = true
 				<-c
-				wg.Done()
 				stopChan <- struct{}{}
 			}
 			t.mu.Lock()
 			relationDbs = append(relationDbs, dbs...)
 			createDbs = append(createDbs, cdbs...)
+			allCommands = append(allCommands, commands...)
 			t.mu.Unlock()
 			<-c
 		}(sqlfile)
@@ -121,7 +124,7 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string,
 		case err := <-errChan:
 			errs = append(errs, err)
 		case <-stopChan:
-			return createDbs, relationDbs, dumpAll, errors.Join(errs...)
+			return createDbs, relationDbs, allCommands, dumpAll, errors.Join(errs...)
 		}
 	}
 }
@@ -130,6 +133,7 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string,
 func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) (
 	createDbs []string,
 	relationDbs []string,
+	allCommandType []string,
 	dumpAll bool,
 	err error) {
 	defer func() {
@@ -140,7 +144,7 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) (
 	f, err := os.Open(t.getAbsoutputfilePath(inputfileName, mysqlVersion))
 	if err != nil {
 		logger.Error("open file failed %s", err.Error())
-		return nil, nil, false, err
+		return nil, nil, nil, false, err
 	}
 	defer f.Close()
 	reader := bufio.NewReader(f)
@@ -151,7 +155,7 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) (
 				break
 			}
 			logger.Error("read Line Error %s", errx.Error())
-			return nil, nil, false, errx
+			return nil, nil, nil, false, errx
 		}
 		if len(line) == 1 && line[0] == byte('\n') {
 			continue
@@ -159,16 +163,19 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) (
 		var res ParseLineQueryBase
 		if err = json.Unmarshal(line, &res); err != nil {
 			logger.Error("json unmasrshal line:%s failed %s", string(line), err.Error())
-			return nil, nil, false, err
+			return nil, nil, nil, false, err
 		}
 		// 判断是否有语法错误
 		if res.ErrorCode != 0 {
-			return nil, nil, false, err
+			return nil, nil, nil, false, fmt.Errorf("%s", res.ErrorMsg)
+		}
+		if lo.IsNotEmpty(res.Command) {
+			allCommandType = append(allCommandType, res.Command)
 		}
 		if slices.Contains([]string{SQLTypeCreateProcedure, SQLTypeCreateFunction, SQLTypeCreateView, SQLTypeCreateTrigger,
 			SQLTypeInsertSelect, SQLTypeRelaceSelect},
 			res.Command) {
-			return nil, nil, true, nil
+			return nil, nil, nil, true, nil
 		}
 		if lo.IsEmpty(res.DbName) {
 			continue
@@ -181,5 +188,74 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) (
 		relationDbs = append(relationDbs, res.DbName)
 
 	}
-	return createDbs, relationDbs, false, nil
+	return createDbs, relationDbs, allCommandType, false, nil
+}
+
+// ParseSpecialTbls parse special tables
+func (tf *TmysqlParseFile) ParseSpecialTbls(mysqlVersion string) (relationTbls []RelationTbl, err error) {
+	m := make(map[string][]string)
+	for _, fileName := range tf.Param.FileNames {
+		mm, err := tf.parseSpecialSQLFile(fileName, mysqlVersion)
+		if err != nil {
+			logger.Error("parseAlterSQLFile failed %s", err.Error())
+			return nil, err
+		}
+		for k, v := range mm {
+			m[k] = append(m[k], v...)
+		}
+	}
+	for k, v := range m {
+		relationTbls = append(relationTbls, RelationTbl{
+			DbName: k,
+			Tbls:   v,
+		})
+	}
+	return relationTbls, nil
+}
+
+// RelationTbl holds a database name and the tables to dump from it
+type RelationTbl struct {
+	DbName string   `json:"db_name"`
+	Tbls   []string `json:"tbls"`
+}
+
+// parseSpecialSQLFile 解析指定库表
+func (t *TmysqlParse) parseSpecialSQLFile(inputfileName, mysqlVersion string) (m map[string][]string, err error) {
+	f, err := os.Open(t.getAbsoutputfilePath(inputfileName, mysqlVersion))
+	if err != nil {
+		logger.Error("open file failed %s", err.Error())
+		return nil, err
+	}
+	m = make(map[string][]string)
+	defer f.Close()
+	reader := bufio.NewReader(f)
+	for {
+		line, errx := reader.ReadBytes(byte('\n'))
+		if errx != nil {
+			if errx == io.EOF {
+				break
+			}
+			logger.Error("read Line Error %s", errx.Error())
+			return nil, errx
+		}
+		if len(line) == 1 && line[0] == byte('\n') {
+			continue
+		}
+		var baseRes ParseIncludeTableBase
+		if err = json.Unmarshal(line, &baseRes); err != nil {
+			logger.Error("json unmasrshal line:%s failed %s", string(line), err.Error())
+			return nil, err
+		}
+		dbName := ""
+		if baseRes.Command == SQLTypeUseDb {
+			dbName = baseRes.DbName
+		}
+		if lo.IsNotEmpty(baseRes.DbName) {
+			dbName = baseRes.DbName
+		}
+		if lo.IsNotEmpty(baseRes.TableName) {
+			m[dbName] = append(m[dbName], baseRes.TableName)
+		}
+	}
+	return m, nil
 }
diff --git a/dbm-services/mysql/db-simulation/app/syntax/syntax.go b/dbm-services/mysql/db-simulation/app/syntax/syntax.go
index d601990f85..a012923a42 100644
--- a/dbm-services/mysql/db-simulation/app/syntax/syntax.go
+++ b/dbm-services/mysql/db-simulation/app/syntax/syntax.go
@@ -128,7 +128,7 @@ func (tf *TmysqlParseFile) Do(dbtype string, versions []string) (result map[stri
 		}
 	}
 	// 最后删除临时目录,不会返回错误
-	defer tf.delTempDir()
+	defer tf.DelTempDir()
 
 	var errs []error
 	for _, version := range versions {
@@ -184,7 +184,7 @@ func (tf *TmysqlParseFile) CreateAndUploadDDLTblFile() (err error) {
 	}
 	// 最后删除临时目录,不会返回错误
 	// 暂时屏蔽 观察过程文件
-	defer tf.delTempDir()
+	defer tf.DelTempDir()
 
 	if err = tf.Downloadfile(); err != nil {
 		logger.Error("failed to download sql file from the product library %s", err.Error())
@@ -248,7 +248,8 @@ func (t *TmysqlParse) Init() (err error) {
 	return nil
 }
 
-func (t *TmysqlParse) delTempDir() {
+// DelTempDir removes the temporary working directory, logging a warning on failure
+func (t *TmysqlParse) DelTempDir() {
 	if err := os.RemoveAll(t.tmpWorkdir); err != nil {
 		logger.Warn("remove tempDir:" + t.tmpWorkdir + ".error info:" + err.Error())
 	}
diff --git a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go
index 9733443be2..06aab49b48 100644
--- a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go
+++ b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go
@@ -49,6 +49,10 @@ const (
 	SQLTypeInsertSelect = "insert_select"
 	// SQLTypeRelaceSelect replace select sql
 	SQLTypeRelaceSelect = "replace_select"
+	// SQLTypeDropTable drop table sql
+	SQLTypeDropTable = "drop_table"
+	// SQLTypeCreateIndex is create index sql
+	SQLTypeCreateIndex = "create_index"
 )
 
 // NotAllowedDefaulValColMap 不允许默认值的字段
@@ -309,3 +313,11 @@ type UpdateResult struct {
 	HasWhere         bool   `json:"has_where"`
 	Limit            int    `json:"limit"`
 }
+
+// ParseIncludeTableBase parse include table
+type ParseIncludeTableBase struct {
+	QueryID   int    `json:"query_id"`
+	Command   string `json:"command"`
+	DbName    string `json:"db_name"`
+	TableName string `json:"table_name"`
+}
diff --git a/dbm-services/mysql/db-simulation/handler/syntax_check.go b/dbm-services/mysql/db-simulation/handler/syntax_check.go
index aec1b8743f..0416504e1d 100644
--- a/dbm-services/mysql/db-simulation/handler/syntax_check.go
+++ b/dbm-services/mysql/db-simulation/handler/syntax_check.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
+	"github.com/samber/lo"
 	"github.com/spf13/viper"
 
 	"dbm-services/common/go-pubpkg/cmutil"
@@ -251,11 +252,29 @@ func (s SyntaxHandler) ParseSQLFileRelationDb(r *gin.Context) {
 			FileNames:      param.Files,
 		},
 	}
-	createDbs, dbs, dumpall, err := p.DoParseRelationDbs("")
+	createDbs, dbs, allCommands, dumpall, err := p.DoParseRelationDbs("")
 	if err != nil {
 		s.SendResponse(r, err, nil)
 		return
 	}
+	// 如果所有的命令都是alter table, dump指定库表
+	logger.Debug("debug: %v,%d", allCommands, len(allCommands))
+	if isAllOperateTable(allCommands) || isAllCreateTable(allCommands) {
+		relationTbls, err := p.ParseSpecialTbls("")
+		if err != nil {
+			s.SendResponse(r, err, nil)
+			return
+		}
+		s.SendResponse(r, nil, gin.H{
+			"create_dbs":             createDbs,
+			"dbs":                    dbs,
+			"dump_all":               false,
+			"just_dump_special_tbls": true,
+			"special_tbls":           relationTbls,
+			"timestamp":              time.Now().Unix(),
+		})
+		return
+	}
 
 	s.SendResponse(r, nil, gin.H{
 		"create_dbs": createDbs,
@@ -265,6 +284,15 @@ func (s SyntaxHandler) ParseSQLFileRelationDb(r *gin.Context) {
 	})
 }
 
+func isAllOperateTable(allCommands []string) bool {
+	return lo.Every([]string{syntax.SQLTypeAlterTable, syntax.SQLTypeUseDb,
+		syntax.SQLTypeCreateIndex, syntax.SQLTypeDropTable}, allCommands)
+}
+
+func isAllCreateTable(allCommands []string) bool {
+	return lo.Every([]string{syntax.SQLTypeCreateTable, syntax.SQLTypeUseDb}, allCommands)
+}
+
 // ParseSQLRelationDb  语法检查入参SQL string
 func (s *SyntaxHandler) ParseSQLRelationDb(r *gin.Context) {
 	var param CheckSQLStringParam
@@ -298,11 +326,31 @@ func (s *SyntaxHandler) ParseSQLRelationDb(r *gin.Context) {
 			FileNames:      []string{fileName},
 		},
 	}
-	createDbs, dbs, dumpall, err := p.DoParseRelationDbs("")
+	// defer p.DelTempDir()
+	createDbs, dbs, allCommands, dumpall, err := p.DoParseRelationDbs("")
 	if err != nil {
 		s.SendResponse(r, err, nil)
 		return
 	}
+	// 如果所有的命令都是alter table, dump指定库表
+	logger.Info("make debug: %v,%d", allCommands, len(allCommands))
+	if isAllOperateTable(allCommands) || isAllCreateTable(allCommands) {
+		relationTbls, err := p.ParseSpecialTbls("")
+		if err != nil {
+			s.SendResponse(r, err, nil)
+			return
+		}
+		s.SendResponse(r, nil, gin.H{
+			"create_dbs":             createDbs,
+			"dbs":                    dbs,
+			"dump_all":               false,
+			"just_dump_special_tbls": true,
+			"special_tbls":           relationTbls,
+			"timestamp":              time.Now().Unix(),
+		})
+		return
+	}
+
 	s.SendResponse(r, nil, gin.H{
 		"create_dbs": createDbs,
 		"dbs":        dbs,
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go
index 228354254d..ac0f5a8a3d 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go
@@ -629,7 +629,7 @@ func (m MysqlUpgradeComp) mysqlUpgrade(conn *native.DbWorker, port int) (err err
 		return nil
 	}
 	// open general_log
-	if errx := m.openGeneralLog(conn); err != nil {
+	if errx := m.openGeneralLog(conn); errx != nil {
 		logger.Warn("set global general_log=on failed %s", errx.Error())
 	}
 	upgradeScript := ""
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go
index 93c8c17a3d..f67a7080df 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go
@@ -55,12 +55,19 @@ type DumpSchemaParam struct {
 	ParseNeedDumpDbs []string `json:"parse_need_dump_dbs"`
 	// SQL 语句中解析出来的create database dbs
 	// 需要导出的原因是复现 create database 是否已经存在的错误
-	ParseCreateDbs []string            `json:"parse_create_dbs"`
-	ExecuteObjects []ExecuteSQLFileObj `json:"execute_objects"`
-
+	ParseCreateDbs      []string            `json:"parse_create_dbs"`
+	ExecuteObjects      []ExecuteSQLFileObj `json:"execute_objects"`
+	JustDumpSpecialTbls bool                `json:"just_dump_special_tbls"`
+	SpecialTbls         []SpecialTblInfo    `json:"special_tbls"`
 	UploadBkRepoParam
 }
 
+// SpecialTblInfo describes a database and the specific tables to dump from it
+type SpecialTblInfo struct {
+	DbName string   `json:"db_name"`
+	Tbls   []string `json:"tbls"`
+}
+
 // UploadBkRepoParam upload to bk repo param
 type UploadBkRepoParam struct {
 	BackupFileName string     `json:"backup_file_name"`
@@ -195,7 +202,7 @@ func (c *SemanticDumpSchemaComp) getDumpdbs(alldbs []string, version string) (re
 	if c.Params.DumpAll {
 		logger.Info("param is dump all")
 		reg := regexp.MustCompile(`^bak_cbs`)
-		newBackupDbreg := regexp.MustCompile(`^stage_truncate`)
+		newBackupDbreg := regexp.MustCompile(fmt.Sprintf("^%s", cst.StageDbHeader))
 		for _, db := range dbsExcluesysdbs {
 			if reg.MatchString(db) {
 				continue
@@ -223,7 +230,7 @@ func (c *SemanticDumpSchemaComp) getDumpdbs(alldbs []string, version string) (re
 			finaldbs = append(finaldbs, realexcutedbs...)
 		}
 		createSQLExistDbs := lo.Intersect(alldbs, c.Params.ParseCreateDbs)
-		finaldbs = append(finaldbs, c.Params.ParseNeedDumpDbs...)
+		finaldbs = append(finaldbs, lo.Intersect(alldbs, c.Params.ParseNeedDumpDbs)...)
 		finaldbs = append(finaldbs, createSQLExistDbs...)
 	}
 	logger.Info("dump dbs:%v", finaldbs)
@@ -280,7 +287,38 @@ func (c *SemanticDumpSchemaComp) DumpSchema() (err error) {
 		dumpOption.GtidPurgedOff = true
 		c.useTmysqldump = false
 	}
-	dumper = &mysqlutil.MySQLDumperTogether{
+	switch {
+	case c.Params.JustDumpSpecialTbls:
+		return c.DumpSpecialTables(dumpOption)
+	default:
+		dumper = &mysqlutil.MySQLDumperTogether{
+			MySQLDumper: mysqlutil.MySQLDumper{
+				DumpDir:         c.Params.BackupDir,
+				Ip:              c.Params.Host,
+				Port:            c.Params.Port,
+				DbBackupUser:    c.GeneralParam.RuntimeAccountParam.AdminUser,
+				DbBackupPwd:     c.GeneralParam.RuntimeAccountParam.AdminPwd,
+				DbNames:         c.dbs,
+				IgnoreTables:    c.ignoreTables,
+				DumpCmdFile:     c.dumpCmd,
+				Charset:         c.charset,
+				MySQLDumpOption: dumpOption,
+			},
+			UseTMySQLDump:  c.useTmysqldump,
+			OutputfileName: c.Params.BackupFileName,
+		}
+		if err := dumper.Dump(); err != nil {
+			logger.Error("dump failed: %s", err.Error())
+			return err
+		}
+		return nil
+	}
+}
+
+// DumpSpecialTables dumps only the tables specified in Params.SpecialTbls
+func (c *SemanticDumpSchemaComp) DumpSpecialTables(dumpOption mysqlutil.MySQLDumpOption) (err error) {
+	dumpOption.Force = true
+	dumper := &mysqlutil.MySQLDumperAppend{
 		MySQLDumper: mysqlutil.MySQLDumper{
 			DumpDir:         c.Params.BackupDir,
 			Ip:              c.Params.Host,
@@ -293,7 +331,13 @@ func (c *SemanticDumpSchemaComp) DumpSchema() (err error) {
 			Charset:         c.charset,
 			MySQLDumpOption: dumpOption,
 		},
-		UseTMySQLDump:  c.useTmysqldump,
+		DumpMap: lo.SliceToMap(c.Params.SpecialTbls, func(item SpecialTblInfo) (string, []string) {
+			if len(item.Tbls) > 0 {
+				return item.DbName,
+					item.Tbls
+			}
+			return item.DbName, []string{}
+		}),
 		OutputfileName: c.Params.BackupFileName,
 	}
 	if err := dumper.Dump(); err != nil {
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go
index 5a8e76e959..98428fb2d8 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go
@@ -114,7 +114,7 @@ func (i *InstallMySQLProxyComp) Init() (err error) {
 /**
  * @description: 预检查:
  * 				- 检查是否存在安装proxy的路径
- *				- 检查是否存在proxy processs
+ *				- 检查是否存在proxy processes
  * 				- 检查安装包是否存在,如果存在检查md5是否正确
  * 				-
  * @return {*}
@@ -312,8 +312,7 @@ func (i *InstallMySQLProxyComp) DecompressPkg() (err error) {
 		proxyRealDirName,
 	)
 	if output, err := osutil.ExecShellCommand(false, extraCmd); err != nil {
-		err := fmt.Errorf("execute shell[%s]  get an error:%w and output:%s", extraCmd, err, output)
-		return err
+		return fmt.Errorf("execute shell[%s]  get an error:%w and output:%s", extraCmd, err, output)
 	}
 	logger.Info("untar %s successfully", i.Params.Pkg)
 	return nil
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go
index 2ea22e41ea..b76a476612 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go
@@ -76,20 +76,20 @@ func (u *UnInstallMySQLProxyComp) Init() (err error) {
 func (u *UnInstallMySQLProxyComp) PreCheck() (err error) {
 	for _, port := range u.Params.Ports {
 		if !u.Params.Force {
-			db, err := native.InsObject{
+			db, errx := native.InsObject{
 				Host: u.Params.Host,
 				User: u.runTimeCtx.proxyAdminUser,
 				Pwd:  u.proxyAdminPwd,
 				Port: port,
 			}.ConnProxyAdmin()
-			if err != nil {
-				logger.Error("连接%d的Admin Port 失败%s", port, err.Error())
-				return err
+			if errx != nil {
+				logger.Error("连接%d的Admin Port 失败:%s", port, errx.Error())
+				return errx
 			}
-			inuse, err := db.CheckProxyInUse()
-			if err != nil {
-				logger.Error("检查Proxy可用性检查失败")
-				return err
+			inuse, errx := db.CheckProxyInUse()
+			if errx != nil {
+				logger.Error("检查Proxy可用性检查失败:%s", errx.Error())
+				return errx
 			}
 			if inuse {
 				return fmt.Errorf("检测到%d存在可用连接", port)
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go
index f79ad154a5..1ef299de13 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go
@@ -122,7 +122,7 @@ func (p *UpgradeProxyComp) checkAppProcessist() (err error) {
 		if len(activeprocesslist) > 0 {
 			errMsg := fmt.Sprintf("还存在活跃的业务连接,请先确认,具体连接%v", activeprocesslist)
 			logger.Error(errMsg)
-			return fmt.Errorf(errMsg)
+			return fmt.Errorf("%s", errMsg)
 		}
 	}
 	return nil
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go b/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go
index 152c85a3c7..0a7c8561da 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go
@@ -32,6 +32,8 @@ const (
 const (
 	// DefaultCharset default charset
 	DefaultCharset = "default"
+	// StageDbHeader backup db header
+	StageDbHeader = "stage_truncate"
 )
 
 // GetNowTimeLayoutStr 20060102150405
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go
index 6d4e40557f..610dbe7e0e 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go
@@ -120,6 +120,7 @@ func (h *DbWorker) ExecMore(sqls []string) (rowsAffectedCount int64, err error)
 	return h.ExecMoreContext(sqls, ctx)
 }
 
+// ExecMoreContext executes multiple SQL statements using the provided context
 func (h *DbWorker) ExecMoreContext(sqls []string, ctx context.Context) (rowsAffected int64, err error) {
 	var c int64
 	db, err := h.Db.Conn(ctx)
diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go
index 0786b2acda..2604cf13ce 100644
--- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go
+++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go
@@ -11,6 +11,7 @@
 package mysqlutil
 
 import (
+	"bufio"
 	"errors"
 	"fmt"
 	"os"
@@ -18,6 +19,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"runtime"
+	"slices"
 	"strings"
 	"sync"
 
@@ -54,6 +56,7 @@ type MySQLDumpOption struct {
 	GtidPurgedOff           bool // --set-gtid-purged=OFF
 	Quick                   bool
 	ExtendedInsert          bool
+	Force                   bool
 }
 
 type runtimectx struct {
@@ -111,7 +114,7 @@ func (m MySQLDumper) Dump() (err error) {
 				<-concurrencyControl
 			}()
 			errFile := path.Join(dump.DumpDir, fmt.Sprintf("%s.err", db))
-			dumpCmd := dump.getDumpCmd(outputFile, errFile, "")
+			dumpCmd := dump.getDumpCmd(outputFile, errFile, "", false)
 			logger.Info("mysqldump cmd:%s", mysqlcomm.RemovePassword(dumpCmd))
 			output, err := osutil.StandardShellCommand(false, dumpCmd)
 			if err != nil {
@@ -198,7 +201,7 @@ func (m *MySQLDumperTogether) Dump() (err error) {
 			logger.Error("errFile:%s", errFileContext)
 		}
 	}()
-	dumpCmd := m.getDumpCmd(outputFile, errFile, dumpOption)
+	dumpCmd := m.getDumpCmd(outputFile, errFile, dumpOption, false)
 	logger.Info("mysqldump cmd:%s", mysqlcomm.ClearSensitiveInformation(dumpCmd))
 	output, err := osutil.StandardShellCommand(false, dumpCmd)
 	if err != nil {
@@ -211,6 +214,98 @@ func (m *MySQLDumperTogether) Dump() (err error) {
 	return
 }
 
+// MySQLDumperAppend 不同库表导出到同一个文件
+type MySQLDumperAppend struct {
+	MySQLDumper
+	OutputfileName string
+	DumpMap        map[string][]string
+}
+
+// Dump do dump
+func (m *MySQLDumperAppend) Dump() (err error) {
+	outputFile := path.Join(m.DumpDir, m.OutputfileName)
+	errFile := path.Join(m.DumpDir, m.OutputfileName+".err")
+	fd, err := os.OpenFile(outputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return fmt.Errorf("open file failed %s", err.Error())
+	}
+	_, err = fd.WriteString("-- dump schema for dbm simulation\n")
+	if err != nil {
+		logger.Error("write file failed %s", err.Error())
+		return err
+	}
+	defer func() {
+		if err != nil {
+			if cmutil.FileExists(errFile) {
+				errMsg, errx := osutil.ReadFileString(errFile)
+				if errx != nil {
+					logger.Error("read errFile failed %s", errx.Error())
+				}
+				logger.Error("errFile contenxt:%s", errMsg)
+			}
+		}
+	}()
+	defer fd.Close()
+	inputdbs := m.DbNames
+	for db, tables := range m.DumpMap {
+		var realdbs []string
+		if lo.IsNotEmpty(db) {
+			// inputdbs是实际存在的库
+			// 如果dumpMap中的库不在inputdbs中,直接跳过
+			// 让错误在模拟执行中体现
+			if !slices.Contains(inputdbs, db) {
+				logger.Warn("db %s not in inputdbs %v", db, inputdbs)
+				continue
+			}
+			realdbs = []string{db}
+		} else {
+			realdbs = inputdbs
+		}
+		for _, realdb := range realdbs {
+			_, err = fd.WriteString(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;\n USE `%s`;\n", realdb, realdb))
+			if err != nil {
+				return fmt.Errorf("write file failed %s", err.Error())
+			}
+			m.Tables = lo.Uniq(tables)
+			m.DbNames = []string{realdb}
+			dumpCmd := m.getDumpCmd(outputFile, errFile, "", true)
+			logger.Info("mysqldump cmd:%s", mysqlcomm.ClearSensitiveInformation(dumpCmd))
+			output, errx := osutil.StandardShellCommand(false, dumpCmd)
+			if errx != nil {
+				if err = dumpIsOk(errFile); err == nil {
+					continue
+				}
+				return fmt.Errorf("execte %s get an error:%s,%w", dumpCmd, output, errx)
+			}
+			if err = checkDumpComplete(outputFile); err != nil {
+				logger.Error("checkDumpComplete failed %s", err.Error())
+				return err
+			}
+		}
+	}
+	return err
+}
+
+func dumpIsOk(errLog string) (err error) {
+	fd, err := os.Open(errLog)
+	if err != nil {
+		return err
+	}
+	r := regexp.MustCompile(`Couldn't find table:`)
+	var lines []string
+	scanner := bufio.NewScanner(fd)
+	for scanner.Scan() {
+		l := scanner.Text()
+		if !r.MatchString(l) && lo.IsNotEmpty(l) {
+			lines = append(lines, l)
+		}
+	}
+	if len(lines) > 0 {
+		return fmt.Errorf("%s", strings.Join(lines, "\n"))
+	}
+	return
+}
+
 /*
 mysqldump 参数说明:
 -B --databases :后面指定的 db 名字空格分隔,例如 --databases db1 db2 >> aaa.sql
@@ -247,7 +342,7 @@ DumpSchema 功能概述:
 >/data/dbbak/$dump_file.$old_db_name 2>/data/dbbak/$dump_file.$old_db_name.$SUBJOB_ID.err;
 */
 // nolint
-func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCmd string) {
+func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string, appendOutput bool) (dumpCmd string) {
 
 	switch {
 	case m.DumpData && m.DumpSchema:
@@ -298,6 +393,9 @@ func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCm
 	if m.Charset != "" { // charset 可能为空
 		dumpOption += " --default-character-set=" + m.Charset
 	}
+	if m.Force {
+		dumpOption += " -f "
+	}
 	dumpCmd = fmt.Sprintf(
 		`%s -h%s -P%d  -u%s  -p%s --skip-opt --create-options --single-transaction --max-allowed-packet=1G -q --no-autocommit %s`,
 		m.DumpCmdFile,
@@ -329,8 +427,10 @@ func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCm
 			dumpCmd += fmt.Sprintf(" --ignore-table=%s", igTb)
 		}
 	}
-
 	mysqlDumpCmd := fmt.Sprintf("%s > %s 2>%s", dumpCmd, outputFile, errFile)
+	if appendOutput {
+		mysqlDumpCmd = fmt.Sprintf("%s >> %s 2>>%s", dumpCmd, outputFile, errFile)
+	}
 	return strings.ReplaceAll(mysqlDumpCmd, "\n", " ")
 }
 
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py
index 311723c1e0..7a55ea0d9b 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py
@@ -32,7 +32,11 @@
 from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
 from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs
 from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
-from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile, query_mysql_variables
+from backend.flow.utils.mysql.mysql_commom_query import (
+    merge_resp_to_cluster,
+    parse_db_from_sqlfile,
+    query_mysql_variables,
+)
 from backend.ticket.constants import TicketType
 
 logger = logging.getLogger("flow")
@@ -197,10 +201,7 @@ def sql_semantic_check_flow(self):
         if resp is None:
             logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id))
         else:
-            logger.info(f"resp: {resp}")
-            template_cluster["dump_all"] = resp.get("dump_all")
-            template_cluster["parse_need_dump_dbs"] = resp.get("dbs")
-            template_cluster["parse_create_dbs"] = resp.get("create_dbs")
+            template_cluster.update(merge_resp_to_cluster(resp))
         template_cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix
         template_cluster["execute_objects"] = self.data["execute_objects"]
         semantic_check_pipeline.add_act(
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py
index a185941d97..c394a23a50 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py
@@ -107,7 +107,7 @@ def upgrade_mysql_proxy_flow(self):
                         uid=self.uid,
                         root_id=self.root_id,
                         cluster=cluster_obj,
-                        is_check_client_conn=not self.force_upgrade,
+                        is_check_client_conn=True,
                         is_proxy=True,
                         check_client_conn_inst=proxy_ins,
                     )
@@ -134,7 +134,7 @@ def upgrade_mysql_proxy_flow(self):
                         pkg_id=pkg_id,
                         proxy_version=get_sub_version_by_pkg_name(proxy_pkg.name),
                         proxy_ports=ports,
-                        force_upgrade=False,
+                        force_upgrade=self.force_upgrade,
                     )
                 )
                 # 最后一个节点无需再确认
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py
index fabfe66309..31a364d7df 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py
@@ -33,7 +33,7 @@
 from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
 from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs
 from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
-from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile
+from backend.flow.utils.mysql.mysql_commom_query import merge_resp_to_cluster, parse_db_from_sqlfile
 from backend.flow.utils.mysql.mysql_version_parse import major_version_parse
 from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset
 from backend.ticket.constants import TicketType
@@ -183,10 +183,7 @@ def sql_semantic_check_flow(self):
         if resp is None:
             logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id))
         else:
-            logger.info(f"resp: {resp}")
-            cluster["dump_all"] = resp.get("dump_all")
-            cluster["parse_need_dump_dbs"] = resp.get("dbs")
-            cluster["parse_create_dbs"] = resp.get("create_dbs")
+            cluster.update(merge_resp_to_cluster(resp))
         cluster["execute_objects"] = self.data["execute_objects"]
         cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix
         semantic_check_pipeline.add_act(
diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
index 2befdc2c27..8bde160026 100644
--- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
+++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
@@ -912,6 +912,8 @@ def get_semantic_dump_schema_payload(self, **kwargs):
                     "parse_need_dump_dbs": self.cluster.get("parse_need_dump_dbs", []),
                     "parse_create_dbs": self.cluster.get("parse_create_dbs", []),
                     "execute_objects": self.cluster.get("execute_objects", None),
+                    "just_dump_special_tbls": self.cluster.get("just_dump_special_tbls", False),
+                    "special_tbls": self.cluster.get("special_tbls", []),
                     "backup_file_name": f"{self.cluster['semantic_dump_schema_file_name']}",
                     "backup_file_name_suffix": f"{self.cluster['semantic_dump_schema_file_name_suffix']}",
                     "backup_dir": BK_PKG_INSTALL_PATH,
diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py
index 5b168607f0..ce591f1fc0 100644
--- a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py
+++ b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py
@@ -176,6 +176,20 @@ def parse_db_from_sqlfile(path: str, files: List[str]):
         return None
 
 
+def merge_resp_to_cluster(resp: dict):
+    """
+    合并返回的数据到集群
+    """
+    dump_schema_payload = {}
+    logger.info(f"resp: {resp}")
+    dump_schema_payload["dump_all"] = resp.get("dump_all")
+    dump_schema_payload["parse_need_dump_dbs"] = resp.get("dbs")
+    dump_schema_payload["parse_create_dbs"] = resp.get("create_dbs")
+    dump_schema_payload["just_dump_special_tbls"] = resp.get("just_dump_special_tbls")
+    dump_schema_payload["special_tbls"] = resp.get("special_tbls")
+    return dump_schema_payload
+
+
 def create_tdbctl_user_for_remote(cluster: Cluster, ctl_primary: str, new_ip: str, new_port: int, tdbctl_pass: str):
     """
     给新的remote实例对中控primary授权