From 6e82d5798ad1615b2b43bf6e7ec5379f081cddb8 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 8 Jan 2018 02:59:44 -0800 Subject: [PATCH] etcdctl/ctlv3: use "snapshot" package Signed-off-by: Gyuho Lee --- etcdctl/ctlv3/command/printer.go | 11 +- etcdctl/ctlv3/command/printer_fields.go | 3 +- etcdctl/ctlv3/command/printer_json.go | 8 +- etcdctl/ctlv3/command/printer_simple.go | 3 +- etcdctl/ctlv3/command/printer_table.go | 3 +- etcdctl/ctlv3/command/snapshot_command.go | 366 +++------------------- 6 files changed, 57 insertions(+), 337 deletions(-) diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index b0c2d6cad652..58514a8c1244 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -20,6 +20,7 @@ import ( "strings" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/snapshot" "github.com/dustin/go-humanize" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -48,7 +49,7 @@ type printer interface { MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) Alarm(v3.AlarmResponse) - DBStatus(dbstatus) + DBStatus(snapshot.DBStatus) RoleAdd(role string, r v3.AuthRoleAddResponse) RoleGet(role string, r v3.AuthRoleGetResponse) @@ -148,9 +149,9 @@ func newPrinterUnsupported(n string) printer { return &printerUnsupported{printerRPC{nil, f}} } -func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) } -func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) } -func (p *printerUnsupported) DBStatus(dbstatus) { p.p(nil) } +func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) } +func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) } +func (p *printerUnsupported) DBStatus(snapshot.DBStatus) { p.p(nil) } func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) } @@ -199,7 +200,7 @@ func makeEndpointHashKVTable(hashList []epHashKV) (hdr []string, rows [][]string return hdr, rows } -func makeDBStatusTable(ds dbstatus) (hdr []string, rows [][]string) { +func makeDBStatusTable(ds snapshot.DBStatus) (hdr []string, rows [][]string) { hdr = []string{"hash", "revision", "total keys", "total size"} rows = append(rows, []string{ fmt.Sprintf("%x", ds.Hash), diff --git a/etcdctl/ctlv3/command/printer_fields.go b/etcdctl/ctlv3/command/printer_fields.go index 24ff283a8cac..44407590b63a 100644 --- a/etcdctl/ctlv3/command/printer_fields.go +++ b/etcdctl/ctlv3/command/printer_fields.go @@ -20,6 +20,7 @@ import ( v3 "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" spb "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/coreos/etcd/snapshot" ) type fieldsPrinter struct{ printer } @@ -171,7 +172,7 @@ func (p *fieldsPrinter) Alarm(r v3.AlarmResponse) { } } -func (p *fieldsPrinter) DBStatus(r dbstatus) { +func (p *fieldsPrinter) DBStatus(r snapshot.DBStatus) { fmt.Println(`"Hash" :`, r.Hash) fmt.Println(`"Revision" :`, r.Revision) fmt.Println(`"Keys" :`, r.TotalKey) diff --git a/etcdctl/ctlv3/command/printer_json.go b/etcdctl/ctlv3/command/printer_json.go index 19b3a5e688bc..722602eab99e 100644 --- a/etcdctl/ctlv3/command/printer_json.go +++ b/etcdctl/ctlv3/command/printer_json.go @@ -18,6 +18,8 @@ import ( "encoding/json" "fmt" "os" + + "github.com/coreos/etcd/snapshot" ) type jsonPrinter struct{ printer } @@ -28,9 +30,9 @@ func newJSONPrinter() printer { } } -func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) } -func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) } -func (p *jsonPrinter) DBStatus(r dbstatus) { printJSON(r) } +func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) } +func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) } +func (p *jsonPrinter) DBStatus(r snapshot.DBStatus) { printJSON(r) } func printJSON(v interface{}) { b, err := json.Marshal(v) diff --git a/etcdctl/ctlv3/command/printer_simple.go b/etcdctl/ctlv3/command/printer_simple.go index 2f4f53201c80..932527681c94 100644 --- a/etcdctl/ctlv3/command/printer_simple.go +++ b/etcdctl/ctlv3/command/printer_simple.go @@ -21,6 +21,7 @@ import ( v3 "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/snapshot" ) type simplePrinter struct { @@ -155,7 +156,7 @@ func (s *simplePrinter) EndpointHashKV(hashList []epHashKV) { } } -func (s *simplePrinter) DBStatus(ds dbstatus) { +func (s *simplePrinter) DBStatus(ds snapshot.DBStatus) { _, rows := makeDBStatusTable(ds) for _, row := range rows { fmt.Println(strings.Join(row, ", ")) diff --git a/etcdctl/ctlv3/command/printer_table.go b/etcdctl/ctlv3/command/printer_table.go index 1aea61a8456f..294a2a982852 100644 --- a/etcdctl/ctlv3/command/printer_table.go +++ b/etcdctl/ctlv3/command/printer_table.go @@ -20,6 +20,7 @@ import ( "github.com/olekukonko/tablewriter" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/snapshot" ) type tablePrinter struct{ printer } @@ -54,7 +55,7 @@ func (tp *tablePrinter) EndpointHashKV(r []epHashKV) { table.SetAlignment(tablewriter.ALIGN_RIGHT) table.Render() } -func (tp *tablePrinter) DBStatus(r dbstatus) { +func (tp *tablePrinter) DBStatus(r snapshot.DBStatus) { hdr, rows := makeDBStatusTable(r) table := tablewriter.NewWriter(os.Stdout) table.SetHeader(hdr) diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 026339bc1f2d..3a72bfc9a089 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -16,34 +16,12 @@ package command import ( "context" - "crypto/sha256" - "encoding/binary" - "encoding/json" "fmt" - "hash/crc32" - "io" - "math" - "os" - "path/filepath" - "reflect" "strings" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/etcdserver/membership" - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/mvcc" - "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/snap" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wal" - "github.com/coreos/etcd/wal/walpb" + "github.com/coreos/etcd/snapshot" - bolt "github.com/coreos/bbolt" "github.com/spf13/cobra" ) @@ -116,32 +94,17 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { path := args[0] - partpath := path + ".part" - f, err := os.Create(partpath) - + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") if err != nil { - exiterr := fmt.Errorf("could not open %s (%v)", partpath, err) - ExitWithError(ExitBadArgs, exiterr) - } - - c := mustClientFromCmd(cmd) - r, serr := c.Snapshot(context.TODO()) - if serr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, serr) + ExitWithError(ExitError, err) } - if _, rerr := io.Copy(f, r); rerr != nil { - os.RemoveAll(partpath) - ExitWithError(ExitInterrupted, rerr) + if debug { + scfg.Logger = snapshot.DefaultLogger } - - fileutil.Fsync(f) - - f.Close() - - if rerr := os.Rename(partpath, path); rerr != nil { - exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr) - ExitWithError(ExitIO, exiterr) + sp := snapshot.NewV3(scfg) + if err := sp.Save(context.TODO(), mustClientFromCmd(cmd), path); err != nil { + ExitWithError(ExitInterrupted, err) } fmt.Printf("Snapshot saved at %s\n", path) } @@ -152,7 +115,19 @@ func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, err) } initDisplayFromCmd(cmd) - ds := dbStatus(args[0]) + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, err) + } + if debug { + scfg.Logger = snapshot.DefaultLogger + } + sp := snapshot.NewV3(scfg) + ds, err := sp.Status(args[0]) + if err != nil { + ExitWithError(ExitError, err) + } display.DBStatus(ds) } @@ -166,36 +141,30 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) { if uerr != nil { ExitWithError(ExitBadArgs, uerr) } - - cfg := etcdserver.ServerConfig{ - InitialClusterToken: restoreClusterToken, - InitialPeerURLsMap: urlmap, - PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), - Name: restoreName, - } - if err := cfg.VerifyBootstrap(); err != nil { - ExitWithError(ExitBadArgs, err) - } - - cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap) - if cerr != nil { - ExitWithError(ExitBadArgs, cerr) - } - basedir := restoreDataDir if basedir == "" { basedir = restoreName + ".etcd" } - waldir := filepath.Join(basedir, "member", "wal") - snapdir := filepath.Join(basedir, "member", "snap") - - if _, err := os.Stat(basedir); err == nil { - ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir)) + scfg := snapshot.Config{} + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + ExitWithError(ExitError, err) + } + if debug { + scfg.Logger = snapshot.DefaultLogger + } + sp := snapshot.NewV3(scfg) + if err := sp.Restore(args[0], snapshot.RestoreConfig{ + Name: restoreName, + OutputDataDir: basedir, + InitialCluster: urlmap, + InitialClusterToken: restoreClusterToken, + PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), + SkipHashCheck: skipHashCheck, + }); err != nil { + ExitWithError(ExitError, err) } - - makeDB(snapdir, args[0], len(cl.Members())) - makeWALAndSnap(waldir, snapdir, cl) } func initialClusterFromName(name string) string { @@ -205,258 +174,3 @@ func initialClusterFromName(name string) string { } return fmt.Sprintf("%s=http://localhost:2380", n) } - -// makeWAL creates a WAL for the initial cluster -func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) { - if err := fileutil.CreateDirAll(waldir); err != nil { - ExitWithError(ExitIO, err) - } - - // add members again to persist them to the store we create. - st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) - cl.SetStore(st) - for _, m := range cl.Members() { - cl.AddMember(m) - } - - m := cl.MemberByName(restoreName) - md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())} - metadata, merr := md.Marshal() - if merr != nil { - ExitWithError(ExitInvalidInput, merr) - } - - w, walerr := wal.Create(waldir, metadata) - if walerr != nil { - ExitWithError(ExitIO, walerr) - } - defer w.Close() - - peers := make([]raft.Peer, len(cl.MemberIDs())) - for i, id := range cl.MemberIDs() { - ctx, err := json.Marshal((*cl).Member(id)) - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - - ents := make([]raftpb.Entry, len(peers)) - nodeIDs := make([]uint64, len(peers)) - for i, p := range peers { - nodeIDs[i] = p.ID - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: p.ID, - Context: p.Context} - d, err := cc.Marshal() - if err != nil { - ExitWithError(ExitInvalidInput, err) - } - e := raftpb.Entry{ - Type: raftpb.EntryConfChange, - Term: 1, - Index: uint64(i + 1), - Data: d, - } - ents[i] = e - } - - commit, term := uint64(len(ents)), uint64(1) - - if err := w.Save(raftpb.HardState{ - Term: term, - Vote: peers[0].ID, - Commit: commit}, ents); err != nil { - ExitWithError(ExitIO, err) - } - - b, berr := st.Save() - if berr != nil { - ExitWithError(ExitError, berr) - } - - raftSnap := raftpb.Snapshot{ - Data: b, - Metadata: raftpb.SnapshotMetadata{ - Index: commit, - Term: term, - ConfState: raftpb.ConfState{ - Nodes: nodeIDs, - }, - }, - } - snapshotter := snap.New(snapdir) - if err := snapshotter.SaveSnap(raftSnap); err != nil { - panic(err) - } - - if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil { - ExitWithError(ExitIO, err) - } -} - -// initIndex implements ConsistentIndexGetter so the snapshot won't block -// the new raft instance by waiting for a future raft index. -type initIndex int - -func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) } - -// makeDB copies the database snapshot to the snapshot directory -func makeDB(snapdir, dbfile string, commit int) { - f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600) - if ferr != nil { - ExitWithError(ExitInvalidInput, ferr) - } - defer f.Close() - - // get snapshot integrity hash - if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { - ExitWithError(ExitIO, err) - } - sha := make([]byte, sha256.Size) - if _, err := f.Read(sha); err != nil { - ExitWithError(ExitIO, err) - } - if _, err := f.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - - if err := fileutil.CreateDirAll(snapdir); err != nil { - ExitWithError(ExitIO, err) - } - - dbpath := filepath.Join(snapdir, "db") - db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) - if dberr != nil { - ExitWithError(ExitIO, dberr) - } - if _, err := io.Copy(db, f); err != nil { - ExitWithError(ExitIO, err) - } - - // truncate away integrity hash, if any. - off, serr := db.Seek(0, io.SeekEnd) - if serr != nil { - ExitWithError(ExitIO, serr) - } - hasHash := (off % 512) == sha256.Size - if hasHash { - if err := db.Truncate(off - sha256.Size); err != nil { - ExitWithError(ExitIO, err) - } - } - - if !hasHash && !skipHashCheck { - err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false") - ExitWithError(ExitBadArgs, err) - } - - if hasHash && !skipHashCheck { - // check for match - if _, err := db.Seek(0, io.SeekStart); err != nil { - ExitWithError(ExitIO, err) - } - h := sha256.New() - if _, err := io.Copy(h, db); err != nil { - ExitWithError(ExitIO, err) - } - dbsha := h.Sum(nil) - if !reflect.DeepEqual(sha, dbsha) { - err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) - ExitWithError(ExitInvalidInput, err) - } - } - - // db hash is OK, can now modify DB so it can be part of a new cluster - db.Close() - - // update consistentIndex so applies go through on etcdserver despite - // having a new raft instance - be := backend.NewDefaultBackend(dbpath) - // a lessor never timeouts leases - lessor := lease.NewLessor(be, math.MaxInt64) - s := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) - txn := s.Write() - btx := be.BatchTx() - del := func(k, v []byte) error { - txn.DeleteRange(k, nil) - return nil - } - - // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) - // todo: add back new members when we start to deprecate old snap file. - btx.UnsafeForEach([]byte("members_removed"), del) - // trigger write-out of new consistent index - txn.End() - s.Commit() - s.Close() - be.Close() -} - -type dbstatus struct { - Hash uint32 `json:"hash"` - Revision int64 `json:"revision"` - TotalKey int `json:"totalKey"` - TotalSize int64 `json:"totalSize"` -} - -func dbStatus(p string) dbstatus { - if _, err := os.Stat(p); err != nil { - ExitWithError(ExitError, err) - } - - ds := dbstatus{} - - db, err := bolt.Open(p, 0400, &bolt.Options{ReadOnly: true}) - if err != nil { - ExitWithError(ExitError, err) - } - defer db.Close() - - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - err = db.View(func(tx *bolt.Tx) error { - ds.TotalSize = tx.Size() - c := tx.Cursor() - for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) - if b == nil { - return fmt.Errorf("cannot get hash of bucket %s", string(next)) - } - h.Write(next) - iskeyb := (string(next) == "key") - b.ForEach(func(k, v []byte) error { - h.Write(k) - h.Write(v) - if iskeyb { - rev := bytesToRev(k) - ds.Revision = rev.main - } - ds.TotalKey++ - return nil - }) - } - return nil - }) - - if err != nil { - ExitWithError(ExitError, err) - } - - ds.Hash = h.Sum32() - return ds -} - -type revision struct { - main int64 - sub int64 -} - -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -}