Port consul changes to snapshot (#4)
* Port Consul changes to snapshots

Consul has made some minor improvements in handling truncated and
invalid snapshots, as well as changes to gzip close handling.

* tests: path.Join -> filepath.Join

* avoid loading gzip content in memory
Mahmood Ali authored Jun 5, 2020
1 parent 84141ff commit ef3642a
Showing 3 changed files with 92 additions and 16 deletions.
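
The headline change is that Verify now drains the gzip stream before returning, so a truncated or corrupt archive fails verification instead of slipping through to restore time. Below is a minimal sketch of how a caller might use it; the import path and the on-disk file name are assumptions for illustration, while Verify's signature is taken from the diff and ID/Index/Term are standard raft.SnapshotMeta fields:

```go
package main

import (
	"fmt"
	"os"

	snapshot "github.com/hashicorp/raft-snapshot" // import path assumed, not confirmed by this commit
)

func checkSnapshot(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Verify decompresses and hashes the archive without restoring it.
	// After this commit it also drains the gzip stream to io.EOF, so
	// truncated data is rejected here rather than during a restore.
	meta, err := snapshot.Verify(f)
	if err != nil {
		return fmt.Errorf("snapshot failed verification: %v", err)
	}

	fmt.Printf("snapshot ok: id=%s index=%d term=%d\n", meta.ID, meta.Index, meta.Term)
	return nil
}

func main() {
	if err := checkSnapshot("raft.snap"); err != nil { // file name is illustrative
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```
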
2 changes: 1 addition & 1 deletion archive.go
@@ -225,7 +225,7 @@ func read(in io.Reader, metadata *raft.SnapshotMeta, snap io.Writer, sealer Seal
// Previously we used json.Decode to decode the archive stream. There are
// edgecases in which it doesn't read all the bytes from the stream, even
// though the json object is still being parsed properly. Since we
// simutaniously feeded everything to metaHash, our hash ended up being
// simultaneously feeded everything to metaHash, our hash ended up being
// different than what we calculated when creating the snapshot. Which in
// turn made the snapshot verification fail. By explicitly reading the
// whole thing first we ensure that we calculate the correct hash
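
For context, the comment above describes buffering the metadata entry and hashing it before decoding, so the hash covers exactly the bytes that were written even when the JSON decoder would not consume them all. A minimal sketch of that pattern, with illustrative names and sha256 standing in for whatever hash the archive format actually uses:

```go
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// readAndHash buffers the whole entry, hashes exactly those bytes, and only
// then decodes the JSON. Decoding straight from the reader can leave trailing
// bytes unconsumed, which would make the computed hash diverge from the one
// recorded when the snapshot was created.
func readAndHash(in io.Reader, out interface{}) ([]byte, error) {
	buf, err := ioutil.ReadAll(in) // read the entire entry first
	if err != nil {
		return nil, fmt.Errorf("failed to read metadata: %v", err)
	}

	h := sha256.New()
	h.Write(buf) // the hash sees every byte, parsed or not

	if err := json.Unmarshal(buf, out); err != nil {
		return nil, fmt.Errorf("failed to decode metadata: %v", err)
	}
	return h.Sum(nil), nil
}

func main() {
	payload := strings.NewReader(`{"Index": 42, "Term": 3}` + "\n") // trailing newline still gets hashed
	var meta struct{ Index, Term uint64 }

	sum, err := readAndHash(payload, &meta)
	if err != nil {
		panic(err)
	}
	fmt.Printf("index=%d term=%d hash=%x\n", meta.Index, meta.Term, sum)
}
```
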
34 changes: 27 additions & 7 deletions snapshot.go
@@ -11,7 +11,7 @@ import (
"io/ioutil"
"os"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
)

@@ -43,11 +43,11 @@ type Sealer interface {
// and returns an object that gives access to the file as an io.Reader. You must
// arrange to call Close() on the returned object or else you will leak a
// temporary file.
func New(logger log.Logger, r *raft.Raft) (*Snapshot, error) {
func New(logger hclog.Logger, r *raft.Raft) (*Snapshot, error) {
return NewWithSealer(logger, r, nil)
}

func NewWithSealer(logger log.Logger, r *raft.Raft, sealer Sealer) (*Snapshot, error) {
func NewWithSealer(logger hclog.Logger, r *raft.Raft, sealer Sealer) (*Snapshot, error) {
// Take the snapshot.
future := r.Snapshot()
if err := future.Error(); err != nil {
@@ -158,9 +158,29 @@ func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
if err := read(decomp, &metadata, ioutil.Discard, nil); err != nil {
return nil, fmt.Errorf("failed to read snapshot file: %v", err)
}

if err := concludeGzipRead(decomp); err != nil {
return nil, err
}

return &metadata, nil
}

// concludeGzipRead should be invoked after you think you've consumed all of
// the data from the gzip stream. It will error if the stream was corrupt.
//
// The docs for gzip.Reader say: "Clients should treat data returned by Read as
// tentative until they receive the io.EOF marking the end of the data."
func concludeGzipRead(decomp *gzip.Reader) error {
extra, err := io.Copy(ioutil.Discard, decomp) // Copy consumes the EOF
if err != nil {
return err
} else if extra != 0 {
return fmt.Errorf("%d unread uncompressed bytes remain", extra)
}
return nil
}

// Parse reads the snapshot from the input reader, decompresses it, and pipes
// the binary data to the output writer.
func Parse(in io.Reader, out io.Writer) (*raft.SnapshotMeta, error) {
@@ -181,11 +201,11 @@ func Parse(in io.Reader, out io.Writer) (*raft.SnapshotMeta, error) {

// Restore takes the snapshot from the reader and attempts to apply it to the
// given Raft instance.
func Restore(logger log.Logger, in io.Reader, r *raft.Raft) error {
func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
return RestoreWithSealer(logger, in, r, nil)
}

func RestoreWithSealer(logger log.Logger, in io.Reader, r *raft.Raft, sealer Sealer) error {
func RestoreWithSealer(logger hclog.Logger, in io.Reader, r *raft.Raft, sealer Sealer) error {
var metadata raft.SnapshotMeta
snap, cleanupFunc, err := WriteToTempFileWithSealer(logger, in, &metadata, sealer)
if err != nil {
@@ -201,11 +221,11 @@ func RestoreWithSealer(logger log.Logger, in io.Reader, r *raft.Raft, sealer Sea
return nil
}

func WriteToTempFile(logger log.Logger, in io.Reader, metadata *raft.SnapshotMeta) (*os.File, func(), error) {
func WriteToTempFile(logger hclog.Logger, in io.Reader, metadata *raft.SnapshotMeta) (*os.File, func(), error) {
return WriteToTempFileWithSealer(logger, in, metadata, nil)
}

func WriteToTempFileWithSealer(logger log.Logger, in io.Reader, metadata *raft.SnapshotMeta, sealer Sealer) (*os.File, func(), error) {
func WriteToTempFileWithSealer(logger hclog.Logger, in io.Reader, metadata *raft.SnapshotMeta, sealer Sealer) (*os.File, func(), error) {
// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
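
To see why concludeGzipRead's drain-to-EOF step matters, here is a small standalone sketch, independent of this package, showing that a truncated gzip stream can be read partially without any error, and the damage only surfaces once the reader is drained the way concludeGzipRead drains it:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	// Build a gzip blob standing in for a compressed snapshot.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(bytes.Repeat([]byte("snapshot data "), 4096)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Simulate a cut-off transfer by dropping the last bytes, which removes
	// the gzip trailer (and a little compressed data) from the stream.
	truncated := buf.Bytes()[:buf.Len()-16]

	zr, err := gzip.NewReader(bytes.NewReader(truncated))
	if err != nil {
		panic(err) // the header is intact, so NewReader succeeds
	}

	// Partial reads still hand back data without complaint...
	head := make([]byte, 64)
	if _, err := io.ReadFull(zr, head); err != nil {
		panic(err)
	}

	// ...but draining to EOF, as concludeGzipRead does, exposes the problem.
	if _, err := io.Copy(ioutil.Discard, zr); err != nil {
		fmt.Println("detected truncated snapshot:", err) // typically "unexpected EOF"
	}
}
```
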
72 changes: 64 additions & 8 deletions snapshot_test.go
@@ -6,14 +6,14 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/sdk/testutil"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"
)
@@ -133,7 +133,7 @@ func TestSnapshot(t *testing.T) {
// apply off to a buffer for checking post-snapshot.
var expected []bytes.Buffer
entries := 64 * 1024
before, _ := makeRaft(t, path.Join(dir, "before"))
before, _ := makeRaft(t, filepath.Join(dir, "before"))
defer before.Shutdown()
for i := 0; i < entries; i++ {
var log bytes.Buffer
@@ -150,7 +150,7 @@
}

// Take a snapshot.
logger := log.Default()
logger := hclog.Default()
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
@@ -176,7 +176,7 @@
}

// Make a new, independent Raft.
after, fsm := makeRaft(t, path.Join(dir, "after"))
after, fsm := makeRaft(t, filepath.Join(dir, "after"))
defer after.Shutdown()

// Put some initial data in there that the snapshot should overwrite.
@@ -234,12 +234,68 @@ func TestSnapshot_BadVerify(t *testing.T) {
}
}

func TestSnapshot_TruncatedVerify(t *testing.T) {
dir := testutil.TempDir(t, "snapshot")
defer os.RemoveAll(dir)

// Make a Raft and populate it with some data. We tee everything we
// apply off to a buffer for checking post-snapshot.
entries := 64 * 1024
before, _ := makeRaft(t, filepath.Join(dir, "before"))
defer before.Shutdown()
for i := 0; i < entries; i++ {
var log bytes.Buffer
var copy bytes.Buffer
both := io.MultiWriter(&log, &copy)

_, err := io.CopyN(both, rand.Reader, 256)
if err != nil {
t.Fatalf("err: %v", err)
}

future := before.Apply(log.Bytes(), time.Second)
if future.Error() != nil {
t.Fatalf("err: %v", future.Error())
}
}

// Take a snapshot.
logger := hclog.Default()
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()

var data []byte
{
var buf bytes.Buffer
_, err = io.Copy(&buf, snap)
if err != nil {
t.Fatalf("err: %v", err)
}
data = buf.Bytes()
}

for _, removeBytes := range []int{200, 16, 8, 4, 2, 1} {
t.Run(fmt.Sprintf("truncate %d bytes from end", removeBytes), func(t *testing.T) {
// Lop off part of the end.
buf := bytes.NewReader(data[0 : len(data)-removeBytes])

_, err = Verify(buf)
if err == nil {
t.Fatalf("expected snapshot to fail validation, but did not")
}
})
}
}

func TestSnapshot_BadRestore(t *testing.T) {
dir := testutil.TempDir(t, "snapshot")
defer os.RemoveAll(dir)

// Make a Raft and populate it with some data.
before, _ := makeRaft(t, path.Join(dir, "before"))
before, _ := makeRaft(t, filepath.Join(dir, "before"))
defer before.Shutdown()
for i := 0; i < 16*1024; i++ {
var log bytes.Buffer
@@ -253,14 +309,14 @@ func TestSnapshot_BadRestore(t *testing.T) {
}

// Take a snapshot.
logger := log.Default()
logger := hclog.Default()
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
}

// Make a new, independent Raft.
after, fsm := makeRaft(t, path.Join(dir, "after"))
after, fsm := makeRaft(t, filepath.Join(dir, "after"))
defer after.Shutdown()

// Put some initial data in there that should not be harmed by the
