Skip to content

Commit

Permalink
refactor(dbm-services): 优化本地升级流程 #8543
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Dec 11, 2024
1 parent af3f501 commit e737966
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ func (d *UpgradeMySQLAct) Run() (err error) {
FunName: "前置检查",
Func: d.Service.PreCheck,
},
{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
},
}
if d.Service.Params.Run {
steps = append(steps, subcmd.StepFunc{
FunName: "升级MySQL",
Func: d.Service.Upgrade,
})
} else {
steps = append(steps, subcmd.StepFunc{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
})
}

if err := steps.Run(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) {
}
}
// table check
if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil {
logger.Error("check table upgrade failed %s", err.Error())
return err
errs := conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion)
if len(errs) > 0 {
for _, err := range errs {
logger.Error("port:[%d]: check table upgrade error: %s", port, err.Error())
}
return fmt.Errorf("check table upgrade failed, port: %d, errors: %v", port, errs)
}
}
return
Expand Down Expand Up @@ -308,6 +311,26 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.")
return nil
}
// 处理分区表升级
if m.newVersion.MysqlVersion >= native.MYSQL_5P70 && m.newVersion.MysqlVersion < native.MYSQL_8P0 {
// logger.Info("Upgrading to MySQL version>=5.7.0, remaining upgrade procedure is skipped.")
pdata, errx := dbConn.GetPartitionSchema()
if errx != nil {
logger.Error("get partition schema failed %s", errx.Error())
return errx
}
if len(pdata) > 0 {
for _, p := range pdata {
usql := fmt.Sprintf("ALTER TABLE `%s`.`%s` UPGRADE PARTITIONING", p.TableSchema, p.TableName)
logger.Info("upgrade partition sql: %s", usql)
_, err = dbConn.Exec(usql)
if err != nil {
logger.Error("upgrade partition table %s.%s failed %s", p.TableSchema, p.TableName, err.Error())
return err
}
}
}
}
logger.Info("do mysqlcheck for %d", port)
if err = m.mysqlCheck(dbConn, port); err != nil {
logger.Error("do %d mysqlcheck failed %s", port, err.Error())
Expand Down
61 changes: 41 additions & 20 deletions dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,13 @@ type TableInfo struct {
RowFormat string `db:"ROW_FORMAT"`
}

// CheckTableUpgrade TODO
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) {
// TableNameIsValid TODO
func (h *DbWorker) TableNameIsValid(currentVersion, newVersion uint64) (errs []error) {
type checkFunc struct {
fn func(currentVersion, newVersion uint64) error
desc string
}
var err error
// 库表名关键字检查
fns := []checkFunc{}
fns = append(fns, checkFunc{
Expand Down Expand Up @@ -257,14 +258,20 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(currentVersion, newVersion); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
type checkFuncNoparam struct {
fn func() error
desc string
}
// 非法字符检查
return errs
}

type checkFuncNoparam struct {
fn func() error
desc string
}

// IllegalCharacterCheck TODO
func (h *DbWorker) IllegalCharacterCheck() (errs []error) {
var err error
fnns := []checkFuncNoparam{}
fnns = append(fnns, checkFuncNoparam{
fn: h.tableNameAsciiCodeCheck,
Expand Down Expand Up @@ -294,9 +301,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
return errs
}

// CheckTableUpgrade 检查表是否满足升级条件
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (errs []error) {
var err error
errs = append(errs, h.TableNameIsValid(currentVersion, newVersion)...)
// 非法字符检查
errs = append(errs, h.IllegalCharacterCheck()...)

switch {
// 当准备升级到8.0版本
case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0:
Expand Down Expand Up @@ -330,7 +347,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.7版本
Expand Down Expand Up @@ -358,19 +375,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.6版本
case newVersion >= MYSQL_5P60 && currentVersion < MYSQL_5P60:
// per-4.1 password check
logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...")
if err = h.passwordCheck(); err != nil {
return err
errs = append(errs, fmt.Errorf("%v", err.Error()))
}

}
return nil
return errs
}

func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string {
Expand Down Expand Up @@ -765,6 +782,15 @@ func (h *DbWorker) passwordCheck() (err error) {
// handler to use the native partitioning handler instead, run mysql_upgrade.
func (h *DbWorker) partitionCheck() (err error) {
var data []TableInfo
data, err = h.GetPartitionSchema()
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
}

// GetPartitionSchema 获取分区表
func (h *DbWorker) GetPartitionSchema() (data []TableInfo, err error) {
q := `
select TABLE_SCHEMA,
TABLE_NAME,
Expand All @@ -774,13 +800,8 @@ where PARTITION_NAME is not NULL
group by 1,
2;
`
if err = h.Queryx(&data, q); err != nil {
return err
}
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
err = h.Queryx(&data, q)
return data, err
}

func (h *DbWorker) tokudbEngineCheck() (err error) {
Expand Down

0 comments on commit e737966

Please sign in to comment.