Skip to content

Commit

Permalink
fix(mysql): 端口3306的mysql配置文件路径错误 TencentBlueKing#7541
Browse files Browse the repository at this point in the history
  • Loading branch information
xfwduke committed Nov 19, 2024
1 parent e98cd78 commit 3b6495f
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,15 @@ func (c *OnMySQLComponent) instanceDropSourceDBs(port int) error {
func (c *OnMySQLComponent) instanceRecreateSourceTables(port int) error {
for db := range c.dbTablesMap {
stageDBName := generateStageDBName(c.Param.StageDBHeader, c.Param.FlowTimeStr, db)
for _, table := range c.dbTablesMap[db] {
stageTables, err := rpkg.ListDBTables(c.dbConn, stageDBName)
if err != nil {
logger.Error("list stage db tables on instance %d from %s failed: %s", port, stageDBName, err.Error())
return err
}

// 这里改成从 stage 库拿表列表, 在重试的时候才能幂等
//for _, table := range c.dbTablesMap[db] {
for _, table := range stageTables {
err := c.instanceRecreateSourceTable(port, db, stageDBName, table)
if err != nil {
logger.Error(
Expand All @@ -328,25 +336,25 @@ func (c *OnMySQLComponent) instanceRecreateSourceTables(port int) error {
}

func (c *OnMySQLComponent) instanceRecreateSourceTable(port int, dbName, stageDBName, tableName string) error {
yes, err := rpkg.IsTableExistsIn(c.dbConn, tableName, stageDBName)
if err != nil {
logger.Error("check table %s exists in %s failed: %s", tableName, stageDBName, err.Error())
return err
}

if !yes {
err := fmt.Errorf("%s.%s not found", stageDBName, tableName)
logger.Error("re create source table on instance %d failed: ", port, err.Error())
return err
}
logger.Info("source table found in stage db on instance %d, try to truncate", port)
//yes, err := rpkg.IsTableExistsIn(c.dbConn, tableName, stageDBName)
//if err != nil {
// logger.Error("check table %s exists in %s failed: %s", tableName, stageDBName, err.Error())
// return err
//}
//
//if !yes {
// err := fmt.Errorf("%s.%s not found", stageDBName, tableName)
// logger.Error("re create source table on instance %d failed: ", port, err.Error())
// return err
//}
//logger.Info("source table found in stage db on instance %d, try to truncate", port)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = c.dbConn.ExecContext(
_, err := c.dbConn.ExecContext(
ctx,
fmt.Sprintf(
"CREATE TABLE `%s`.`%s` LIKE `%s`.`%s`",
"CREATE TABLE IF NOT EXISTS `%s`.`%s` LIKE `%s`.`%s`",
dbName, tableName, stageDBName, tableName,
),
)
Expand Down
65 changes: 33 additions & 32 deletions dbm-services/mysql/db-tools/mysql-monitor/items-config.sql

Large diffs are not rendered by default.

23 changes: 18 additions & 5 deletions dbm-services/mysql/db-tools/mysql-monitor/items-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
role: []
- name: ibd-statistic
enable: true
schedule: 0 0 14 * * 1
schedule: 0 45 23 * * *
machine_type:
- single
- backend
Expand All @@ -78,15 +78,21 @@
- spider
role: []
- name: mysql-connlog-size
enable: true
enable: false
schedule: 0 0 12 * * *
machine_type:
- single
- backend
- remote
- spider
role: []
- name: mysql-connlog-rotate
enable: true
enable: false
schedule: 0 30 23 * * *
machine_type:
- single
- backend
- remote
- spider
role: []
- name: mysql-err-notice
Expand Down Expand Up @@ -211,7 +217,7 @@
- backend
role: []
- name: unique-ctl-master
enable: true
enable: false
schedule: '@every 1m'
machine_type:
- spider
Expand Down Expand Up @@ -250,4 +256,11 @@
schedule: 0 55 23 * * *
machine_type:
- proxy
role: []
role: []
- name: get-ctl-primary
enable: false
schedule: '@every 1m'
machine_type:
- spider
role:
- spider_master
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var systemDBs = []string{
"infodba_schema",
"performance_schema",
"test",
"db_infobase",
}

func init() {
Expand Down Expand Up @@ -75,6 +76,11 @@ func (c *ibdStatistic) Run() (msg string, err error) {
return "", err
}

err = reportLog(result)
if err != nil {
return "", err
}

return "", nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package ibdstatistic

import (
"dbm-services/mysql/db-tools/mysql-monitor/pkg/config"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/internal/cst"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"slices"
"time"

"github.com/pkg/errors"
)

type tableSizeStruct struct {
BkCloudId int `json:"bk_cloud_id"`
BkBizId int `json:"bk_biz_id"`
ImmuteDomain string `json:"cluster_domain"`
DBModule int `json:"db_module"`
MachineType string `json:"machine_type"`
Ip string `json:"instance_host"`
Port int `json:"instance_port"`
Role string `json:"instance_role"`
ServiceInstanceId int64 `json:"bk_target_service_instance_id"`
OriginalDBName string `json:"original_database_name"`
DBName string `json:"database_name"`
DBSize int64 `json:"database_size"`
TableName string `json:"table_name"`
TableSize int64 `json:"table_size"`
}

func reportLog(result map[string]map[string]int64) error {
dbsizeReportBaseDir := filepath.Join(cst.DBAReportBase, "mysql/dbsize")
err := os.MkdirAll(dbsizeReportBaseDir, os.ModePerm)
if err != nil {
slog.Error("failed to create database size reports directory", slog.String("error", err.Error()))
return errors.Wrap(err, "failed to create database size reports directory")
}

filePath := filepath.Join(
dbsizeReportBaseDir,
fmt.Sprintf(`report.log.%d`, time.Now().Weekday()),
)
err = os.RemoveAll(filePath)
if err != nil {
slog.Error("failed to remove database size reports directory", slog.String("error", err.Error()))
return errors.Wrap(err, "failed to remove database size reports directory")
}

f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
slog.Error("failed to open log file", "file", filePath)
return errors.Wrap(err, "failed to open file")
}
defer func() {
_ = f.Close()
}()

for originalDBName, dbInfo := range result {
// 根据 dbm 枚举约定, remote 是 tendbcluster 的存储机器类型
dbName := originalDBName
if config.MonitorConfig.MachineType == "remote" && slices.Index(systemDBs, originalDBName) < 0 {
match := tenDBClusterDbNamePattern.FindStringSubmatch(originalDBName)
if match == nil {
err := errors.Errorf(
"invalid dbname: '%s' on %s",
originalDBName, config.MonitorConfig.MachineType,
)
slog.Error("ibd-statistic report", slog.String("error", err.Error()))
return err
}
originalDBName = match[1]
}

var dbSize int64
var tablesInfo []tableSizeStruct
for tableName, tableSize := range dbInfo {
tablesInfo = append(tablesInfo, tableSizeStruct{
BkCloudId: *config.MonitorConfig.BkCloudID,
BkBizId: config.MonitorConfig.BkBizId,
ImmuteDomain: config.MonitorConfig.ImmuteDomain,
DBModule: *config.MonitorConfig.DBModuleID,
MachineType: config.MonitorConfig.MachineType,
Ip: config.MonitorConfig.Ip,
Port: config.MonitorConfig.Port,
Role: *config.MonitorConfig.Role,
ServiceInstanceId: config.MonitorConfig.BkInstanceId,
OriginalDBName: originalDBName,
DBName: dbName,
DBSize: 0,
TableName: tableName,
TableSize: tableSize,
})
dbSize += tableSize
}

for _, row := range tablesInfo {
row.DBSize = dbSize
b, err := json.Marshal(row)
if err != nil {
slog.Error("ibd-statistic report", slog.String("error", err.Error()))
return errors.Wrap(err, "failed to marshal row")
}

b = append(b, '\n')
_, err = f.Write(b)
if err != nil {
slog.Error("ibd-statistic report", slog.String("error", err.Error()))
return errors.Wrap(err, "failed to write row")
}
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
var tableSizeMetricName string
var dbSizeMetricName string

var tendbClusterDbNamePattern *regexp.Regexp
var tenDBClusterDbNamePattern *regexp.Regexp

func init() {
tableSizeMetricName = "mysql_table_size"
dbSizeMetricName = "mysql_database_size"

tendbClusterDbNamePattern = regexp.MustCompile(`^(.*)_[0-9]+$`)
tenDBClusterDbNamePattern = regexp.MustCompile(`^(.*)_[0-9]+$`)
}

func reportMetrics(result map[string]map[string]int64) error {
Expand All @@ -38,7 +38,7 @@ func reportMetrics(result map[string]map[string]int64) error {

// 根据 dbm 枚举约定, remote 是 tendbcluster 的存储机器类型
if config.MonitorConfig.MachineType == "remote" && slices.Index(systemDBs, dbName) < 0 {
match := tendbClusterDbNamePattern.FindStringSubmatch(dbName)
match := tenDBClusterDbNamePattern.FindStringSubmatch(dbName)
if match == nil {
err := errors.Errorf(
"invalid dbname: '%s' on %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/rotateslowlog"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/scenesnapshot"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/slavestatus"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/spiderctlchecker"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/spiderremote"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/timezonechange"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/tscc"
"dbm-services/mysql/db-tools/mysql-monitor/pkg/itemscollect/uniquectlmaster"
mi "dbm-services/mysql/db-tools/mysql-monitor/pkg/monitoriteminterface"
)

Expand Down Expand Up @@ -89,9 +89,10 @@ func init() {
_ = registerItemConstructor(spiderremote.Register())
_ = registerItemConstructor(tscc.Register())
_ = registerItemConstructor(dbhaheartbeat.Register())
_ = registerItemConstructor(uniquectlmaster.Register())
_ = registerItemConstructor(spiderctlchecker.UniqueCtlCheckerRegister())
_ = registerItemConstructor(scenesnapshot.Register())
_ = registerItemConstructor(timezonechange.RegisterSysTimezoneChange())
_ = registerItemConstructor(timezonechange.RegisterMySQLTimezoneChange())
_ = registerItemConstructor(rotateproxyconnlog.RegisterRotateProxyConnlog())
_ = registerItemConstructor(spiderctlchecker.GetCtlPrimaryRegister())
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Checker struct {
func (c *Checker) Run() (msg string, err error) {
var cnfFile string
if config.MonitorConfig.Port == 3306 {
cnfFile = "etc/my.cnf.3306"
cnfFile = "/etc/my.cnf.3306"
if _, err := os.Stat(cnfFile); os.IsNotExist(err) {
cnfFile = "/etc/my.cnf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (c *Checker) cleanOldBackup() error {
ctx,
`DELETE FROM infodba_schema.proxy_user_list
WHERE create_at < DATE(NOW() - INTERVAL 7 DAY)`,
config.MonitorConfig.Ip,
)
if err != nil {
slog.Error("backup proxy user list delete", slog.String("err", err.Error()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ var ctlReplicateName = "ctl-replicate"

type ctlReplicateChecker struct {
slaveStatusChecker
primary *getPrimaryRes
}

type getPrimaryRes struct {
ServerName string `db:"SERVER_NAME"`
Host string `db:"HOST"`
Port uint32 `db:"PORT"`
IsThisServer uint32 `db:"IS_THIS_SERVER"`
}

// Run 运行
Expand Down Expand Up @@ -51,27 +59,36 @@ func (c *ctlReplicateChecker) Run() (msg string, err error) {
return fmt.Sprintf("IO/SQL thread not running: %s", slaveErr), nil

}
slog.Info(
"tdbctl primary is master",
slog.String("primary", c.primary.Host),
slog.String("master", c.masterHost()),
)
if c.masterHost() != c.primary.Host {
err = fmt.Errorf(
"tdbctl slave's master host [%s] != primary host [%s]",
c.masterHost(), c.primary.Host,
)
slog.Error("tdbctl primary is master", slog.String("err", err.Error()))
return "", err
}
return "", nil
}

func (c *ctlReplicateChecker) isPrimary() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), config.MonitorConfig.InteractTimeout)
defer cancel()

var getPrimaryRes []struct {
ServerName string `db:"SERVER_NAME"`
Host string `db:"HOST"`
Port string `db:"PORT"`
IsThis int `db:"IS_THIS_SERVER"`
}
res := getPrimaryRes{}

err := c.db.SelectContext(ctx, &getPrimaryRes, `TDBCTL GET PRIMARY`)
err := c.db.QueryRowxContext(ctx, `TDBCTL GET PRIMARY`).StructScan(&res)
if err != nil {
slog.Error("TDBCTL GET PRIMARY", slog.String("error", err.Error()))
return false, err
}
c.primary = &res

return getPrimaryRes[0].IsThis == 1, nil
return res.IsThisServer == 1, nil
}

// Name 监控项名
Expand All @@ -81,10 +98,13 @@ func (c *ctlReplicateChecker) Name() string {

// NewCtlReplicateChecker 新建监控项实例
func NewCtlReplicateChecker(cc *monitoriteminterface.ConnectionCollect) monitoriteminterface.MonitorItemInterface {
return &ctlReplicateChecker{slaveStatusChecker{
db: cc.CtlDB,
slaveStatus: make(map[string]interface{}),
}}
return &ctlReplicateChecker{
slaveStatusChecker{
db: cc.CtlDB,
slaveStatus: make(map[string]interface{}),
},
nil,
}
}

// RegisterCtlReplicateChecker 注册监控项
Expand Down
Loading

0 comments on commit 3b6495f

Please sign in to comment.