Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbm-services): dbha report hash mod info #6970

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 35 additions & 71 deletions dbm-services/common/dbha/ha-module/agent/monitor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"dbm-services/common/dbha/ha-module/dbutil"
"dbm-services/common/dbha/ha-module/log"
"dbm-services/common/dbha/ha-module/monitor"
"dbm-services/common/dbha/ha-module/util"
)

// MonitorAgent agent work struct
Expand All @@ -28,8 +27,12 @@ type MonitorAgent struct {
MonIp string
LastFetchInsTime time.Time
LastFetchGMTime time.Time
DBInstance map[string]dbutil.DataBaseDetect
GMInstance map[string]*GMConnection
// mod for current agent
HashMod int
// mod value for current agent
HashValue int
DBInstance map[string]dbutil.DataBaseDetect
GMInstance map[string]*GMConnection
// config file
Conf *config.Config
// API client to access cmdb metadata
Expand Down Expand Up @@ -114,6 +117,8 @@ func (a *MonitorAgent) RefreshInstanceCache() {
if err != nil {
log.Logger.Errorf("fetch %s instance failed. err:%s",
a.DetectType, err.Error())
// if the fetch failed, do not flush the fetch time
return
}
a.flushInsFetchTime()
}
Expand All @@ -132,17 +137,11 @@ func (a *MonitorAgent) DoDetectSingle(ins dbutil.DataBaseDetect) {
a.reportMonitor(ins, err)
if ins.NeedReporter() {
// reporter detect result to hadb
err = a.HaDBClient.ReportDBStatus(ins.GetApp(), a.MonIp, ip, port,
string(ins.GetDBType()), string(ins.GetStatus()))
if err != nil {
if err = a.ReporterDetectInfo(ins); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, ins.GetDBType(), ins.GetStatus())
}
err = a.ReporterGM(ins)
if err != nil {
log.Logger.Errorf("reporter gm failed. err:%s", err.Error())
}
ins.UpdateReporterTime()
}
}
Expand Down Expand Up @@ -187,9 +186,14 @@ func (a *MonitorAgent) RefreshGMCache() {
func (a *MonitorAgent) FetchDBInstance() error {
mod, modValue, err := a.HaDBClient.AgentGetHashValue(a.MonIp, a.CityID, a.DetectType, a.Conf.AgentConf.FetchInterval)
if err != nil {
log.Logger.Errorf("get Modulo failed and wait next refresh time. err:%s", err.Error())
log.Logger.Errorf("get hash module info failed and wait next refresh time. err:%s", err.Error())
return err
}
//set current agent's hash mod, hash value, and report to DB later
log.Logger.Debugf("hash mod:%d, hash value:%d, dbType:%s", mod, modValue, a.DetectType)
a.HashMod = mod
a.HashValue = modValue

req := client.DBInstanceInfoRequest{
LogicalCityIDs: []int{a.CityID},
HashCnt: mod,
Expand Down Expand Up @@ -285,29 +289,36 @@ func (a *MonitorAgent) FetchGMInstance() error {
return nil
}

// ReporterGM report detect info to gm
func (a *MonitorAgent) ReporterGM(reporterInstance dbutil.DataBaseDetect) error {
// ReporterDetectInfo reports the detect result to HADB and, when the instance is abnormal, to a connected GM
func (a *MonitorAgent) ReporterDetectInfo(reporterInstance dbutil.DataBaseDetect) error {
var err error
isReporter := false
ip, port := reporterInstance.GetAddress()
if reporterInstance.GetStatus() == constvar.DBCheckSuccess ||
reporterInstance.GetStatus() == constvar.SSHCheckSuccess {
// if db is normal, needn't reporter gm
if err = a.HaDBClient.ReportDBStatus(reporterInstance.GetApp(), a.MonIp, ip, port,
string(reporterInstance.GetDBType()), string(reporterInstance.GetStatus()), "N/A"); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, reporterInstance.GetDBType(), reporterInstance.GetStatus())
}
return nil
}
var err error
isReporter := false
ip, port := reporterInstance.GetAddress()

for _, gmIns := range a.GMInstance {
gmIns.Mutex.Lock()
if !gmIns.IsConnection {
gmIns.Mutex.Unlock()
continue
}
gmInfo := fmt.Sprintf("%s#%d", gmIns.Ip, gmIns.Port)
jsonInfo, err := reporterInstance.Serialization()
if err != nil {
gmIns.Mutex.Unlock()
log.Logger.Errorf("instance Serialization failed. err:%s", err.Error())
return err
}
log.Logger.Infof("ins:[%s#%d] try to report detect info to gm:[%s]", ip, port, gmInfo)
err = gmIns.ReportInstance(reporterInstance.GetDetectType(), jsonInfo)
if err != nil {
log.Logger.Warnf("reporter gm failed. gm_ip:%s, gm_port:%d, err:%s", gmIns.Ip, gmIns.Port, err.Error())
Expand All @@ -318,9 +329,11 @@ func (a *MonitorAgent) ReporterGM(reporterInstance dbutil.DataBaseDetect) error
return err
}
} else {
log.Logger.Debugf("%s#%d reporter gm success. gm info:%s#%d", ip, port, gmIns.Ip, gmIns.Port)
if err = a.reporterBindGM(fmt.Sprintf("%s#%d", gmIns.Ip, gmIns.Port)); err != nil {
log.Logger.Warnf("update agent's bind gm info failed:%s", err.Error())
if err = a.HaDBClient.ReportDBStatus(reporterInstance.GetApp(), a.MonIp, ip, port,
string(reporterInstance.GetDBType()), string(reporterInstance.GetStatus()), gmInfo); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, reporterInstance.GetDBType(), reporterInstance.GetStatus())
}
isReporter = true
gmIns.Mutex.Unlock()
Expand All @@ -330,7 +343,7 @@ func (a *MonitorAgent) ReporterGM(reporterInstance dbutil.DataBaseDetect) error
}

if !isReporter {
err = fmt.Errorf("all gm disconnect")
err = fmt.Errorf("get report GM failed: all gm disconnect")
log.Logger.Error(err.Error())
return err
}
Expand Down Expand Up @@ -400,59 +413,10 @@ func (a *MonitorAgent) registerAgentInfoToHaDB() error {
return nil
}

// moduloHashSharding picks, from allDbInstance, the instances this agent is
// responsible for detecting and returns them keyed by IP.
//
// An IP belongs to this agent when CRC32(ip) % mod == modValue, where mod and
// modValue are returned by HaDBClient.AgentGetHashValue. IPs found in the
// shield config fetched with ShieldType model.ShieldSwitch are skipped. For
// every selected IP only one instance is kept: the one with the minimum port.
//
// An error is returned when the hash info or the shield config cannot be
// fetched, or when the returned mod is not positive.
func (a *MonitorAgent) moduloHashSharding(allDbInstance []dbutil.DataBaseDetect) (map[string]dbutil.DataBaseDetect,
	error) {
	mod, modValue, err := a.HaDBClient.AgentGetHashValue(a.MonIp, a.CityID, a.DetectType, a.Conf.AgentConf.FetchInterval)
	if err != nil {
		log.Logger.Errorf("get Modulo failed and wait next refresh time. err:%s", err.Error())
		return nil, err
	}
	if mod <= 0 {
		// A non-positive mod would make the modulo below panic (integer
		// division by zero) or produce a meaningless shard; fail this round.
		err = fmt.Errorf("invalid hash mod:%d, dbType:%s", mod, a.DetectType)
		log.Logger.Error(err.Error())
		return nil, err
	}
	log.Logger.Debugf("current agent detect dbType[%s], mod[%d], modValue[%d]",
		a.DetectType, mod, modValue)

	shieldConfig, err := a.HaDBClient.GetShieldConfig(&model.HAShield{
		ShieldType: string(model.ShieldSwitch),
	})
	if err != nil {
		log.Logger.Errorf("get shield config failed:%s", err.Error())
		return nil, err
	}

	result := make(map[string]dbutil.DataBaseDetect)
	for _, rawIns := range allDbInstance {
		rawIp, rawPort := rawIns.GetAddress()
		if _, ok := shieldConfig[rawIp]; ok {
			log.Logger.Debugf("shield config exist this ip, skip detect :%s", rawIp)
			continue
		}

		ins, ok := result[rawIp]
		if !ok {
			// First instance seen for this IP: keep it only when the IP
			// hashes into this agent's shard.
			if util.CRC32(rawIp)%uint32(mod) == uint32(modValue) {
				result[rawIp] = rawIns
			}
			continue
		}

		// The IP is already selected: keep whichever instance has the
		// smaller port, i.e. replace the stored one with the new candidate.
		if _, port := ins.GetAddress(); rawPort < port {
			result[rawIp] = rawIns
		}
	}
	return result, nil
}

// reporterHeartbeat send agent heartbeat to HA-DB
func (a *MonitorAgent) reporterHeartbeat() error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, int(interval), "N/A")
a.heartbeat = time.Now()
return err
}

// reporterBindGM send bind gm info to hadb
// only agent trigger double check(report GM) should call this
func (a *MonitorAgent) reporterBindGM(gmInfo string) error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, int(interval), gmInfo)
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, int(interval), a.HashMod, a.HashValue)
a.heartbeat = time.Now()
return err
}
Expand Down
11 changes: 7 additions & 4 deletions dbm-services/common/dbha/ha-module/client/hadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *HaDBClient) GetDBDetectInfo() ([]model.HAAgentLogs, error) {
}

// ReportDBStatus report detected instance's status
func (c *HaDBClient) ReportDBStatus(app, agentIp, ip string, port int, dbType, status string) error {
func (c *HaDBClient) ReportDBStatus(app, agentIp, ip string, port int, dbType, status, bindGM string) error {
var result DbStatusResponse
currentTime := time.Now()

Expand All @@ -179,6 +179,7 @@ func (c *HaDBClient) ReportDBStatus(app, agentIp, ip string, port int, dbType, s
Status: status,
CloudID: c.CloudId,
LastTime: &currentTime,
ReportGM: bindGM,
},
}

Expand Down Expand Up @@ -401,7 +402,7 @@ func (c *HaDBClient) GetAliveHAComponent(module string, interval int) ([]GMInfo,
}

// ReporterAgentHeartbeat report agent heartbeat to ha_status table
func (c *HaDBClient) ReporterAgentHeartbeat(agentIP, detectType string, interval int, gmInfo string) error {
func (c *HaDBClient) ReporterAgentHeartbeat(agentIP, detectType string, interval, mod, modValue int) error {
var result HaStatusResponse

currentTime := time.Now()
Expand All @@ -416,7 +417,8 @@ func (c *HaDBClient) ReporterAgentHeartbeat(agentIP, detectType string, interval
SetArgs: &model.HaStatus{
ReportInterval: interval,
LastTime: &currentTime,
TakeOverGm: gmInfo,
HashMod: &mod,
HashValue: &modValue,
},
}

Expand Down Expand Up @@ -751,7 +753,8 @@ func (c *HaDBClient) AgentGetHashValue(agentIP string, cityID int, dbType string
if !find {
err = fmt.Errorf("bug: can't find in agent list. agentIP:%s, dbType:%s", agentIP, dbType)
log.Logger.Errorf(err.Error())
_ = c.ReporterAgentHeartbeat(agentIP, dbType, interval, "N/A")
//report invalid mod info
_ = c.ReporterAgentHeartbeat(agentIP, dbType, interval, 0, 0)

return mod, modValue, err
}
Expand Down
4 changes: 2 additions & 2 deletions dbm-services/common/dbha/ha-module/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func IntSlice2String(elements []int, sep string) string {
// GraceStructString grace struct info to string
func GraceStructString(v interface{}) string {
// Serialize the struct to a JSON string
data, err := json.MarshalIndent(v, "", " ")
data, err := json.Marshal(v)
if err != nil {
log.Logger.Debugf("Failed to marshal struct: %v", err)
return ""
}
return fmt.Sprintf("%s", data)
return string(data)
}
1 change: 1 addition & 0 deletions dbm-services/common/dbha/hadb-api/model/HAAgentLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type HAAgentLogs struct {
Status string `gorm:"column:status;type:varchar(32);NOT NULL" json:"status,omitempty"`
CloudID int `gorm:"column:cloud_id;type:int(11);NOT NULL;default:0" json:"cloud_id,omitempty"`
LastTime *time.Time `gorm:"column:last_time;type:datetime;default:CURRENT_TIMESTAMP;NOT NULL" json:"last_time,omitempty"`
ReportGM string `gorm:"column:report_gm;type:varchar(32)" json:"report_gm,omitempty"`
}

// TableName TODO
Expand Down
2 changes: 2 additions & 0 deletions dbm-services/common/dbha/hadb-api/model/HAStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type HaStatus struct {
Status string `gorm:"column:status;type:varchar(32);NOT NULL" json:"status,omitempty"`
TakeOverGm string `gorm:"column:take_over_gm;type:varchar(32)" json:"take_over_gm,omitempty"`
ReportInterval int `gorm:"column:report_interval;type:tinyint" json:"report_interval,omitempty"`
HashMod *int `gorm:"column:hash_mod;type:int(11)" json:"hash_mod,omitempty"`
HashValue *int `gorm:"column:hash_value;type:int(11)" json:"hash_value,omitempty"`
}

// TableName TODO
Expand Down
5 changes: 5 additions & 0 deletions dbm-services/common/dbha/hadb-api/model/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func InitHaDB() *gorm.DB {
log.Logger.Errorf("connect to %s%d failed:%s", haDBInfo.Host, haDBInfo.Port, err.Error())
}

// Clear sql_mode; otherwise writing a Go time.Time into a MySQL datetime column may fail with error 1292.
// The underlying cause is the STRICT_TRANS_TABLES sql_mode.
log.Logger.Debugf("set sql_mode to null")
hadb.Exec("set sql_mode=''")

err = DoAutoMigrate(hadb)
if err != nil {
log.Logger.Errorf("hadb auto migrate failed, err:%s", err.Error())
Expand Down
Loading