Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/access message notification/#2881 #1

Open
wants to merge 19 commits into
base: v1.3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
90497e3
fix(frontend): dumper自测bug修复 #2791
jinquantianxia Dec 27, 2023
2c44f8b
fix(dbm-services): fix: change report db status fatal log to error le…
Dec 27, 2023
278f022
feat(mysql): 权限和分区迁移可选择mysql或者spider或者全部 #2679
fanfanyangyang Dec 19, 2023
45e2a5f
fix(backend): 修复 redis 日志存储时长的问题 #2837
zhangzhw8 Dec 27, 2023
b11a2e4
feat(frontend): dumper入口针对特定业务开放 #2846
jinquantianxia Dec 27, 2023
9f0f586
fix(redis): 自愈流程修复 #2836
xiepaup Dec 27, 2023
815de02
fix(backend): 大数据亲和性改造 #2848
iSecloud Dec 27, 2023
ce5784b
feat(mysql): tendbcluster集群标准化增加实例化配置 close #2851
xfwduke Dec 28, 2023
868e28b
fix(backend): 检查连接过滤系统账户 close #2858
ymakedaq Dec 28, 2023
56e0756
fix(backend): 去掉job回调接口的鉴权 #2860
iSecloud Dec 28, 2023
2e55737
feat(redis): 迁移时,将最近N天旧的备份记录重新上报到bklog中,便于迁移后依然能做数据构造 #2835
lukemakeit Dec 28, 2023
cc0a229
feat(redis): 数据构造兼容gcs备份和压缩格式 close #2863
mikluo Dec 28, 2023
f68d994
feat(frontend): 集群、实例列表导出Excel #2855
jinquantianxia Dec 28, 2023
33ae85e
feat(frontend): mysql支持开区模板 #2869
jinquantianxia Dec 29, 2023
86d472e
fix(backend): influxdb 实例导出数据问题 #2852
zhangzhw8 Dec 28, 2023
c826bbf
feat(sqlserver): 添加备份和恢复数据库等的原子任务节点 #1749
yksitu Nov 16, 2023
8e9b804
feat: 增加数据库备份、恢复、清档相关原子任务 #2619
yksitu Dec 28, 2023
ea8f769
fix(mysql): 部署mysqlcrond抓取启动日志文件内容 close #2876
xfwduke Dec 29, 2023
f3b7c99
feat(backend): 接入消息通知 #2881
ygcyao Jan 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbm-services/common/dbha/ha-module/client/hadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *HaDBClient) ReportDBStatus(app, agentIp, ip string, port int, dbType, s
return nil
}
if result.RowsAffected > 1 {
log.Logger.Fatalf("bug: update instance status affect rows %d", result.RowsAffected)
log.Logger.Errorf("bug: update instance status affect rows %d", result.RowsAffected)
}

insertReq := DbStatusRequest{
Expand Down
16 changes: 0 additions & 16 deletions dbm-services/common/dbha/ha-module/dbmodule/redis/svr_password.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,6 @@ func GetInstancePass(insArr []dbutil.DataBaseDetect,
if ok {
passwdRedis := passwdVal.(RedisPasswd)
cluster2Passwd[key] = passwdRedis
log.Logger.Debugf("hit cache, key:%s, passwd:%v",
key, passwdRedis)
} else {
pins = append(pins, client.PasswdInstance{
Ip: strconv.Itoa(ins.GetClusterId()),
Expand Down Expand Up @@ -437,7 +435,6 @@ func GetInstancePass(insArr []dbutil.DataBaseDetect,
log.Logger.Errorf("PassWDClusters ins[%s:%d] db[%s] not find cluster[%s] in passwds",
host, port, ins.GetType(), ins.GetCluster())
} else {
log.Logger.Debugf("passwd set,addr[%s:%d] val:%v", host, port, passwd)
err := SetPasswordToInstanceEx(ins.GetType(), passwd, ins)
if err != nil {
log.Logger.Errorf("PassWDClusters ins[%s:%d] db[%s] cluster[%s] set passwd[%v] fail",
Expand Down Expand Up @@ -475,17 +472,13 @@ func ProcessSinglePassword(key string, pw client.PasswdItem,
}

pwVal := string(pwByte)
log.Logger.Debugf("passwd cluster:%s encode_pw:%s, decode_pw:%s",
pw.Ip, pw.Passwd, pwVal)
tmp, find := cluster2passwd[key]
if !find {
if pw.Component == constvar.ComponentRedisProxy {
log.Logger.Debugf("passwd set redisProxy, key:%s, pw:%s", key, pwVal)
cluster2passwd[key] = RedisPasswd{
Proxy: pwVal,
}
} else if pw.Component == constvar.ComponentRedis {
log.Logger.Debugf("passwd set redis, key:%s, pw:%s", key, pwVal)
cluster2passwd[key] = RedisPasswd{
Redis: pwVal,
}
Expand All @@ -495,20 +488,15 @@ func ProcessSinglePassword(key string, pw client.PasswdItem,
} else {
if pw.Component == constvar.ComponentRedisProxy {
tmp.Proxy = pwVal
log.Logger.Debugf("passwd set proxy exist, key:%s, redis_pw:%s, proxy_pw:%s",
key, tmp.Redis, tmp.Proxy)
} else if pw.Component == constvar.ComponentRedis {
tmp.Redis = pwVal
log.Logger.Debugf("passwd set redis exist, key:%s, redis_pw:%s, proxy_pw:%s",
key, tmp.Redis, tmp.Proxy)
} else {
return passwd, setCache
}

passwd = tmp
cluster2passwd[key] = tmp
if len(passwd.Redis) > 0 && len(passwd.Proxy) > 0 {
log.Logger.Debugf("passwd write to cache, key:%s", key)
setCache = true
}
}
Expand Down Expand Up @@ -574,14 +562,10 @@ func GetInstancePassByClusterId(dbType string, clusterId int,

if dbType == constvar.TwemproxyMetaType ||
dbType == constvar.PredixyMetaType {
log.Logger.Debugf("GetPasswdByCId proxy type[%s], passwd:%s",
string(dbType), passwdCluster.Proxy)
return passwdCluster.Proxy, nil
} else if dbType == constvar.RedisMetaType ||
dbType == constvar.TendisplusMetaType ||
dbType == constvar.TendisSSDMetaType {
log.Logger.Debugf("GetPasswdByCId redis type[%s], passwd:%s",
string(dbType), passwdCluster.Redis)
return passwdCluster.Redis, nil
} else {
typeErr := fmt.Errorf("GetInstancePassByClusterId dbtype[%s] not support",
Expand Down
6 changes: 3 additions & 3 deletions dbm-services/mysql/db-partition/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,19 @@ func MigrateConfig(r *gin.Context) {
SendResponse(r, err, nil)
return
}
mysqlIds, spiderIds, mysqlFail, spiderFail, err := input.MigrateConfig()
mysqlIds, mysqlFail, spiderIds, spiderFail, err := input.MigrateConfig()
info := "迁移成功"
if err != nil {
slog.Error("msg", "迁移失败", err.Error())
info = "迁移失败"
}
data := struct {
MigratedMysqlIds []int `json:"migrated_mysql_ids"`
MigratedSpiderIds []int `json:"migrated_spider_ids"`
MysqlMigrateFail []int `json:"mysql_migrate_fail"`
MigratedSpiderIds []int `json:"migrated_spider_ids"`
SpiderMigrateFail []int `json:"spider_migrate_fail"`
Info string `json:"info"`
}{mysqlIds, spiderIds, mysqlFail,
}{mysqlIds, mysqlFail, spiderIds,
spiderFail, info}
SendResponse(r, err, data)
return
Expand Down
4 changes: 2 additions & 2 deletions dbm-services/mysql/db-partition/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func SendEvent(eventName string, dimension map[string]interface{}, content strin
}
c := util.NewClientByHosts(viper.GetString("monitor.service"))
_, err := c.Do(http.MethodPost, "", body)
slog.Info(fmt.Sprintf("SendEvent:%v", body))
if err != nil {
slog.Info(fmt.Sprintf("%v", body))
slog.Error("msg", "send partition event error", err)
// 监控无法上报,如果服务异常无法上报监控,所以让服务退出,可触发服务故障的告警。
InitMonitor()
Expand Down Expand Up @@ -103,8 +103,8 @@ func TestSendEvent(dataId int, token string, serviceHost string) error {
}
c := util.NewClientByHosts(serviceHost)
_, err := c.Do(http.MethodPost, "", body)
slog.Info(fmt.Sprintf("TestSendEvent:%v", body))
if err != nil {
slog.Info(fmt.Sprintf("%v", body))
slog.Error("msg", "send partition event error", err)
return err
}
Expand Down
162 changes: 82 additions & 80 deletions dbm-services/mysql/db-partition/service/migrate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ import (

// MigrateConfig 从scr迁移分区配置
func (m *MigratePara) MigrateConfig() ([]int, []int, []int, []int, error) {
// 从scr迁移成功的mysql分区配置
var mysqlIdFromScr []int
// 从scr迁移成功的spider分区配置
var spiderIdFromScr []int
// 从scr迁移失败的mysql分区配置
var mysqlMigrateFail []int
// 从scr迁移失败的spider分区配置
var spiderMigrateFail []int
GcsDb = &model.Database{
Self: model.OpenDB(m.GcsDb.User, m.GcsDb.Psw, fmt.Sprintf("%s:%s", m.GcsDb.Host, m.GcsDb.Port), m.GcsDb.Name),
}
Expand All @@ -28,7 +20,7 @@ func (m *MigratePara) MigrateConfig() ([]int, []int, []int, []int, error) {
// 检查迁移的入参
apps, err := m.CheckPara()
if err != nil {
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail, err
return nil, nil, nil, nil, err
}
var appWhere string
var clusters []Cluster
Expand All @@ -37,8 +29,7 @@ func (m *MigratePara) MigrateConfig() ([]int, []int, []int, []int, error) {
// 获取业务的集群信息,为了获取cluster_id
c, errInner := GetAllClustersInfo(BkBizId{bkBizId})
if errInner != nil {
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail,
fmt.Errorf("从dbm biz_clusters获取集群信息失败: %s", errInner.Error())
return nil, nil, nil, nil, fmt.Errorf("从dbm biz_clusters获取集群信息失败: %s", errInner.Error())
}
clusters = append(clusters, c...)
}
Expand All @@ -52,12 +43,62 @@ func (m *MigratePara) MigrateConfig() ([]int, []int, []int, []int, error) {
// 检查是否可以迁移
err = Check(appWhere)
if err != nil {
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail, err
return nil, nil, nil, nil, err
}
// mysql分区配置
mysqlConfig := make([]*PartitionConfigWithApp, 0)
// spider分区配置
spiderConfig := make([]*PartitionConfigWithApp, 0)

// 区分迁移范围
if m.Range == "mysql" {
// 从scr迁移成功的mysql分区配置、从scr迁移失败的mysql分区配置
mysqlIdFromScr, mysqlMigrateFail, errs := Migrate(appWhere, apps, domainBkbizIdMap, "mysql")
if len(errs) > 0 {
return mysqlIdFromScr, mysqlMigrateFail, nil, nil,
fmt.Errorf("errors: %s", strings.Join(errs, "\n"))
}
return mysqlIdFromScr, mysqlMigrateFail, nil, nil, nil
} else if m.Range == "spider" {
// 从scr迁移成功的spider分区配置、从scr迁移失败的spider分区配置
spiderIdFromScr, spiderMigrateFail, errs := Migrate(appWhere, apps, domainBkbizIdMap, "spider")
if len(errs) > 0 {
return nil, nil, spiderIdFromScr, spiderMigrateFail,
fmt.Errorf("errors: %s", strings.Join(errs, "\n"))
}
return nil, nil, spiderIdFromScr, spiderMigrateFail, nil
} else if m.Range == "all" {
// 从scr迁移成功的mysql分区配置、从scr迁移失败的mysql分区配置
mysqlIdFromScr, mysqlMigrateFail, errs := Migrate(appWhere, apps, domainBkbizIdMap, "mysql")
spiderIdFromScr, spiderMigrateFail, errs1 := Migrate(appWhere, apps, domainBkbizIdMap, "spider")
errs = append(errs, errs1...)
if len(errs) > 0 {
return mysqlIdFromScr, mysqlMigrateFail, spiderIdFromScr, spiderMigrateFail,
fmt.Errorf("errors: %s", strings.Join(errs, "\n"))
}
return mysqlIdFromScr, mysqlMigrateFail, spiderIdFromScr, spiderMigrateFail, nil
} else {
return nil, nil, nil, nil, fmt.Errorf("not support range")
}
}

// Migrate 迁移指定数据库类型和业务的分区规则
func Migrate(appWhere string, apps map[string]int64,
domainBkbizIdMap map[string]int64, dbType string) ([]int, []int, []string) {
var scrTable, dbmTable, logTable string
if dbType == "mysql" {
scrTable = MysqlPartitionConfigScr
dbmTable = MysqlPartitionConfig
logTable = MysqlManageLogsTable
} else if dbType == "spider" {
scrTable = SpiderPartitionConfigScr
dbmTable = SpiderPartitionConfig
logTable = SpiderManageLogsTable
} else {
return nil, nil, []string{fmt.Sprintf("not support %s dbtype", dbType)}
}
// 从scr迁移成功的分区配置
var idFromScr []int
// 从scr迁移失败的分区配置
var migrateFail []int
config := make([]*PartitionConfigWithApp, 0)
var errs []string
// 国内时区默认东八区、直连区域
getConfigColumns := "select ID as id, app as app, Ip as immute_domain,Port as port, " +
"0 as bk_cloud_id, DbLike as dblike, PartitionTableName as tblike, " +
Expand All @@ -68,100 +109,54 @@ func (m *MigratePara) MigrateConfig() ([]int, []int, []int, []int, error) {
"Creator as creator, Updator as updator, CreateTime as create_time, " +
"UpdateTime as update_time "
getMysqlConfig := fmt.Sprintf("%s from %s where app in (%s)", getConfigColumns,
MysqlPartitionConfigScr, appWhere)
getSpiderConfig := fmt.Sprintf("%s from %s where app in (%s)", getConfigColumns,
SpiderPartitionConfigScr, appWhere)
scrTable, appWhere)
// 获取配置信息
err = GcsDb.Self.Debug().Raw(getMysqlConfig).Scan(&mysqlConfig).Error
if err != nil {
slog.Error(getMysqlConfig, "execute error", err)
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail, err
}
err = GcsDb.Self.Debug().Raw(getSpiderConfig).Scan(&spiderConfig).Error
err := GcsDb.Self.Debug().Raw(getMysqlConfig).Scan(&config).Error
if err != nil {
slog.Error(getSpiderConfig, "execute error", err)
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail, err
slog.Error(getMysqlConfig, "dbType", dbType, "execute error", err, "sql", getMysqlConfig)
return idFromScr, migrateFail, []string{fmt.Sprintf("dbtype[%s] %s", dbType, err.Error())}
}
var errs []string
// 迁移mysql分区配置
for k, item := range mysqlConfig {
for k, item := range config {
id := item.ID
// 清洗数据
// app换成bk_biz_id

// app没有对应的bk_biz_id
if apps[item.App] == 0 {
mysqlMigrateFail = append(mysqlMigrateFail, id)
migrateFail = append(migrateFail, id)
errs = append(errs, fmt.Sprintf("not find bk_biz_id for app: %s", item.App))
continue
} else {
mysqlConfig[k].BkBizId = apps[item.App]
config[k].BkBizId = apps[item.App]
}
// 补充cluster_id
// 域名没有对应的cluster_id
if domainBkbizIdMap[item.ImmuteDomain] == 0 {
mysqlMigrateFail = append(mysqlMigrateFail, id)
errs = append(errs, fmt.Sprintf("not find cluster id for cluster: %s", item.ImmuteDomain))
migrateFail = append(migrateFail, id)
errs = append(errs, fmt.Sprintf("dbtype[%s] not find cluster id for cluster: %s",
dbType, item.ImmuteDomain))
continue
} else {
mysqlConfig[k].ClusterId = int(domainBkbizIdMap[item.ImmuteDomain])
config[k].ClusterId = int(domainBkbizIdMap[item.ImmuteDomain])
}
// 自增
mysqlConfig[k].ID = 0
partitionConfig := item.PartitionConfig
// 插入分区规则,后台定时任务会执行
err = model.DB.Self.Table(MysqlPartitionConfig).Create(&partitionConfig).Error
if err != nil {
mysqlMigrateFail = append(mysqlMigrateFail, id)
errs = append(errs, err.Error())
} else {
// 记录从scr迁移的id
mysqlIdFromScr = append(mysqlIdFromScr, id)
// 记录日志
CreateManageLog(MysqlPartitionConfig, MysqlManageLogsTable, partitionConfig.ID,
"Insert", "migrator")
}
}

// 迁移spider分区配置
for k, item := range spiderConfig {
id := item.ID
// app换成bk_biz_id
if apps[item.App] == 0 {
spiderMigrateFail = append(spiderMigrateFail, id)
errs = append(errs, fmt.Sprintf("not find bk_biz_id for app: %s", item.App))
continue
} else {
spiderConfig[k].BkBizId = apps[item.App]
}
// 补充cluster_id
if domainBkbizIdMap[item.ImmuteDomain] == 0 {
spiderMigrateFail = append(spiderMigrateFail, id)
errs = append(errs, fmt.Sprintf("not find cluster id for cluster: %s", item.ImmuteDomain))
continue
} else {
spiderConfig[k].ClusterId = int(domainBkbizIdMap[item.ImmuteDomain])
}
spiderConfig[k].ID = 0
config[k].ID = 0
partitionConfig := item.PartitionConfig
// 插入分区规则,后台定时任务会执行
err = model.DB.Self.Table(SpiderPartitionConfig).Create(&partitionConfig).Error
err = model.DB.Self.Table(dbmTable).Create(&partitionConfig).Error
if err != nil {
spiderMigrateFail = append(spiderMigrateFail, id)
migrateFail = append(migrateFail, id)
errs = append(errs, err.Error())
slog.Error("msg", "insert rule", err)
} else {
// 记录从scr迁移的id
spiderIdFromScr = append(spiderIdFromScr, id)
idFromScr = append(idFromScr, id)
// 记录日志
CreateManageLog(SpiderPartitionConfig, SpiderManageLogsTable, partitionConfig.ID,
CreateManageLog(dbmTable, logTable, partitionConfig.ID,
"Insert", "migrator")
}
}
if len(errs) > 0 {
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail,
fmt.Errorf("errors: %s", strings.Join(errs, "\n"))
}
return mysqlIdFromScr, spiderIdFromScr, mysqlMigrateFail, spiderMigrateFail, nil
return idFromScr, migrateFail, errs
}

// CheckPara 检查迁移分区配置的参数
Expand All @@ -174,6 +169,7 @@ func (m *MigratePara) CheckPara() (map[string]int64, error) {
}

tips := "请设置需要迁移的app列表,多个app用逗号间隔,格式如\nAPPS='{\"test\":1, \"test2\":2}',名称区分大小写"
rangeTips := "range可选值:all、mysql、spider"
if m.Apps == "" {
slog.Error(tips)
return nil, fmt.Errorf("apps为空,%s", tips)
Expand All @@ -187,6 +183,12 @@ func (m *MigratePara) CheckPara() (map[string]int64, error) {
slog.Error(tips)
return nil, fmt.Errorf("apps为空,%s", tips)
}
if m.Range == "" {
return nil, fmt.Errorf("%s,不能为空", rangeTips)
}
if !(m.Range == "all" || m.Range == "mysql" || m.Range == "spider") {
return nil, fmt.Errorf(",不支持[%s]", m.Range)
}
return apps, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ type MigratePara struct {
GcsDb DbConf `json:"gcs_db"`
Apps string `json:"apps"`
// 是否是国内的,国内的Apps不要求包含time_zone、bk_cloud_id,国内直连区域默认填充
// 国外的集群信息需要包含time_zone、bk_cloud_id todo
// 国外的集群信息需要包含time_zone、bk_cloud_id
Foreign *bool `json:"foreign"`
// 迁移范围,可选all、mysql、spider
Range string `json:"range"`
}

// DbConf 分区规则所在数据库的配置
Expand Down
Loading
Loading