From c031b2749151f33063dbaafe21e5bb393e10bd21 Mon Sep 17 00:00:00 2001 From: jingyih Date: Thu, 13 Feb 2020 07:57:14 -0800 Subject: [PATCH] etcdserver: corruption check via http During corruption check, get peer's hashKV via http call. --- etcdserver/api/etcdhttp/peer.go | 15 ++- etcdserver/api/etcdhttp/peer_test.go | 4 +- etcdserver/corrupt.go | 170 ++++++++++++++++++++++----- 3 files changed, 152 insertions(+), 37 deletions(-) diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go index 1fb7ccb7b96..e9dae3aaea9 100644 --- a/etcdserver/api/etcdhttp/peer.go +++ b/etcdserver/api/etcdhttp/peer.go @@ -37,11 +37,17 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer requests. -func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler { - return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler()) +func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler { + return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler()) } -func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { +func newPeerHandler( + lg *zap.Logger, + s etcdserver.Server, + raftHandler http.Handler, + leaseHandler http.Handler, + hashKVHandler http.Handler, +) http.Handler { if lg == nil { lg = zap.NewNop() } @@ -58,6 +64,9 @@ func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handle mux.Handle(leasehttp.LeasePrefix, leaseHandler) mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) } + if hashKVHandler != nil { + mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler) + } mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion)) return mux } diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go index 8d890c0b585..8a5a8c8c855 100644 --- a/etcdserver/api/etcdhttp/peer_test.go +++ b/etcdserver/api/etcdhttp/peer_test.go @@ -83,7 +83,7 @@ var fakeRaftHandler = 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that // handles raft-prefix requests well. func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { - ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil) + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil) srv := httptest.NewServer(ph) defer srv.Close() @@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) { // TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) { - ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil) + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil) srv := httptest.NewServer(ph) defer srv.Close() diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 7950529f6a9..70d6d21996c 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -15,11 +15,15 @@ package etcdserver import ( + "bytes" "context" + "encoding/json" "fmt" + "io/ioutil" + "net/http" + "strings" "time" - "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc" @@ -194,10 +198,12 @@ func (s *EtcdServer) checkHashKV() error { mismatch(uint64(s.ID())) } + checkedCount := 0 for _, p := range peers { if p.resp == nil { continue } + checkedCount++ id := p.resp.Header.MemberId // leader expects follower's latest revision less than or equal to leader's @@ -235,72 +241,63 @@ func (s *EtcdServer) checkHashKV() error { mismatch(id) } } + lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) return nil } -type peerHashKVResp struct { +type peerInfo struct { id types.ID eps []string +} - resp 
*clientv3.HashKVResponse +type peerHashKVResp struct { + peerInfo + resp *pb.HashKVResponse err error } -func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { +func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { // TODO: handle the case when "s.cluster.Members" have not // been populated (e.g. no snapshot to load from disk) - mbs := s.cluster.Members() - pss := make([]peerHashKVResp, 0, len(mbs)) - for _, m := range mbs { + members := s.cluster.Members() + peers := make([]peerInfo, 0, len(members)) + for _, m := range members { if m.ID == s.ID() { continue } - pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs}) + peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs}) } lg := s.getLogger() - for _, p := range pss { + var resps []*peerHashKVResp + for _, p := range peers { if len(p.eps) == 0 { continue } - cli, cerr := clientv3.New(clientv3.Config{ - DialTimeout: s.Cfg.ReqTimeout(), - Endpoints: p.eps, - }) - if cerr != nil { - lg.Warn( - "failed to create client to peer URL", - zap.String("local-member-id", s.ID().String()), - zap.String("remote-peer-id", p.id.String()), - zap.Strings("remote-peer-endpoints", p.eps), - zap.Error(cerr), - ) - continue - } respsLen := len(resps) - for _, c := range cli.Endpoints() { + var lastErr error + for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - var resp *clientv3.HashKVResponse - resp, cerr = cli.HashKV(ctx, c, rev) + resp, lastErr := s.getPeerHashKVHTTP(ctx, ep, rev) cancel() - if cerr == nil { - resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil}) + if lastErr == nil { + resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) break } lg.Warn( "failed hash kv request", zap.String("local-member-id", s.ID().String()), zap.Int64("requested-revision", rev), - zap.String("remote-peer-endpoint", c), - zap.Error(cerr), + zap.String("remote-peer-endpoint", ep), + zap.Error(lastErr), 
) } - cli.Close() + // failed to get hashKV from all endpoints of this peer if respsLen == len(resps) { - resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr}) + resps = append(resps, &peerHashKVResp{peerInfo: p, resp: nil, err: lastErr}) } } return resps @@ -339,3 +336,112 @@ func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { return nil, ErrCorrupt } + +type ServerPeerV2 interface { + ServerPeer + HashKVHandler() http.Handler +} + +const PeerHashKVPath = "/members/hashkv" + +type hashKVHandler struct { + lg *zap.Logger + server *EtcdServer +} + +func (s *EtcdServer) HashKVHandler() http.Handler { + return &hashKVHandler{lg: s.getLogger(), server: s} +} + +func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + if r.URL.Path != PeerHashKVPath { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "error reading body", http.StatusBadRequest) + return + } + + req := &pb.HashKVRequest{} + if err := json.Unmarshal(b, req); err != nil { + h.lg.Warn("failed to unmarshal request", zap.Error(err)) + http.Error(w, "error unmarshalling request", http.StatusBadRequest) + return + } + hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision) + if err != nil { + h.lg.Warn( + "failed to get hashKV", + zap.Int64("requested-revision", req.Revision), + zap.Error(err), + ) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} + respBytes, err := json.Marshal(resp) + if err != nil { + h.lg.Warn("failed to marshal 
hashKV response", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("X-Etcd-Cluster-ID", h.server.Cluster().ID().String()) + w.Header().Set("Content-Type", "application/json") + w.Write(respBytes) +} + +// getPeerHashKVHTTP fetches the hash of the KV store at the given rev via an HTTP call to the given url. +func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) { + cc := &http.Client{Transport: s.peerRt} + hashReq := &pb.HashKVRequest{Revision: rev} + hashReqBytes, err := json.Marshal(hashReq) + if err != nil { + return nil, err + } + requestUrl := url + PeerHashKVPath + req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes)) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Type", "application/json") + req.Cancel = ctx.Done() + + resp, err := cc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusBadRequest { + if strings.Contains(string(b), mvcc.ErrCompacted.Error()) { + return nil, rpctypes.ErrCompacted + } + if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { + return nil, rpctypes.ErrFutureRev + } + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unknown error: %s", string(b)) + } + + hashResp := &pb.HashKVResponse{} + if err := json.Unmarshal(b, hashResp); err != nil { + return nil, err + } + return hashResp, nil +}