Skip to content

Commit

Permalink
*: log format for schedule, scheduler, syncer, api (#1423)
Browse files Browse the repository at this point in the history
* log format for schedule, scheduler, syncer, api

Signed-off-by: rleungx <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Feb 15, 2019
1 parent 225948c commit 41f5261
Show file tree
Hide file tree
Showing 23 changed files with 138 additions and 112 deletions.
7 changes: 4 additions & 3 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pingcap/pd/server"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type memberHandler struct {
Expand Down Expand Up @@ -58,12 +59,12 @@ func (h *memberHandler) getMembers() (*pdpb.GetMembersResponse, error) {
// Fill leader priorities.
for _, m := range members.GetMembers() {
if h.svr.GetEtcdLeader() == 0 {
log.Warnf("no etcd leader, skip get leader priority, member: %v", m.GetMemberId())
log.Warn("no etcd leader, skip get leader priority", zap.Uint64("member", m.GetMemberId()))
continue
}
leaderPriority, e := h.svr.GetMemberLeaderPriority(m.GetMemberId())
if e != nil {
log.Errorf("failed to load leader priority, member: %v, err: %v", m.GetMemberId(), e)
log.Error("failed to load leader priority", zap.Uint64("member", m.GetMemberId()), zap.Error(err))
continue
}
m.LeaderPriority = int32(leaderPriority)
Expand Down
12 changes: 7 additions & 5 deletions server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package api

import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -48,7 +50,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http

// Prevent more than one redirection.
if name := r.Header.Get(redirectorHeader); len(name) != 0 {
log.Errorf("redirect from %v, but %v is not leader", name, h.s.Name())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()))
http.Error(w, errRedirectToNotLeader, http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -93,21 +95,21 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

resp, err := p.client.Do(r)
if err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if _, err := w.Write(b); err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

Expand Down
4 changes: 2 additions & 2 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"net/http"

"github.com/pingcap/errcode"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/unrolled/render"
)

Expand All @@ -36,7 +36,7 @@ import (
// If the error is nil, this also responds with a 500 and logs at the error level.
func errorResp(rd *render.Render, w http.ResponseWriter, err error) {
if err == nil {
log.Errorf("nil given to errorResp")
log.Error("nil is given to errorResp")
rd.JSON(w, http.StatusInternalServerError, "nil error")
return
}
Expand Down
18 changes: 11 additions & 7 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -103,24 +104,27 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
return
}
}
log.Errorf("%s failed to establish sync stream with leader %s: %s", s.server.Name(), s.server.GetLeader().GetName(), err)
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Error(err))
time.Sleep(time.Second)
continue
}
log.Infof("%s start sync with leader %s, the request index is %d", s.server.Name(), s.server.GetLeader().GetName(), s.history.GetNextIndex())
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := client.Recv()
if err != nil {
log.Error("region sync with leader meet error:", err)
log.Error("region sync with leader meet error", zap.Error(err))
if err = client.CloseSend(); err != nil {
log.Errorf("Failed to terminate client stream: %v", err)
log.Error("failed to terminate client stream", zap.Error(err))
}
time.Sleep(time.Second)
break
}
if s.history.GetNextIndex() != resp.GetStartIndex() {
log.Warnf("%s sync index not match the leader, own: %d, leader: %d, records length: %d",
s.server.Name(), s.history.GetNextIndex(), resp.GetStartIndex(), len(resp.GetRegions()))
log.Warn("server sync index not match the leader",
zap.String("server", s.server.Name()),
zap.Uint64("own", s.history.GetNextIndex()),
zap.Uint64("leader", resp.GetStartIndex()),
zap.Int("records-length", len(resp.GetRegions())))
// reset index
s.history.ResetWithIndex(resp.GetStartIndex())
}
Expand Down
11 changes: 6 additions & 5 deletions server/region_syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
"strconv"
"sync"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -133,22 +134,22 @@ func (h *historyBuffer) get(index uint64) *core.RegionInfo {
func (h *historyBuffer) reload() {
v, err := h.kv.Load(historyKey)
if err != nil {
log.Warnf("load history index failed: %s", err)
log.Warn("load history index failed", zap.Error(err))
}
if v != "" {
h.index, err = strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatalf("load history index failed: %s", err)
log.Fatal("load history index failed", zap.Error(err))
}
}
log.Info("history index start at: ", h.firstIndex())
log.Info("start from history index", zap.Uint64("start-index", h.firstIndex()))
}

func (h *historyBuffer) persist() {
regionSyncerStatus.WithLabelValues("first_index").Set(float64(h.firstIndex()))
regionSyncerStatus.WithLabelValues("last_index").Set(float64(h.nextIndex()))
err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
if err != nil {
log.Warnf("persist history index (%d) failed: %v", h.nextIndex(), err)
log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), zap.Error(err))
}
}
28 changes: 18 additions & 10 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"github.com/juju/ratelimit"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -140,7 +141,9 @@ func (s *RegionSyncer) Sync(stream pdpb.PD_SyncRegionsServer) error {
if clusterID != s.server.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.server.ClusterID(), clusterID)
}
log.Infof("establish sync region stream with %s [%s]", request.GetMember().GetName(), request.GetMember().GetClientUrls()[0])
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
zap.String("url", request.GetMember().GetClientUrls()[0]))

err = s.syncHistoryRegion(request, stream)
if err != nil {
Expand All @@ -156,7 +159,8 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
records := s.history.RecordsFrom(startIndex)
if len(records) == 0 {
if s.history.GetNextIndex() == startIndex {
log.Infof("%s already in sync with %s, the last index is %d", name, s.server.Name(), startIndex)
log.Info("requested server has already in sync with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
return nil
}
// do full synchronization
Expand All @@ -178,18 +182,22 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
s.limit.Wait(int64(resp.Size()))
lastIndex += len(res)
if err := stream.Send(resp); err != nil {
log.Errorf("failed to send sync region response, error: %v", err)
log.Error("failed to send sync region response", zap.Error(err))
}
res = res[:0]
}
log.Infof("%s has completed full synchronization with %s, spend %v", name, s.server.Name(), time.Since(start))
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
return nil
}
log.Warnf("no history regions from index %d, the leader maybe restarted", startIndex)
log.Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex))
return nil
}
log.Infof("sync the history regions with %s from index: %d, own last index: %d, got records length: %d",
name, startIndex, s.history.GetNextIndex(), len(records))
log.Info("sync the history regions with server",
zap.String("server", name),
zap.Uint64("from-index", startIndex),
zap.Uint64("last-index", s.history.GetNextIndex()),
zap.Int("records-length", len(records)))
regions := make([]*metapb.Region, len(records))
for i, r := range records {
regions[i] = r.GetMeta()
Expand All @@ -215,7 +223,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
for name, sender := range s.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error:", err)
log.Error("region syncer send data meet error", zap.Error(err))
failed = append(failed, name)
}
}
Expand All @@ -224,7 +232,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
s.Lock()
for _, name := range failed {
delete(s.streams, name)
log.Infof("region syncer delete the stream of %s", name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.Unlock()
}
Expand Down
3 changes: 0 additions & 3 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
)

//revive:disable:unused-parameter
Expand All @@ -38,7 +37,6 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("store%d", store.GetID())
for _, filter := range filters {
if filter.FilterSource(opt, store) {
log.Debugf("[filter %T] filters store %v from source", filter, store)
filterCounter.WithLabelValues("filter-source", storeID, filter.Type()).Inc()
return true
}
Expand All @@ -51,7 +49,6 @@ func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("store%d", store.GetID())
for _, filter := range filters {
if filter.FilterTarget(opt, store) {
log.Debugf("[filter %T] filters store %v from target", filter, store)
filterCounter.WithLabelValues("filter-target", storeID, filter.Type()).Inc()
return true
}
Expand Down
5 changes: 3 additions & 2 deletions server/schedule/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package schedule
import (
"time"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// As region split history is not persisted. We put a special marker into
Expand Down Expand Up @@ -109,7 +110,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*Operator {
}

checkerCounter.WithLabelValues("merge_checker", "new_operator").Inc()
log.Debugf("try to merge region %v into region %v", core.HexRegionMeta(region.GetMeta()), core.HexRegionMeta(target.GetMeta()))
log.Debug("try to merge region", zap.Reflect("from", core.HexRegionMeta(region.GetMeta())), zap.Reflect("to", core.HexRegionMeta(target.GetMeta())))
ops, err := CreateMergeRegionOperator("merge-region", m.cluster, region, target, OpMerge)
if err != nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// MockCluster is used to mock clusterInfo for test use
Expand Down Expand Up @@ -77,7 +78,7 @@ func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core
func (mc *MockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.allocID()
if err != nil {
log.Errorf("failed to alloc peer: %v", err)
log.Error("failed to alloc peer", zap.Error(err))
return nil, err
}
peer := &metapb.Peer{
Expand Down
7 changes: 4 additions & 3 deletions server/schedule/namespace_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ package schedule

import (
"github.com/pingcap/kvproto/pkg/metapb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// NamespaceChecker ensures region to go to the right place.
Expand Down Expand Up @@ -63,7 +64,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
if n.isExists(targetStores, peer.StoreId) {
continue
}
log.Debugf("[region %d] peer %v is not located in namespace target stores", region.GetID(), peer)
log.Debug("peer is not located in namespace target stores", zap.Uint64("region-id", region.GetID()), zap.Reflect("peer", peer))
newPeer := n.SelectBestPeerToRelocate(region, targetStores)
if newPeer == nil {
checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
Expand All @@ -81,7 +82,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
func (n *NamespaceChecker) SelectBestPeerToRelocate(region *core.RegionInfo, targets []*core.StoreInfo) *metapb.Peer {
storeID := n.SelectBestStoreToRelocate(region, targets)
if storeID == 0 {
log.Debugf("[region %d] has no best store to relocate", region.GetID())
log.Debug("has no best store to relocate", zap.Uint64("region-id", region.GetID()))
return nil
}
newPeer, err := n.cluster.AllocPeer(storeID)
Expand Down
Loading

0 comments on commit 41f5261

Please sign in to comment.