Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: log format for schedule, scheduler, syncer, api #1423

Merged
merged 4 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pingcap/pd/server"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type memberHandler struct {
Expand Down Expand Up @@ -58,12 +59,12 @@ func (h *memberHandler) getMembers() (*pdpb.GetMembersResponse, error) {
// Fill leader priorities.
for _, m := range members.GetMembers() {
if h.svr.GetEtcdLeader() == 0 {
log.Warnf("no etcd leader, skip get leader priority, member: %v", m.GetMemberId())
log.Warn("no etcd leader, skip get leader priority", zap.Uint64("member", m.GetMemberId()))
continue
}
leaderPriority, e := h.svr.GetMemberLeaderPriority(m.GetMemberId())
if e != nil {
log.Errorf("failed to load leader priority, member: %v, err: %v", m.GetMemberId(), e)
log.Error("failed to load leader priority", zap.Uint64("member", m.GetMemberId()), zap.Error(err))
continue
}
m.LeaderPriority = int32(leaderPriority)
Expand Down
12 changes: 7 additions & 5 deletions server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package api

import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -48,7 +50,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http

// Prevent more than one redirection.
if name := r.Header.Get(redirectorHeader); len(name) != 0 {
log.Errorf("redirect from %v, but %v is not leader", name, h.s.Name())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()))
http.Error(w, errRedirectToNotLeader, http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -93,21 +95,21 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

resp, err := p.client.Do(r)
if err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if _, err := w.Write(b); err != nil {
log.Error(err)
log.Error(fmt.Sprintf("%+v", err))
continue
}

Expand Down
4 changes: 2 additions & 2 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"net/http"

"github.com/pingcap/errcode"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/unrolled/render"
)

Expand All @@ -36,7 +36,7 @@ import (
// If the error is nil, this also responds with a 500 and logs at the error level.
func errorResp(rd *render.Render, w http.ResponseWriter, err error) {
if err == nil {
log.Errorf("nil given to errorResp")
log.Error("nil is given to errorResp")
rd.JSON(w, http.StatusInternalServerError, "nil error")
return
}
Expand Down
18 changes: 11 additions & 7 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -103,24 +104,27 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
return
}
}
log.Errorf("%s failed to establish sync stream with leader %s: %s", s.server.Name(), s.server.GetLeader().GetName(), err)
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Error(err))
time.Sleep(time.Second)
continue
}
log.Infof("%s start sync with leader %s, the request index is %d", s.server.Name(), s.server.GetLeader().GetName(), s.history.GetNextIndex())
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := client.Recv()
if err != nil {
log.Error("region sync with leader meet error:", err)
log.Error("region sync with leader meet error", zap.Error(err))
if err = client.CloseSend(); err != nil {
log.Errorf("Failed to terminate client stream: %v", err)
log.Error("failed to terminate client stream", zap.Error(err))
}
time.Sleep(time.Second)
break
}
if s.history.GetNextIndex() != resp.GetStartIndex() {
log.Warnf("%s sync index not match the leader, own: %d, leader: %d, records length: %d",
s.server.Name(), s.history.GetNextIndex(), resp.GetStartIndex(), len(resp.GetRegions()))
log.Warn("server sync index not match the leader",
zap.String("server", s.server.Name()),
zap.Uint64("own", s.history.GetNextIndex()),
zap.Uint64("leader", resp.GetStartIndex()),
zap.Int("records-length", len(resp.GetRegions())))
// reset index
s.history.ResetWithIndex(resp.GetStartIndex())
}
Expand Down
11 changes: 6 additions & 5 deletions server/region_syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
"strconv"
"sync"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -133,22 +134,22 @@ func (h *historyBuffer) get(index uint64) *core.RegionInfo {
func (h *historyBuffer) reload() {
v, err := h.kv.Load(historyKey)
if err != nil {
log.Warnf("load history index failed: %s", err)
log.Warn("load history index failed", zap.Error(err))
}
if v != "" {
h.index, err = strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatalf("load history index failed: %s", err)
log.Fatal("load history index failed", zap.Error(err))
}
}
log.Info("history index start at: ", h.firstIndex())
log.Info("start from history index", zap.Uint64("start-index", h.firstIndex()))
}

func (h *historyBuffer) persist() {
regionSyncerStatus.WithLabelValues("first_index").Set(float64(h.firstIndex()))
regionSyncerStatus.WithLabelValues("last_index").Set(float64(h.nextIndex()))
err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
if err != nil {
log.Warnf("persist history index (%d) failed: %v", h.nextIndex(), err)
log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), zap.Error(err))
}
}
28 changes: 18 additions & 10 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"github.com/juju/ratelimit"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -140,7 +141,9 @@ func (s *RegionSyncer) Sync(stream pdpb.PD_SyncRegionsServer) error {
if clusterID != s.server.ClusterID() {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.server.ClusterID(), clusterID)
}
log.Infof("establish sync region stream with %s [%s]", request.GetMember().GetName(), request.GetMember().GetClientUrls()[0])
log.Info("establish sync region stream",
zap.String("requested-server", request.GetMember().GetName()),
zap.String("url", request.GetMember().GetClientUrls()[0]))

err = s.syncHistoryRegion(request, stream)
if err != nil {
Expand All @@ -156,7 +159,8 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
records := s.history.RecordsFrom(startIndex)
if len(records) == 0 {
if s.history.GetNextIndex() == startIndex {
log.Infof("%s already in sync with %s, the last index is %d", name, s.server.Name(), startIndex)
log.Info("requested server has already in sync with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
return nil
}
// do full synchronization
Expand All @@ -178,18 +182,22 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
s.limit.Wait(int64(resp.Size()))
lastIndex += len(res)
if err := stream.Send(resp); err != nil {
log.Errorf("failed to send sync region response, error: %v", err)
log.Error("failed to send sync region response", zap.Error(err))
}
res = res[:0]
}
log.Infof("%s has completed full synchronization with %s, spend %v", name, s.server.Name(), time.Since(start))
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
return nil
}
log.Warnf("no history regions from index %d, the leader maybe restarted", startIndex)
log.Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex))
return nil
}
log.Infof("sync the history regions with %s from index: %d, own last index: %d, got records length: %d",
name, startIndex, s.history.GetNextIndex(), len(records))
log.Info("sync the history regions with server",
zap.String("server", name),
zap.Uint64("from-index", startIndex),
zap.Uint64("last-index", s.history.GetNextIndex()),
zap.Int("records-length", len(records)))
regions := make([]*metapb.Region, len(records))
for i, r := range records {
regions[i] = r.GetMeta()
Expand All @@ -215,7 +223,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
for name, sender := range s.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error:", err)
log.Error("region syncer send data meet error", zap.Error(err))
failed = append(failed, name)
}
}
Expand All @@ -224,7 +232,7 @@ func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
s.Lock()
for _, name := range failed {
delete(s.streams, name)
log.Infof("region syncer delete the stream of %s", name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.Unlock()
}
Expand Down
3 changes: 0 additions & 3 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
)

//revive:disable:unused-parameter
Expand All @@ -38,7 +37,6 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("store%d", store.GetID())
for _, filter := range filters {
if filter.FilterSource(opt, store) {
log.Debugf("[filter %T] filters store %v from source", filter, store)
filterCounter.WithLabelValues("filter-source", storeID, filter.Type()).Inc()
return true
}
Expand All @@ -51,7 +49,6 @@ func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("store%d", store.GetID())
for _, filter := range filters {
if filter.FilterTarget(opt, store) {
log.Debugf("[filter %T] filters store %v from target", filter, store)
filterCounter.WithLabelValues("filter-target", storeID, filter.Type()).Inc()
return true
}
Expand Down
5 changes: 3 additions & 2 deletions server/schedule/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package schedule
import (
"time"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// As region split history is not persisted. We put a special marker into
Expand Down Expand Up @@ -109,7 +110,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*Operator {
}

checkerCounter.WithLabelValues("merge_checker", "new_operator").Inc()
log.Debugf("try to merge region %v into region %v", core.HexRegionMeta(region.GetMeta()), core.HexRegionMeta(target.GetMeta()))
log.Debug("try to merge region", zap.Reflect("from", core.HexRegionMeta(region.GetMeta())), zap.Reflect("to", core.HexRegionMeta(target.GetMeta())))
ops, err := CreateMergeRegionOperator("merge-region", m.cluster, region, target, OpMerge)
if err != nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// MockCluster is used to mock clusterInfo for test use
Expand Down Expand Up @@ -77,7 +78,7 @@ func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core
func (mc *MockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.allocID()
if err != nil {
log.Errorf("failed to alloc peer: %v", err)
log.Error("failed to alloc peer", zap.Error(err))
return nil, err
}
peer := &metapb.Peer{
Expand Down
7 changes: 4 additions & 3 deletions server/schedule/namespace_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ package schedule

import (
"github.com/pingcap/kvproto/pkg/metapb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// NamespaceChecker ensures region to go to the right place.
Expand Down Expand Up @@ -63,7 +64,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
if n.isExists(targetStores, peer.StoreId) {
continue
}
log.Debugf("[region %d] peer %v is not located in namespace target stores", region.GetID(), peer)
log.Debug("peer is not located in namespace target stores", zap.Uint64("region-id", region.GetID()), zap.Reflect("peer", peer))
newPeer := n.SelectBestPeerToRelocate(region, targetStores)
if newPeer == nil {
checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
Expand All @@ -81,7 +82,7 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
func (n *NamespaceChecker) SelectBestPeerToRelocate(region *core.RegionInfo, targets []*core.StoreInfo) *metapb.Peer {
storeID := n.SelectBestStoreToRelocate(region, targets)
if storeID == 0 {
log.Debugf("[region %d] has no best store to relocate", region.GetID())
log.Debug("has no best store to relocate", zap.Uint64("region-id", region.GetID()))
return nil
}
newPeer, err := n.cluster.AllocPeer(storeID)
Expand Down
Loading