Skip to content

Commit

Permalink
write data partition decommission url message to log
Browse files Browse the repository at this point in the history
Signed-off-by: zhuhyc <zzhniy.163.niy@163.com>
  • Loading branch information
zhuhyc authored and awzhgw committed Jul 2, 2019
1 parent 8cb94af commit 2520f73
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 15 deletions.
14 changes: 8 additions & 6 deletions master/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (m *Server) decommissionDisk(w http.ResponseWriter, r *http.Request) {
for _, bdp := range badPartitions {
badPartitionIds = append(badPartitionIds, bdp.PartitionID)
}
rstMsg = fmt.Sprintf("recive decommissionDisk node[%v] disk[%v], badPartitionIds[%v] has offline successfully",
rstMsg = fmt.Sprintf("receive decommissionDisk node[%v] disk[%v], badPartitionIds[%v] has offline successfully",
node.Addr, diskPath, badPartitionIds)
if err = m.cluster.decommissionDisk(node, diskPath, badPartitions); err != nil {
sendErrReply(w, r, newErrHTTPReply(err))
Expand Down Expand Up @@ -1142,12 +1142,14 @@ func getMetaPartitionView(mp *MetaPartition) (mpView *proto.MetaPartitionView) {
mpView = proto.NewMetaPartitionView(mp.PartitionID, mp.Start, mp.End, mp.Status)
mp.Lock()
defer mp.Unlock()
for _, metaReplica := range mp.Replicas {
mpView.Members = append(mpView.Members, metaReplica.Addr)
if metaReplica.IsLeader {
mpView.LeaderAddr = metaReplica.Addr
}
for _, host := range mp.Hosts {
mpView.Members = append(mpView.Members, host)
}
mr, err := mp.getMetaReplicaLeader()
if err != nil {
return
}
mpView.LeaderAddr = mr.Addr
return
}

Expand Down
10 changes: 8 additions & 2 deletions master/data_partition_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (partition *DataPartition) checkReplicaStatus(timeOutSec int64) {
}

// Check if there is any missing replica for a data partition.
func (partition *DataPartition) checkMissingReplicas(clusterID string, dataPartitionMissSec, dataPartitionWarnInterval int64) {
func (partition *DataPartition) checkMissingReplicas(clusterID, leaderAddr string, dataPartitionMissSec, dataPartitionWarnInterval int64) {
partition.Lock()
defer partition.Unlock()
for _, replica := range partition.Replicas {
Expand All @@ -95,6 +95,8 @@ func (partition *DataPartition) checkMissingReplicas(clusterID string, dataParti
"miss time > %v lastRepostTime:%v dnodeLastReportTime:%v nodeisActive:%v So Migrate by manual",
clusterID, partition.PartitionID, replica.Addr, dataPartitionMissSec, replica.ReportTime, lastReportTime, isActive)
Warn(clusterID, msg)
msg = fmt.Sprintf("decommissionDataPartitionURL is http://%v/dataPartition/decommission?id=%v&addr=%v", leaderAddr, partition.PartitionID, replica.Addr)
Warn(clusterID, msg)
}
}

Expand All @@ -103,6 +105,8 @@ func (partition *DataPartition) checkMissingReplicas(clusterID string, dataParti
msg := fmt.Sprintf("action[checkMissErr],clusterID[%v] partitionID:%v on Node:%v "+
"miss time > :%v but server not exsit So Migrate", clusterID, partition.PartitionID, addr, dataPartitionMissSec)
Warn(clusterID, msg)
msg = fmt.Sprintf("decommissionDataPartitionURL is http://%v/dataPartition/decommission?id=%v&addr=%v", leaderAddr, partition.PartitionID, addr)
Warn(clusterID, msg)
}
}
}
Expand Down Expand Up @@ -132,7 +136,7 @@ func (partition *DataPartition) hasMissingDataPartition(addr string) (isMissing
return
}

func (partition *DataPartition) checkDiskError(clusterID string) (diskErrorAddrs []string) {
func (partition *DataPartition) checkDiskError(clusterID, leaderAddr string) (diskErrorAddrs []string) {
diskErrorAddrs = make([]string, 0)
partition.Lock()
defer partition.Unlock()
Expand All @@ -154,6 +158,8 @@ func (partition *DataPartition) checkDiskError(clusterID string) (diskErrorAddrs
msg := fmt.Sprintf("action[%v],clusterID[%v],partitionID:%v On :%v Disk Error,So Remove it From RocksDBHost",
checkDataPartitionDiskErr, clusterID, partition.PartitionID, diskAddr)
Warn(clusterID, msg)
msg = fmt.Sprintf("decommissionDataPartitionURL is http://%v/dataPartition/decommission?id=%v&addr=%v", leaderAddr, partition.PartitionID, diskAddr)
Warn(clusterID, msg)
}

return
Expand Down
6 changes: 3 additions & 3 deletions master/id_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (alloc *IDAllocator) allocateDataPartitionID() (partitionID uint64, err err
alloc.setDataPartitionID(partitionID)
return
errHandler:
log.LogError("action[allocateDataPartitionID] err:%v", err.Error())
log.LogErrorf("action[allocateDataPartitionID] err:%v", err.Error())
return
}

Expand All @@ -159,7 +159,7 @@ func (alloc *IDAllocator) allocateMetaPartitionID() (partitionID uint64, err err
alloc.setMetaPartitionID(partitionID)
return
errHandler:
log.LogError("action[allocateMetaPartitionID] err:%v", err.Error())
log.LogErrorf("action[allocateMetaPartitionID] err:%v", err.Error())
return
}

Expand All @@ -183,6 +183,6 @@ func (alloc *IDAllocator) allocateCommonID() (id uint64, err error) {
alloc.setCommonID(id)
return
errHandler:
log.LogError("action[allocateCommonID] err:%v", err.Error())
log.LogErrorf("action[allocateCommonID] err:%v", err.Error())
return
}
6 changes: 5 additions & 1 deletion master/meta_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (mp *MetaPartition) shouldReportMissingReplica(addr string, interval int64)
return false
}

func (mp *MetaPartition) reportMissingReplicas(clusterID string, seconds, interval int64) {
func (mp *MetaPartition) reportMissingReplicas(clusterID, leaderAddr string, seconds, interval int64) {
mp.Lock()
defer mp.Unlock()
for _, replica := range mp.Replicas {
Expand All @@ -398,6 +398,8 @@ func (mp *MetaPartition) reportMissingReplicas(clusterID string, seconds, interv
"miss time > :%v vlocLastRepostTime:%v dnodeLastReportTime:%v nodeisActive:%v",
clusterID, mp.volName, mp.PartitionID, replica.Addr, seconds, replica.ReportTime, lastReportTime, isActive)
Warn(clusterID, msg)
msg = fmt.Sprintf("decommissionMetaPartitionURL is http://%v/dataPartition/decommission?id=%v&addr=%v", leaderAddr, mp.PartitionID, replica.Addr)
Warn(clusterID, msg)
}
}

Expand All @@ -407,6 +409,8 @@ func (mp *MetaPartition) reportMissingReplicas(clusterID string, seconds, interv
"miss time > %v ",
clusterID, mp.volName, mp.PartitionID, addr, defaultMetaPartitionTimeOutSec)
Warn(clusterID, msg)
msg = fmt.Sprintf("decommissionMetaPartitionURL is http://%v/dataPartition/decommission?id=%v&addr=%v", leaderAddr, mp.PartitionID, addr)
Warn(clusterID, msg)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions master/vol.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ func (vol *Vol) checkDataPartitions(c *Cluster) (cnt int) {
dp.checkReplicaStatus(c.cfg.DataPartitionTimeOutSec)
dp.checkStatus(c.Name, true, c.cfg.DataPartitionTimeOutSec)

dp.checkMissingReplicas(c.Name, c.cfg.MissingDataPartitionInterval, c.cfg.IntervalToAlarmMissingDataPartition)
dp.checkMissingReplicas(c.Name, c.leaderInfo.addr, c.cfg.MissingDataPartitionInterval, c.cfg.IntervalToAlarmMissingDataPartition)
dp.checkReplicaNum(c, vol.Name)
if dp.Status == proto.ReadWrite {
cnt++
}
dp.checkDiskError(c.Name)
dp.checkDiskError(c.Name, c.leaderInfo.addr)
tasks := dp.checkReplicationTask(c.Name, vol.dataPartitionSize)
if len(tasks) != 0 {
c.addDataNodeTasks(tasks)
Expand Down Expand Up @@ -192,7 +192,7 @@ func (vol *Vol) checkMetaPartitions(c *Cluster) {
mp.checkLeader()
mp.checkReplicaNum(c, vol.Name, vol.mpReplicaNum)
mp.checkEnd(c, maxPartitionID)
mp.reportMissingReplicas(c.Name, defaultMetaPartitionTimeOutSec, defaultIntervalToAlarmMissingMetaPartition)
mp.reportMissingReplicas(c.Name, c.leaderInfo.addr, defaultMetaPartitionTimeOutSec, defaultIntervalToAlarmMissingMetaPartition)
tasks = append(tasks, mp.replicaCreationTasks(c.Name, vol.Name)...)
}
c.addMetaNodeTasks(tasks)
Expand Down

0 comments on commit 2520f73

Please sign in to comment.