Skip to content

Commit

Permalink
small cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: rleungx <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 14, 2019
1 parent 70d8405 commit 5aa5c41
Show file tree
Hide file tree
Showing 21 changed files with 74 additions and 74 deletions.
4 changes: 2 additions & 2 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (h *memberHandler) getMembers() (*pdpb.GetMembersResponse, error) {
// Fill leader priorities.
for _, m := range members.GetMembers() {
if h.svr.GetEtcdLeader() == 0 {
log.L().Warn("no etcd leader, skip get leader priority", zap.Uint64("member", m.GetMemberId()))
log.Warn("no etcd leader, skip get leader priority", zap.Uint64("member", m.GetMemberId()))
continue
}
leaderPriority, e := h.svr.GetMemberLeaderPriority(m.GetMemberId())
if e != nil {
log.L().Error("failed to load leader priority", zap.Uint64("member", m.GetMemberId()), zap.Error(err))
log.Error("failed to load leader priority", zap.Uint64("member", m.GetMemberId()), zap.Error(err))
continue
}
m.LeaderPriority = int32(leaderPriority)
Expand Down
8 changes: 4 additions & 4 deletions server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http

// Prevent more than one redirection.
if name := r.Header.Get(redirectorHeader); len(name) != 0 {
log.L().Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()))
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()))
http.Error(w, errRedirectToNotLeader, http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -95,21 +95,21 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

resp, err := p.client.Do(r)
if err != nil {
log.L().Error(fmt.Sprintf("%+v", err))
log.Error(fmt.Sprintf("%+v", err))
continue
}

b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.L().Error(fmt.Sprintf("%+v", err))
log.Error(fmt.Sprintf("%+v", err))
continue
}

copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if _, err := w.Write(b); err != nil {
log.L().Error(fmt.Sprintf("%+v", err))
log.Error(fmt.Sprintf("%+v", err))
continue
}

Expand Down
2 changes: 1 addition & 1 deletion server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// If the error is nil, this also responds with a 500 and logs at the error level.
func errorResp(rd *render.Render, w http.ResponseWriter, err error) {
if err == nil {
log.L().Error("nil is given to errorResp")
log.Error("nil is given to errorResp")
rd.JSON(w, http.StatusInternalServerError, "nil error")
return
}
Expand Down
10 changes: 5 additions & 5 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,23 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
return
}
}
log.L().Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Error(err))
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Error(err))
time.Sleep(time.Second)
continue
}
log.L().Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := client.Recv()
if err != nil {
log.L().Error("region sync with leader meet error", zap.Error(err))
log.Error("region sync with leader meet error", zap.Error(err))
if err = client.CloseSend(); err != nil {
log.L().Error("failed to terminate client stream", zap.Error(err))
log.Error("failed to terminate client stream", zap.Error(err))
}
time.Sleep(time.Second)
break
}
if s.history.GetNextIndex() != resp.GetStartIndex() {
log.L().Warn("server sync index not match the leader",
log.Warn("server sync index not match the leader",
zap.String("server", s.server.Name()),
zap.Uint64("own", s.history.GetNextIndex()),
zap.Uint64("leader", resp.GetStartIndex()),
Expand Down
8 changes: 4 additions & 4 deletions server/region_syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,22 @@ func (h *historyBuffer) get(index uint64) *core.RegionInfo {
func (h *historyBuffer) reload() {
v, err := h.kv.Load(historyKey)
if err != nil {
log.L().Warn("load history index failed", zap.Error(err))
log.Warn("load history index failed", zap.Error(err))
}
if v != "" {
h.index, err = strconv.ParseUint(v, 10, 64)
if err != nil {
log.L().Fatal("load history index failed", zap.Error(err))
log.Fatal("load history index failed", zap.Error(err))
}
}
log.L().Info("start from history index", zap.Uint64("start-index", h.firstIndex()))
log.Info("start from history index", zap.Uint64("start-index", h.firstIndex()))
}

func (h *historyBuffer) persist() {
regionSyncerStatus.WithLabelValues("first_index").Set(float64(h.firstIndex()))
regionSyncerStatus.WithLabelValues("last_index").Set(float64(h.nextIndex()))
err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
if err != nil {
log.L().Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), zap.Error(err))
log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), zap.Error(err))
}
}
18 changes: 9 additions & 9 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
for {
select {
case <-quit:
log.L().Info("exit region syncer")
log.Info("exit region syncer")
return
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
Expand Down Expand Up @@ -141,7 +141,7 @@ func (s *RegionSyncer) Sync(stream pdpb.PD_SyncRegionsServer) error {
if clusterID != s.server.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.server.ClusterID(), clusterID)
}
log.L().Info("establish sync region stream",
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
zap.String("url", request.GetMember().GetClientUrls()[0]))

Expand All @@ -159,7 +159,7 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
records := s.history.RecordsFrom(startIndex)
if len(records) == 0 {
if s.history.GetNextIndex() == startIndex {
log.L().Info("requested server has already in sync with server",
log.Info("requested server has already in sync with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
return nil
}
Expand All @@ -182,18 +182,18 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
s.limit.Wait(int64(resp.Size()))
lastIndex += len(res)
if err := stream.Send(resp); err != nil {
log.L().Error("failed to send sync region response", zap.Error(err))
log.Error("failed to send sync region response", zap.Error(err))
}
res = res[:0]
}
log.L().Info("requested server has completed full synchronization with server",
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
return nil
}
log.L().Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex))
log.Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex))
return nil
}
log.L().Info("sync the history regions with server",
log.Info("sync the history regions with server",
zap.String("server", name),
zap.Uint64("from-index", startIndex),
zap.Uint64("last-index", s.history.GetNextIndex()),
Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
for name, sender := range s.streams {
err := sender.Send(regions)
if err != nil {
log.L().Error("region syncer send data meet error", zap.Error(err))
log.Error("region syncer send data meet error", zap.Error(err))
failed = append(failed, name)
}
}
Expand All @@ -232,7 +232,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
s.Lock()
for _, name := range failed {
delete(s.streams, name)
log.L().Info("region syncer delete the stream", zap.String("stream", name))
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.Unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*Operator {
}

checkerCounter.WithLabelValues("merge_checker", "new_operator").Inc()
log.L().Debug("try to merge region", zap.Reflect("from", core.HexRegionMeta(region.GetMeta())), zap.Reflect("to", core.HexRegionMeta(target.GetMeta())))
log.Debug("try to merge region", zap.Reflect("from", core.HexRegionMeta(region.GetMeta())), zap.Reflect("to", core.HexRegionMeta(target.GetMeta())))
ops, err := CreateMergeRegionOperator("merge-region", m.cluster, region, target, OpMerge)
if err != nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core
func (mc *MockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.allocID()
if err != nil {
log.L().Error("failed to alloc peer", zap.Error(err))
log.Error("failed to alloc peer", zap.Error(err))
return nil, err
}
peer := &metapb.Peer{
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/namespace_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
if n.isExists(targetStores, peer.StoreId) {
continue
}
log.L().Debug("peer is not located in namespace target stores", zap.Uint64("region", region.GetID()), zap.Reflect("peer", peer))
log.Debug("peer is not located in namespace target stores", zap.Uint64("region", region.GetID()), zap.Reflect("peer", peer))
newPeer := n.SelectBestPeerToRelocate(region, targetStores)
if newPeer == nil {
checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
Expand All @@ -82,7 +82,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
func (n *NamespaceChecker) SelectBestPeerToRelocate(region *core.RegionInfo, targets []*core.StoreInfo) *metapb.Peer {
storeID := n.SelectBestStoreToRelocate(region, targets)
if storeID == 0 {
log.L().Debug("has no best store to relocate", zap.Uint64("region", region.GetID()))
log.Debug("has no best store to relocate", zap.Uint64("region", region.GetID()))
return nil
}
newPeer, err := n.cluster.AllocPeer(storeID)
Expand Down
8 changes: 4 additions & 4 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (ap AddPeer) String() string {
func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
if p.GetId() != ap.PeerID {
log.L().Warn("obtain unexpected peer", zap.String("expect", ap.String()), zap.Uint64("obtain voter", p.GetId()))
log.Warn("obtain unexpected peer", zap.String("expect", ap.String()), zap.Uint64("obtain-voter", p.GetId()))
return false
}
return region.GetPendingVoter(p.GetId()) == nil
Expand Down Expand Up @@ -111,7 +111,7 @@ func (al AddLearner) String() string {
func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
if p.GetId() != al.PeerID {
log.L().Warn("obtain unexpected peer", zap.String("expect", al.String()), zap.Uint64("obtain learner", p.GetId()))
log.Warn("obtain unexpected peer", zap.String("expect", al.String()), zap.Uint64("obtain-learner", p.GetId()))
return false
}
return region.GetPendingLearner(p.GetId()) == nil
Expand Down Expand Up @@ -140,7 +140,7 @@ func (pl PromoteLearner) String() string {
func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(pl.ToStore); p != nil {
if p.GetId() != pl.PeerID {
log.L().Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain voter", p.GetId()))
log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", p.GetId()))
}
return p.GetId() == pl.PeerID
}
Expand Down Expand Up @@ -522,7 +522,7 @@ func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.Regio

peer, err := cluster.AllocPeer(storeID)
if err != nil {
log.L().Debug("peer alloc failed", zap.Error(err))
log.Debug("peer alloc failed", zap.Error(err))
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
Expand Down
18 changes: 9 additions & 9 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
return
}
if op.IsFinish() {
log.L().Info("operator finish", zap.Uint64("region", region.GetID()), zap.Reflect("operator", op))
log.Info("operator finish", zap.Uint64("region", region.GetID()), zap.Reflect("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
oc.pushHistory(op)
oc.RemoveOperator(op)
} else if timeout {
log.L().Info("operator timeout", zap.Uint64("region", region.GetID()), zap.Reflect("operator", op))
log.Info("operator timeout", zap.Uint64("region", region.GetID()), zap.Reflect("operator", op))
oc.RemoveOperator(op)
}
}
Expand Down Expand Up @@ -103,15 +103,15 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
func (oc *OperatorController) checkAddOperator(op *Operator) bool {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.L().Debug("region not found, cancel add operator", zap.Uint64("region", op.RegionID()))
log.Debug("region not found, cancel add operator", zap.Uint64("region", op.RegionID()))
return false
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
log.L().Debug("region epoch not match, cancel add operator", zap.Uint64("region", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
log.Debug("region epoch not match, cancel add operator", zap.Uint64("region", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
return false
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.L().Debug("already have operator, cancel add operator", zap.Uint64("region", op.RegionID()), zap.Reflect("old", old))
log.Debug("already have operator, cancel add operator", zap.Uint64("region", op.RegionID()), zap.Reflect("old", old))
return false
}
return true
Expand All @@ -124,12 +124,12 @@ func isHigherPriorityOperator(new, old *Operator) bool {
func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
regionID := op.RegionID()

log.L().Info("add operator", zap.Uint64("region", regionID), zap.Reflect("operator", op))
log.Info("add operator", zap.Uint64("region", regionID), zap.Reflect("operator", op))

// If there is an old operator, replace it. The priority should be checked
// already.
if old, ok := oc.operators[regionID]; ok {
log.L().Info("replace old operator", zap.Uint64("region", regionID), zap.Reflect("operator", old))
log.Info("replace old operator", zap.Uint64("region", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
oc.removeOperatorLocked(old)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func (oc *OperatorController) GetOperators() []*Operator {

// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep) {
log.L().Info("send schedule command", zap.Uint64("region", region.GetID()), zap.Reflect("step", step))
log.Info("send schedule command", zap.Uint64("region", region.GetID()), zap.Reflect("step", step))
switch st := step.(type) {
case TransferLeader:
cmd := &pdpb.RegionHeartbeatResponse{
Expand Down Expand Up @@ -261,7 +261,7 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step
}
oc.hbStreams.SendMsg(region, cmd)
default:
log.L().Error("unknown operator step", zap.Reflect("step", step))
log.Error("unknown operator step", zap.Reflect("step", step))
}
}

Expand Down
Loading

0 comments on commit 5aa5c41

Please sign in to comment.