diff --git a/common/ledger/snapshot/file.go b/common/ledger/snapshot/file.go
new file mode 100644
index 00000000000..b83c46698ff
--- /dev/null
+++ b/common/ledger/snapshot/file.go
@@ -0,0 +1,205 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package snapshot
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"hash"
+	"io"
+	"os"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/pkg/errors"
+)
+
+// FileWriter creates a new file for ledger snapshot. This is expected to be used by various
+// components of ledger, such as blockstorage and statedb for exporting the relevant snapshot data
+type FileWriter struct {
+	file              *os.File
+	hasher            hash.Hash
+	bufWriter         *bufio.Writer
+	multiWriter       io.Writer
+	varintReusableBuf []byte
+}
+
+// CreateFile creates a new file for exporting the ledger snapshot data
+// This function returns an error if the file already exists. The `dataformat` is the first byte
+// written to the file. The hasher is used for computing the hash of the data stream
+func CreateFile(filePath string, dataformat byte, hasher hash.Hash) (*FileWriter, error) {
+	// create the file only if it does not already exist.
+	// set the permission mode to read-only, as once the file is closed, we do not support modifying the file
+	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0444)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error while creating the snapshot file: %s", filePath)
+	}
+	bufWriter := bufio.NewWriter(file)
+	multiWriter := io.MultiWriter(bufWriter, hasher)
+	if _, err := multiWriter.Write([]byte{dataformat}); err != nil {
+		file.Close()
+		return nil, errors.Wrapf(err, "error while writing data format to the snapshot file: %s", filePath)
+	}
+	return &FileWriter{
+		file:              file,
+		bufWriter:         bufWriter,
+		multiWriter:       multiWriter,
+		hasher:            hasher,
+		varintReusableBuf: make([]byte, binary.MaxVarintLen64),
+	}, nil
+}
+
+// EncodeString encodes and appends the string to the data stream
+func (c *FileWriter) EncodeString(str string) error {
+	return c.EncodeBytes([]byte(str))
+}
+
+// EncodeProtoMessage encodes and appends a proto message to the data stream
+func (c *FileWriter) EncodeProtoMessage(m proto.Message) error {
+	b, err := proto.Marshal(m)
+	if err != nil {
+		return errors.Wrapf(err, "error marshalling proto message to write to the snapshot file: %s", c.file.Name())
+	}
+	return c.EncodeBytes(b)
+}
+
+// EncodeBytes encodes and appends bytes to the data stream. The bytes are preceded by their length, encoded as a uvarint
+func (c *FileWriter) EncodeBytes(b []byte) error {
+	if err := c.EncodeUVarint(uint64(len(b))); err != nil {
+		return err
+	}
+	if _, err := c.multiWriter.Write(b); err != nil {
+		return errors.Wrapf(err, "error while writing data to the snapshot file: %s", c.file.Name())
+	}
+	return nil
+}
+
+// EncodeUVarint encodes and appends a number to the data stream
+func (c *FileWriter) EncodeUVarint(u uint64) error {
+	n := binary.PutUvarint(c.varintReusableBuf, u)
+	if _, err := c.multiWriter.Write(c.varintReusableBuf[:n]); err != nil {
+		return errors.Wrapf(err, "error while writing data to the snapshot file: %s", c.file.Name())
+	}
+	return nil
+}
+
+// Done flushes the buffered data, closes the snapshot file, and returns the final hash of the data stream
+func (c *FileWriter) Done() ([]byte, error) {
+	if err := c.bufWriter.Flush(); err != nil {
+		return nil, errors.Wrapf(err, "error while flushing to the snapshot file: %s", c.file.Name())
+	}
+	if err := c.file.Close(); err != nil {
+		return nil, errors.Wrapf(err, "error while closing the snapshot file: %s", c.file.Name())
+	}
+	return c.hasher.Sum(nil), nil
+}
+
+// Close closes the underlying file, if not already done. A consumer can invoke this function if the consumer
+// encountered some error and simply wants to abandon the snapshot file creation (typically, intended to be used in a defer statement)
+func (c *FileWriter) Close() error {
+	if c == nil {
+		return nil
+	}
+	return errors.Wrapf(c.file.Close(), "error while closing the snapshot file: %s", c.file.Name())
+}
+
+// FileReader reads from a ledger snapshot file. This is expected to be used for loading the ledger snapshot data
+// during bootstrapping a channel from snapshot. The data should be read, using the functions `DecodeXXX`,
+// in the same sequence in which the data was written by the functions `EncodeXXX` in the `FileWriter`.
+// Note that the FileReader does not verify the hash of the stream and it is expected that the hash has been verified
+// by the consumer. Later, if we decide to perform this, on-the-side, while loading the snapshot data, the FileReader,
+// like the FileWriter, would take a `hasher` as an input
+type FileReader struct {
+	file              *os.File
+	bufReader         *bufio.Reader
+	reusableByteSlice []byte
+}
+
+// OpenFile constructs a FileReader. This function returns an error if the format of the file, stored in the
+// first byte, does not match with the expectDataformat
+func OpenFile(filePath string, expectDataformat byte) (*FileReader, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error while opening the snapshot file: %s", filePath)
+	}
+	bufReader := bufio.NewReader(file)
+	dataFormat, err := bufReader.ReadByte()
+	if err != nil {
+		file.Close()
+		return nil, errors.Wrapf(err, "error while reading from the snapshot file: %s", filePath)
+	}
+	if dataFormat != expectDataformat {
+		file.Close()
+		return nil, errors.New(fmt.Sprintf("unexpected data format: %x", dataFormat))
+	}
+	return &FileReader{
+		file:      file,
+		bufReader: bufReader,
+	}, nil
+}
+
+// DecodeString reads and decodes a string
+func (r *FileReader) DecodeString() (string, error) {
+	b, err := r.decodeBytes()
+	return string(b), err
+}
+
+// DecodeBytes reads and decodes bytes. The returned slice is a fresh copy that does not alias the internal buffer
+func (r *FileReader) DecodeBytes() ([]byte, error) {
+	b, err := r.decodeBytes()
+	if err != nil {
+		return nil, err
+	}
+	c := make([]byte, len(b))
+	copy(c, b)
+	return c, nil
+}
+
+// DecodeUVarInt reads and decodes a number
+func (r *FileReader) DecodeUVarInt() (uint64, error) {
+	u, err := binary.ReadUvarint(r.bufReader)
+	if err != nil {
+		return 0, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
+	}
+	return u, nil
+}
+
+// DecodeProtoMessage reads and decodes a protoMessage
+func (r *FileReader) DecodeProtoMessage(m proto.Message) error {
+	b, err := r.decodeBytes()
+	if err != nil {
+		return err
+	}
+	return errors.Wrapf(proto.Unmarshal(b, m), "error while unmarshalling proto message read from the snapshot file: %s", r.file.Name())
+}
+
+// Close closes the file
+func (r *FileReader) Close() error {
+	if r == nil {
+		return nil
+	}
+	return errors.Wrapf(r.file.Close(), "error while closing the snapshot file: %s", r.file.Name())
+}
+
+// decodeBytes reads a uvarint length followed by that many bytes into the reusable buffer of the reader.
+// The returned slice aliases that buffer and is overwritten by the next call; callers that retain it must copy it
+func (r *FileReader) decodeBytes() ([]byte, error) {
+	sizeUint, err := r.DecodeUVarInt()
+	if err != nil {
+		return nil, err
+	}
+	size := int(sizeUint)
+	if len(r.reusableByteSlice) < size {
+		r.reusableByteSlice = make([]byte, size)
+	}
+	// a single Read on a bufio.Reader may return fewer bytes than requested (only what is currently buffered),
+	// so use io.ReadFull to either fill all the `size` bytes or fail with an error
+	if _, err := io.ReadFull(r.bufReader, r.reusableByteSlice[0:size]); err != nil {
+		return nil, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
+	}
+	return r.reusableByteSlice[0:size], nil
+}
diff --git a/common/ledger/snapshot/file_test.go b/common/ledger/snapshot/file_test.go
new file mode 100644
index 00000000000..870df7df4dd
--- /dev/null
+++ b/common/ledger/snapshot/file_test.go
@@ -0,0 +1,228 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package snapshot
+
+import (
+	"bufio"
+	"crypto/sha256"
+	"errors"
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/hyperledger/fabric-protos-go/common"
+	"github.com/stretchr/testify/require"
+)
+
+func TestFileCreateAndRead(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// create file and encode some data
+	fileCreator, err := CreateFile(path.Join(testDir, "dataFile"), byte(5), sha256.New())
+	require.NoError(t, err)
+	defer fileCreator.Close()
+
+	require.NoError(t, fileCreator.EncodeString("Hi there"))
+	require.NoError(t, fileCreator.EncodeString("How are you?"))
+	require.NoError(t, fileCreator.EncodeUVarint(uint64(25)))
+	require.NoError(t, fileCreator.EncodeProtoMessage(
+		&common.BlockchainInfo{
+			Height:            30,
+			CurrentBlockHash:  []byte("Current-Block-Hash"),
+			PreviousBlockHash: []byte("Previous-Block-Hash"),
+		},
+	))
+	require.NoError(t, fileCreator.EncodeBytes([]byte("some junk bytes")))
+
+	// Done and verify the returned hash
+	dataHash, err := fileCreator.Done()
+	require.NoError(t, err)
+	require.Equal(
+		t,
+		computeSha256(t, path.Join(testDir, "dataFile")),
+		dataHash,
+	)
+
+	// open the file and verify the reads
+	fileReader, err := OpenFile(path.Join(testDir, "dataFile"), byte(5))
+	require.NoError(t, err)
+	defer fileReader.Close()
+
+	str, err := fileReader.DecodeString()
+	require.NoError(t, err)
+	require.Equal(t, "Hi there", str)
+
+	str, err = fileReader.DecodeString()
+	require.NoError(t, err)
+	require.Equal(t, "How are you?", str)
+
+	number, err := fileReader.DecodeUVarInt()
+	require.NoError(t, err)
+	require.Equal(t, uint64(25), number)
+
+	retrievedBlockchainInfo := &common.BlockchainInfo{}
+	require.NoError(t, fileReader.DecodeProtoMessage(retrievedBlockchainInfo))
+	require.True(t, proto.Equal(
+		&common.BlockchainInfo{
+			Height:            30,
+			CurrentBlockHash:  []byte("Current-Block-Hash"),
+			PreviousBlockHash: []byte("Previous-Block-Hash"),
+		},
+		retrievedBlockchainInfo,
+	))
+
+	b, err := fileReader.DecodeBytes()
+	require.NoError(t, err)
+	require.Equal(t, []byte("some junk bytes"), b)
+}
+
+func TestFileReadAcrossBufferBoundary(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// a value that crosses the bufio buffer boundary is not returned in full by a single Read on the buffered reader
+	largeData := make([]byte, 3*4096+17)
+	for i := range largeData {
+		largeData[i] = byte(i % 251)
+	}
+
+	filePath := path.Join(testDir, "large-data-file")
+	fileCreator, err := CreateFile(filePath, byte(1), sha256.New())
+	require.NoError(t, err)
+	defer fileCreator.Close()
+	require.NoError(t, fileCreator.EncodeBytes(largeData))
+	require.NoError(t, fileCreator.EncodeString("trailer"))
+	_, err = fileCreator.Done()
+	require.NoError(t, err)
+
+	fileReader, err := OpenFile(filePath, byte(1))
+	require.NoError(t, err)
+	defer fileReader.Close()
+
+	b, err := fileReader.DecodeBytes()
+	require.NoError(t, err)
+	require.Equal(t, largeData, b)
+
+	// the value that follows must still be decoded from the correct offset
+	str, err := fileReader.DecodeString()
+	require.NoError(t, err)
+	require.Equal(t, "trailer", str)
+}
+
+func TestFileCreatorErrorPropagation(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// error propagation from CreateFile function when file already exists
+	existingFilePath := path.Join(testDir, "an-existing-file")
+	file, err := os.Create(existingFilePath)
+	require.NoError(t, err)
+	require.NoError(t, file.Close())
+	_, err = CreateFile(existingFilePath, byte(1), sha256.New())
+	require.Contains(t, err.Error(), "error while creating the snapshot file: "+existingFilePath)
+
+	// error propagation from Encode functions.
+	// Mimic the errors by setting the writer to an error returning writer
+	dataFilePath := path.Join(testDir, "data-file")
+	fileCreator, err := CreateFile(dataFilePath, byte(1), sha256.New())
+	require.NoError(t, err)
+	defer fileCreator.Close()
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeUVarint")}
+	require.EqualError(t, fileCreator.EncodeUVarint(9), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeUVarint")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeBytes")}
+	require.EqualError(t, fileCreator.EncodeBytes([]byte("junk")), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeBytes")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeProtoMessage")}
+	require.EqualError(t, fileCreator.EncodeProtoMessage(&common.BlockchainInfo{}), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeProtoMessage")
+	require.EqualError(t, fileCreator.EncodeProtoMessage(nil), "error marshalling proto message to write to the snapshot file: "+dataFilePath+": proto: Marshal called with nil")
+
+	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeString")}
+	require.EqualError(t, fileCreator.EncodeString("junk"), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeString")
+
+	// error propagation from Done function
+	fileCreator.file.Close()
+	_, err = fileCreator.Done()
+	require.Contains(t, err.Error(), "error while flushing to the snapshot file: "+dataFilePath)
+
+	// error propagation from Close function
+	require.Contains(t, fileCreator.Close().Error(), "error while closing the snapshot file: "+dataFilePath)
+}
+
+func TestFileReaderErrorPropagation(t *testing.T) {
+	testDir := testPath(t)
+	defer os.RemoveAll(testDir)
+
+	// non-existent-file causes an error
+	nonExistentFile := path.Join(testDir, "non-existent-file")
+	_, err := OpenFile(nonExistentFile, byte(1))
+	require.Contains(t, err.Error(), "error while opening the snapshot file: "+nonExistentFile)
+
+	// an empty-file causes an error
+	emptyFile := path.Join(testDir, "empty-file")
+	f, err := os.Create(emptyFile)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+	emptyFileReader, err := OpenFile(emptyFile, byte(1))
+	require.Contains(t, err.Error(), "error while reading from the snapshot file: "+emptyFile)
+	defer emptyFileReader.Close()
+
+	// a file with mismatched format info causes an error
+	unexpectedFormatFile := path.Join(testDir, "wrong-data-format-file")
+	fw, err := CreateFile(unexpectedFormatFile, byte(1), sha256.New())
+	require.NoError(t, err)
+	require.NoError(t, fw.EncodeString("Hello there"))
+	_, err = fw.Done()
+	require.NoError(t, err)
+	unexpectedFormatFileReader, err := OpenFile(unexpectedFormatFile, byte(2))
+	require.EqualError(t, err, "unexpected data format: 1")
+	defer unexpectedFormatFileReader.Close()
+
+	// decodeMethodsErrors - mimic errors by closing the underlying file
+	closedFile := path.Join(testDir, "closed-file")
+	f, err = os.Create(closedFile)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	closedFileReader := &FileReader{
+		file:      f,
+		bufReader: bufio.NewReader(f),
+	}
+	_, err = closedFileReader.DecodeUVarInt()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	_, err = closedFileReader.DecodeBytes()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	_, err = closedFileReader.DecodeString()
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	err = closedFileReader.DecodeProtoMessage(&common.BlockchainInfo{})
+	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
+	err = closedFileReader.Close()
+	require.Contains(t, err.Error(), "error while closing the snapshot file: "+closedFile)
+}
+
+func computeSha256(t *testing.T, file string) []byte {
+	data, err := ioutil.ReadFile(file)
+	require.NoError(t, err)
+	sha := sha256.Sum256(data)
+	return sha[:]
+}
+
+func testPath(t *testing.T) string {
+	dir, err := ioutil.TempDir("", "test-file-encoder-")
+	require.NoError(t, err)
+	return dir
+}
+
+type errorCausingWriter struct {
+	err error
+}
+
+func (w *errorCausingWriter) Write(p []byte) (n int, err error) { return 0, w.err }