From e12530c33054f3524119b849c7bc338fb012f1ae Mon Sep 17 00:00:00 2001
From: manish
Date: Thu, 7 May 2020 18:09:52 -0400
Subject: [PATCH] Add snapshot file creator and reader functions

This commit adds reusable functions for creating and reading the snapshot files.
These functions are expected to be used by various components that would generate
or consume the snapshot files, such as blockstorage and statedb

Signed-off-by: manish
---
 common/ledger/snapshot/file.go      | 209 ++++++++++++++++++++++++++++
 common/ledger/snapshot/file_test.go | 203 +++++++++++++++++++++++++++
 2 files changed, 412 insertions(+)
 create mode 100644 common/ledger/snapshot/file.go
 create mode 100644 common/ledger/snapshot/file_test.go

diff --git a/common/ledger/snapshot/file.go b/common/ledger/snapshot/file.go
new file mode 100644
index 00000000000..b83c46698ff
--- /dev/null
+++ b/common/ledger/snapshot/file.go
@@ -0,0 +1,209 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package snapshot
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"hash"
+	"io"
+	"os"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/pkg/errors"
+)
+
+// FileWriter creates a new file for ledger snapshot. This is expected to be used by various
+// components of ledger, such as blockstorage and statedb for exporting the relevant snapshot data
+type FileWriter struct {
+	file              *os.File
+	hasher            hash.Hash
+	bufWriter         *bufio.Writer
+	multiWriter       io.Writer
+	varintReusableBuf []byte
+}
+
+// CreateFile creates a new file for exporting the ledger snapshot data
+// This function returns an error if the file already exists. The `dataformat` is the first byte
+// written to the file. The hasher is used for computing the hash of the data stream
+func CreateFile(filePath string, dataformat byte, hasher hash.Hash) (*FileWriter, error) {
+	// create the file only if it does not already exist.
+	// set the permission mode to read-only, as once the file is closed, we do not support modifying the file
+	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0444)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error while creating the snapshot file: %s", filePath)
+	}
+	bufWriter := bufio.NewWriter(file)
+	multiWriter := io.MultiWriter(bufWriter, hasher)
+	if _, err := multiWriter.Write([]byte{dataformat}); err != nil {
+		file.Close()
+		return nil, errors.Wrapf(err, "error while writing data format to the snapshot file: %s", filePath)
+	}
+	return &FileWriter{
+		file:              file,
+		bufWriter:         bufWriter,
+		multiWriter:       multiWriter,
+		hasher:            hasher,
+		varintReusableBuf: make([]byte, binary.MaxVarintLen64),
+	}, nil
+}
+
+// EncodeString encodes and appends the string to the data stream
+func (c *FileWriter) EncodeString(str string) error {
+	return c.EncodeBytes([]byte(str))
+}
+
+// EncodeProtoMessage encodes and appends a proto message to the data stream
+func (c *FileWriter) EncodeProtoMessage(m proto.Message) error {
+	b, err := proto.Marshal(m)
+	if err != nil {
+		return errors.Wrapf(err, "error marshalling proto message to write to the snapshot file: %s", c.file.Name())
+	}
+	return c.EncodeBytes(b)
+}
+
+// EncodeBytes encodes and appends bytes to the data stream
+func (c *FileWriter) EncodeBytes(b []byte) error {
+	if err := c.EncodeUVarint(uint64(len(b))); err != nil {
+		return err
+	}
+	if _, err := c.multiWriter.Write(b); err != nil {
+		return errors.Wrapf(err, "error while writing data to the snapshot file: %s", c.file.Name())
+	}
+	return nil
+}
+
+// EncodeUVarint encodes and appends a number to the data stream
+func (c *FileWriter) EncodeUVarint(u uint64) error {
+	n := binary.PutUvarint(c.varintReusableBuf, u)
+	if _, err := c.multiWriter.Write(c.varintReusableBuf[:n]); err != nil {
+		return errors.Wrapf(err, "error while writing data to the snapshot file: %s", c.file.Name())
+	}
+	return nil
+}
+
+// Done closes the snapshot file and returns the final hash of the data stream
+func (c *FileWriter) Done() ([]byte, error) {
+	if err := c.bufWriter.Flush(); err != nil {
+		return nil, errors.Wrapf(err, "error while flushing to the snapshot file: %s ", c.file.Name())
+	}
+	if err := c.file.Close(); err != nil {
+		return nil, errors.Wrapf(err, "error while closing the snapshot file: %s ", c.file.Name())
+	}
+	return c.hasher.Sum(nil), nil
+}
+
+// Close closes the underlying file, if not already done. A consumer can invoke this function if the consumer
+// encountered some error and simply wants to abandon the snapshot file creation (typically, intended to be used in a defer statement)
+func (c *FileWriter) Close() error {
+	if c == nil {
+		return nil
+	}
+	return errors.Wrapf(c.file.Close(), "error while closing the snapshot file: %s", c.file.Name())
+}
+
+// FileReader reads from a ledger snapshot file. This is expected to be used for loading the ledger snapshot data
+// during bootstrapping a channel from snapshot. The data should be read, using the functions `DecodeXXX`,
+// in the same sequence in which the data was written by the functions `EncodeXXX` in the `FileWriter`.
+// Note that the FileReader does not verify the hash of the stream and it is expected that the hash has been verified
+// by the consumer. Later, if we decide to perform this, on-the-side, while loading the snapshot data, the FileReader,
+// like the FileWriter, would take a `hasher` as an input
+type FileReader struct {
+	file              *os.File
+	bufReader         *bufio.Reader
+	reusableByteSlice []byte
+}
+
+// OpenFile constructs a FileReader. This function returns an error if the format of the file, stored in the
+// first byte, does not match with the `expectDataformat`
+func OpenFile(filePath string, expectDataformat byte) (*FileReader, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error while opening the snapshot file: %s", filePath)
+	}
+	bufReader := bufio.NewReader(file)
+	dataFormat, err := bufReader.ReadByte()
+	if err != nil {
+		file.Close()
+		return nil, errors.Wrapf(err, "error while reading from the snapshot file: %s", filePath)
+	}
+	if dataFormat != expectDataformat {
+		file.Close()
+		return nil, errors.New(fmt.Sprintf("unexpected data format: %x", dataFormat))
+	}
+	return &FileReader{
+		file:      file,
+		bufReader: bufReader,
+	}, nil
+}
+
+// DecodeString reads and decodes a string
+func (r *FileReader) DecodeString() (string, error) {
+	b, err := r.decodeBytes()
+	return string(b), err
+}
+
+// DecodeBytes reads and decodes bytes. The returned slice is a fresh copy that the caller may retain
+func (r *FileReader) DecodeBytes() ([]byte, error) {
+	b, err := r.decodeBytes()
+	if err != nil {
+		return nil, err
+	}
+	c := make([]byte, len(b))
+	copy(c, b)
+	return c, nil
+}
+
+// DecodeUVarInt reads and decodes a number
+func (r *FileReader) DecodeUVarInt() (uint64, error) {
+	u, err := binary.ReadUvarint(r.bufReader)
+	if err != nil {
+		return 0, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
+	}
+	return u, nil
+}
+
+// DecodeProtoMessage reads and decodes a protoMessage
+func (r *FileReader) DecodeProtoMessage(m proto.Message) error {
+	b, err := r.decodeBytes()
+	if err != nil {
+		return err
+	}
+	return proto.Unmarshal(b, m)
+}
+
+// Close closes the file
+func (r *FileReader) Close() error {
+	if r == nil {
+		return nil
+	}
+	return errors.Wrapf(r.file.Close(), "error while closing the snapshot file: %s", r.file.Name())
+}
+
+// decodeBytes reads a uvarint length prefix followed by exactly that many bytes. The returned slice
+// aliases an internal buffer that is overwritten by the next decode call, so a caller that
+// retains the data must copy it first
+func (r *FileReader) decodeBytes() ([]byte, error) {
+	sizeUint, err := r.DecodeUVarInt()
+	if err != nil {
+		return nil, err
+	}
+	size := int(sizeUint)
+	if size < 0 || uint64(size) != sizeUint {
+		// the encoded length does not fit in an int; slicing with it would panic or silently truncate
+		return nil, errors.Errorf("invalid data length [%d] in snapshot file: %s", sizeUint, r.file.Name())
+	}
+	if len(r.reusableByteSlice) < size {
+		r.reusableByteSlice = make([]byte, size)
+	}
+	// bufio.Reader.Read may return fewer bytes than requested without an error; io.ReadFull fills the whole slice
+	if _, err := io.ReadFull(r.bufReader, r.reusableByteSlice[0:size]); err != nil {
+		return nil, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
+	}
+	return r.reusableByteSlice[0:size], nil
+}
diff --git a/common/ledger/snapshot/file_test.go b/common/ledger/snapshot/file_test.go
new file mode 100644
index 00000000000..870df7df4dd
--- /dev/null
+++ b/common/ledger/snapshot/file_test.go
@@ -0,0 +1,203 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package snapshot
+
+import (
+	"bufio"
+	"bytes"
+	"crypto/sha256"
+	"errors"
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/hyperledger/fabric-protos-go/common"
+	"github.com/stretchr/testify/require"
+)
+
+func TestFileCreateAndRead(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// create file and encode some data
+	fileCreator, err := CreateFile(path.Join(testDir, "dataFile"), byte(5), sha256.New())
+	require.NoError(t, err)
+	defer fileCreator.Close()
+
+	require.NoError(t, fileCreator.EncodeString("Hi there"))
+	require.NoError(t, fileCreator.EncodeString("How are you?"))
+	require.NoError(t, fileCreator.EncodeUVarint(uint64(25)))
+	require.NoError(t, fileCreator.EncodeProtoMessage(
+		&common.BlockchainInfo{
+			Height:            30,
+			CurrentBlockHash:  []byte("Current-Block-Hash"),
+			PreviousBlockHash: []byte("Previous-Block-Hash"),
+		},
+	))
+	require.NoError(t, fileCreator.EncodeBytes([]byte("some junk bytes")))
+	// a record larger than the bufio buffer, so that reading it back spans more than one underlying read
+	largeData := bytes.Repeat([]byte("snapshot-data-"), 1024)
+	require.NoError(t, fileCreator.EncodeBytes(largeData))
+
+	// Done and verify the returned hash
+	dataHash, err := fileCreator.Done()
+	require.NoError(t, err)
+	require.Equal(
+		t,
+		computeSha256(t, path.Join(testDir, "dataFile")),
+		dataHash,
+	)
+
+	// open the file and verify the reads
+	fileReader, err := OpenFile(path.Join(testDir, "dataFile"), byte(5))
+	require.NoError(t, err)
+	defer fileReader.Close()
+
+	str, err := fileReader.DecodeString()
+	require.NoError(t, err)
+	require.Equal(t, "Hi there", str)
+
+	str, err = fileReader.DecodeString()
+	require.NoError(t, err)
+	require.Equal(t, "How are you?", str)
+
+	number, err := fileReader.DecodeUVarInt()
+	require.NoError(t, err)
+	require.Equal(t, uint64(25), number)
+
+	retrievedBlockchainInfo := &common.BlockchainInfo{}
+	require.NoError(t, fileReader.DecodeProtoMessage(retrievedBlockchainInfo))
+	require.True(t, proto.Equal(
+		&common.BlockchainInfo{
+			Height:            30,
+			CurrentBlockHash:  []byte("Current-Block-Hash"),
+			PreviousBlockHash: []byte("Previous-Block-Hash"),
+		},
+		retrievedBlockchainInfo,
+	))
+
+	b, err := fileReader.DecodeBytes()
+	require.NoError(t, err)
+	require.Equal(t, []byte("some junk bytes"), b)
+
+	b, err = fileReader.DecodeBytes()
+	require.NoError(t, err)
+	require.Equal(t, largeData, b)
+}
+
+func TestFileCreatorErrorPropagation(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// error propagation from CreateFile function when file already exists
+	existingFilePath := path.Join(testDir, "an-existing-file")
+	file, err := os.Create(existingFilePath)
+	require.NoError(t, err)
+	require.NoError(t, file.Close())
+	_, err = CreateFile(existingFilePath, byte(1), sha256.New())
+	require.Contains(t, err.Error(), "error while creating the snapshot file: "+existingFilePath)
+
+	// error propagation from Encode functions.
+	// Mimic the errors by setting the writer to an error returning writer
+	dataFilePath := path.Join(testDir, "data-file")
+	fileCreator, err := CreateFile(dataFilePath, byte(1), sha256.New())
+	require.NoError(t, err)
+	defer fileCreator.Close()
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeUVarint")}
+	require.EqualError(t, fileCreator.EncodeUVarint(9), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeUVarint")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeBytes")}
+	require.EqualError(t, fileCreator.EncodeBytes([]byte("junk")), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeBytes")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeProtoMessage")}
+	require.EqualError(t, fileCreator.EncodeProtoMessage(&common.BlockchainInfo{}), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeProtoMessage")
+	require.EqualError(t, fileCreator.EncodeProtoMessage(nil), "error marshalling proto message to write to the snapshot file: "+dataFilePath+": proto: Marshal called with nil")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeString")}
+	require.EqualError(t, fileCreator.EncodeString("junk"), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeString")
+
+	// error propagation from Done function
+	fileCreator.file.Close()
+	_, err = fileCreator.Done()
+	require.Contains(t, err.Error(), "error while flushing to the snapshot file: "+dataFilePath)
+
+	// error propagation from Close function
+	require.Contains(t, fileCreator.Close().Error(), "error while closing the snapshot file: "+dataFilePath)
+}
+
+func TestFileReaderErrorPropagation(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// non-existent-file causes an error
+	nonExistentFile := path.Join(testDir, "non-existent-file")
+	_, err := OpenFile(nonExistentFile, byte(1))
+	require.Contains(t, err.Error(), "error while opening the snapshot file: "+nonExistentFile)
+
+	// an empty-file causes an error
+	emptyFile := path.Join(testDir, "empty-file")
+	f, err := os.Create(emptyFile)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+	emptyFileReader, err := OpenFile(emptyFile, byte(1))
+	require.Contains(t, err.Error(), "error while reading from the snapshot file: "+emptyFile)
+	defer emptyFileReader.Close()
+
+	// a file with mismatched format info causes an error
+	unexpectedFormatFile := path.Join(testDir, "wrong-data-format-file")
+	fw, err := CreateFile(unexpectedFormatFile, byte(1), sha256.New())
+	require.NoError(t, err)
+	require.NoError(t, fw.EncodeString("Hello there"))
+	_, err = fw.Done()
+	require.NoError(t, err)
+	unexpectedFormatFileReader, err := OpenFile(unexpectedFormatFile, byte(2))
+	require.EqualError(t, err, "unexpected data format: 1")
+	defer unexpectedFormatFileReader.Close()
+
+	// decodeMethodsErrors - mimic errors by closing the underlying file
+	closedFile := path.Join(testDir, "closed-file")
+	f, err = os.Create(closedFile)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	closedFileReader := &FileReader{
+		file:      f,
+		bufReader: bufio.NewReader(f),
+	}
+	_, err = closedFileReader.DecodeUVarInt()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	_, err = closedFileReader.DecodeBytes()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	_, err = closedFileReader.DecodeString()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	err = closedFileReader.DecodeProtoMessage(&common.BlockchainInfo{})
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	err = closedFileReader.Close()
+	require.Contains(t, err.Error(), "error while closing the snapshot file: "+closedFile)
+}
+
+func computeSha256(t *testing.T, file string) []byte {
+	data, err := ioutil.ReadFile(file)
+	require.NoError(t, err)
+	sha := sha256.Sum256(data)
+	return sha[:]
+}
+
+func testPath(t *testing.T) string {
+	dir, err := ioutil.TempDir("", "test-file-encoder-")
+	require.NoError(t, err)
+	return dir
+}
+
+type errorCausingWriter struct {
+	err error
+}
+
+func (w *errorCausingWriter) Write(p []byte) (n int, err error) { return 0, w.err }