Skip to content

Commit

Permalink
Add snapshot file creator and reader functions
Browse files Browse the repository at this point in the history
This commit adds reusable functions for creating and reading
the snapshot files. These functions are expected to be used
by various components that would generate or consume the snapshot
files, such as blockstorage and statedb

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and denyeart committed May 12, 2020
1 parent cc92b8d commit e12530c
Show file tree
Hide file tree
Showing 2 changed files with 393 additions and 0 deletions.
201 changes: 201 additions & 0 deletions common/ledger/snapshot/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package snapshot

import (
"bufio"
"encoding/binary"
"fmt"
"hash"
"io"
"os"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)

// FileWriter creates a new file for ledger snapshot. This is expected to be used by various
// components of ledger, such as blockstorage and statedb for exporting the relevant snapshot data
type FileWriter struct {
	file              *os.File      // underlying snapshot file, created by CreateFile
	hasher            hash.Hash     // accumulates the hash of every byte written to the data stream
	bufWriter         *bufio.Writer // buffers writes to `file`; flushed in Done
	multiWriter       io.Writer     // tees each write to both bufWriter and hasher
	varintReusableBuf []byte        // scratch buffer reused by EncodeUVarint to avoid per-call allocation
}

// CreateFile creates a new file for exporting the ledger snapshot data.
// This function returns an error if the file already exists. The `dataformat` is the first byte
// written to the file. The hasher is used for computing the hash of the data stream.
func CreateFile(filePath string, dataformat byte, hasher hash.Hash) (*FileWriter, error) {
	// create the file only if it does not already exist.
	// set the permission mode to read-only, as once the file is closed, we do not support modifying the file
	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0444)
	if err != nil {
		return nil, errors.Wrapf(err, "error while creating the snapshot file: %s", filePath)
	}
	bufWriter := bufio.NewWriter(file)
	multiWriter := io.MultiWriter(bufWriter, hasher)
	if _, err := multiWriter.Write([]byte{dataformat}); err != nil {
		file.Close()
		// remove the partially created file; because the file is opened with O_EXCL,
		// leaving it behind would make every subsequent CreateFile for this path fail
		os.Remove(filePath)
		return nil, errors.Wrapf(err, "error while writing data format to the snapshot file: %s", filePath)
	}
	return &FileWriter{
		file:              file,
		bufWriter:         bufWriter,
		multiWriter:       multiWriter,
		hasher:            hasher,
		varintReusableBuf: make([]byte, binary.MaxVarintLen64),
	}, nil
}

// EncodeString encodes and appends the string to the data stream
func (c *FileWriter) EncodeString(str string) error {
	// a string is written exactly like its byte representation
	asBytes := []byte(str)
	return c.EncodeBytes(asBytes)
}

// EncodeProtoMessage encodes and appends a proto message to the data stream
func (c *FileWriter) EncodeProtoMessage(m proto.Message) error {
	b, err := proto.Marshal(m)
	if err != nil {
		return errors.Wrapf(err, "error marshalling proto message to write to the snapshot file: %s", c.file.Name())
	}
	// the marshalled message is written as a length-prefixed byte slice
	return c.EncodeBytes(b)
}

// EncodeBytes encodes and appends bytes to the data stream
func (c *FileWriter) EncodeBytes(b []byte) error {
	// the length prefix goes first so that the reader knows how many bytes to consume
	if err := c.EncodeUVarint(uint64(len(b))); err != nil {
		return err
	}
	_, writeErr := c.multiWriter.Write(b)
	// Wrapf returns nil when writeErr is nil
	return errors.Wrapf(writeErr, "error while writing data to the snapshot file: %s", c.file.Name())
}

// EncodeUVarint encodes and appends a number to the data stream
func (c *FileWriter) EncodeUVarint(u uint64) error {
	// encode into the reusable scratch buffer; PutUvarint returns the encoded length
	encodedLen := binary.PutUvarint(c.varintReusableBuf, u)
	_, writeErr := c.multiWriter.Write(c.varintReusableBuf[:encodedLen])
	// Wrapf returns nil when writeErr is nil
	return errors.Wrapf(writeErr, "error while writing data to the snapshot file: %s", c.file.Name())
}

// Done closes the snapshot file and returns the final hash of the data stream
func (c *FileWriter) Done() ([]byte, error) {
	// flush buffered bytes to disk before closing; both steps must succeed
	flushErr := c.bufWriter.Flush()
	if flushErr != nil {
		return nil, errors.Wrapf(flushErr, "error while flushing to the snapshot file: %s ", c.file.Name())
	}
	closeErr := c.file.Close()
	if closeErr != nil {
		return nil, errors.Wrapf(closeErr, "error while closing the snapshot file: %s ", c.file.Name())
	}
	return c.hasher.Sum(nil), nil
}

// Close closes the underlying file, if not already done. A consumer can invoke this function if the consumer
// encountered some error and simply wants to abandon the snapshot file creation (typically, intended to be used in a defer statement)
func (c *FileWriter) Close() error {
	// nil receiver is tolerated so that callers may defer Close before checking the CreateFile error
	if c == nil {
		return nil
	}
	closeErr := c.file.Close()
	// Wrapf returns nil when closeErr is nil
	return errors.Wrapf(closeErr, "error while closing the snapshot file: %s", c.file.Name())
}

// FileReader reads from a ledger snapshot file. This is expected to be used for loading the ledger snapshot data
// during bootstrapping a channel from snapshot. The data should be read, using the functions `DecodeXXX`,
// in the same sequence in which the data was written by the functions `EncodeXXX` in the `FileCreator`.
// Note that the FileReader does not verify the hash of the stream and it is expected that the hash has been verified
// by the consumer. Later, if we decide to perform this, on-the-side, while loading the snapshot data, the FileReader,
// like the FileCreator, would take a `hasher` as an input
type FileReader struct {
	file              *os.File      // underlying snapshot file, opened by OpenFile
	bufReader         *bufio.Reader // buffers reads from `file`
	reusableByteSlice []byte        // scratch buffer reused by decodeBytes to avoid per-call allocation
}

// OpenFile constructs a FileReader. This function returns an error if the format of the file, stored in the
// first byte, does not match with the expectedDataFormat
func OpenFile(filePath string, expectDataformat byte) (*FileReader, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, errors.Wrapf(err, "error while opening the snapshot file: %s", filePath)
	}
	bufReader := bufio.NewReader(file)
	// the very first byte of the stream carries the data format written by CreateFile
	dataFormat, err := bufReader.ReadByte()
	switch {
	case err != nil:
		file.Close()
		return nil, errors.Wrapf(err, "error while reading from the snapshot file: %s", filePath)
	case dataFormat != expectDataformat:
		file.Close()
		return nil, errors.New(fmt.Sprintf("unexpected data format: %x", dataFormat))
	}
	return &FileReader{
		file:      file,
		bufReader: bufReader,
	}, nil
}

// DecodeString reads and decodes a string
func (r *FileReader) DecodeString() (string, error) {
	raw, err := r.decodeBytes()
	if err != nil {
		return "", err
	}
	// the string conversion copies, so the reusable buffer is not retained
	return string(raw), nil
}

// DecodeBytes reads and decodes bytes
func (r *FileReader) DecodeBytes() ([]byte, error) {
	scratch, err := r.decodeBytes()
	if err != nil {
		return nil, err
	}
	// hand back a private copy, as decodeBytes returns a view into a reusable buffer
	return append(make([]byte, 0, len(scratch)), scratch...), nil
}

// DecodeUVarInt reads and decodes a number
func (r *FileReader) DecodeUVarInt() (uint64, error) {
	u, err := binary.ReadUvarint(r.bufReader)
	if err == nil {
		return u, nil
	}
	return 0, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
}

// DecodeProtoMessage reads and decodes a protoMessage
func (r *FileReader) DecodeProtoMessage(m proto.Message) error {
	b, err := r.decodeBytes()
	if err != nil {
		return err
	}
	if err := proto.Unmarshal(b, m); err != nil {
		// wrap with the file name for consistency with every other error path in this file
		return errors.Wrapf(err, "error while unmarshalling a proto message read from the snapshot file: %s", r.file.Name())
	}
	return nil
}

// Close closes the file
func (r *FileReader) Close() error {
	// nil receiver is tolerated so that callers may defer Close before checking the OpenFile error
	if r == nil {
		return nil
	}
	closeErr := r.file.Close()
	// Wrapf returns nil when closeErr is nil
	return errors.Wrapf(closeErr, "error while closing the snapshot file: %s", r.file.Name())
}

// decodeBytes reads the next length-prefixed chunk from the file into the reusable
// scratch buffer and returns a sub-slice of that buffer. The returned slice is only
// valid until the next Decode call; exported callers must copy it if they retain it.
func (r *FileReader) decodeBytes() ([]byte, error) {
	sizeUint, err := r.DecodeUVarInt()
	if err != nil {
		return nil, err
	}
	size := int(sizeUint)
	if len(r.reusableByteSlice) < size {
		r.reusableByteSlice = make([]byte, size)
	}
	toRead := r.reusableByteSlice[0:size]
	// bufio.Reader.Read may return fewer bytes than len(toRead); io.ReadFull guarantees
	// either a complete read or a non-nil error (io.ErrUnexpectedEOF on a short read),
	// so a truncated chunk can never be silently returned
	if _, err := io.ReadFull(r.bufReader, toRead); err != nil {
		return nil, errors.Wrapf(err, "error while reading from snapshot file: %s", r.file.Name())
	}
	return toRead, nil
}
192 changes: 192 additions & 0 deletions common/ledger/snapshot/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package snapshot

import (
"bufio"
"crypto/sha256"
"errors"
"io/ioutil"
"os"
"path"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/stretchr/testify/require"
)

// TestFileCreateAndRead writes a sequence of values through FileWriter and verifies
// that FileReader yields them back in the same order, and that the hash returned by
// Done matches a sha256 over the file's raw bytes.
func TestFileCreateAndRead(t *testing.T) {
	testDir := testPath(t)
	defer os.RemoveAll(testDir)

	// create file and encode some data
	fileCreator, err := CreateFile(path.Join(testDir, "dataFile"), byte(5), sha256.New())
	require.NoError(t, err)
	defer fileCreator.Close()

	require.NoError(t, fileCreator.EncodeString("Hi there"))
	require.NoError(t, fileCreator.EncodeString("How are you?"))
	require.NoError(t, fileCreator.EncodeUVarint(uint64(25)))
	require.NoError(t, fileCreator.EncodeProtoMessage(
		&common.BlockchainInfo{
			Height:            30,
			CurrentBlockHash:  []byte("Current-Block-Hash"),
			PreviousBlockHash: []byte("Previous-Block-Hash"),
		},
	))
	require.NoError(t, fileCreator.EncodeBytes([]byte("some junk bytes")))

	// Done and verify the returned hash
	dataHash, err := fileCreator.Done()
	require.NoError(t, err)
	require.Equal(
		t,
		dataHash,
		computeSha256(t, path.Join(testDir, "dataFile")),
	)

	// open the file and verify the reads
	fileReader, err := OpenFile(path.Join(testDir, "dataFile"), byte(5))
	// the error from OpenFile was previously unchecked; a failure here would have
	// surfaced as a confusing nil-pointer panic in the first Decode call
	require.NoError(t, err)
	defer fileReader.Close()

	str, err := fileReader.DecodeString()
	require.NoError(t, err)
	require.Equal(t, "Hi there", str)

	str, err = fileReader.DecodeString()
	require.NoError(t, err)
	require.Equal(t, "How are you?", str)

	number, err := fileReader.DecodeUVarInt()
	require.NoError(t, err)
	require.Equal(t, uint64(25), number)

	retrievedBlockchainInfo := &common.BlockchainInfo{}
	require.NoError(t, fileReader.DecodeProtoMessage(retrievedBlockchainInfo))
	require.True(t, proto.Equal(
		&common.BlockchainInfo{
			Height:            30,
			CurrentBlockHash:  []byte("Current-Block-Hash"),
			PreviousBlockHash: []byte("Previous-Block-Hash"),
		},
		retrievedBlockchainInfo,
	))

	b, err := fileReader.DecodeBytes()
	require.NoError(t, err)
	require.Equal(t, []byte("some junk bytes"), b)
}

// TestFileCreatorErrorPropagation verifies that each FileWriter API wraps and
// surfaces errors from the underlying file/writer with the expected messages.
func TestFileCreatorErrorPropagation(t *testing.T) {
	testPath := testPath(t)
	defer os.RemoveAll(testPath)

	// error propagation from CreateFile function when file already exists
	existingFilePath := path.Join(testPath, "an-existing-file")
	file, err := os.Create(existingFilePath)
	require.NoError(t, err)
	require.NoError(t, file.Close())
	_, err = CreateFile(existingFilePath, byte(1), sha256.New())
	require.Contains(t, err.Error(), "error while creating the snapshot file: "+existingFilePath)

	// error propagation from Encode functions.
	// Mimic the errors by setting the writer to an error returning writer
	dataFilePath := path.Join(testPath, "data-file")
	fileCreator, err := CreateFile(dataFilePath, byte(1), sha256.New())
	// the error from CreateFile was previously unchecked; verify creation succeeded
	// before exercising the Encode error paths
	require.NoError(t, err)
	defer fileCreator.Close()

	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeUVarint")}
	require.EqualError(t, fileCreator.EncodeUVarint(9), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeUVarint")

	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeBytes")}
	require.EqualError(t, fileCreator.EncodeBytes([]byte("junk")), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeBytes")

	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeProtoMessage")}
	require.EqualError(t, fileCreator.EncodeProtoMessage(&common.BlockchainInfo{}), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeProtoMessage")
	require.EqualError(t, fileCreator.EncodeProtoMessage(nil), "error marshalling proto message to write to the snapshot file: "+dataFilePath+": proto: Marshal called with nil")

	fileCreator.multiWriter = &errorCausingWriter{err: errors.New("error-from-EncodeString")}
	require.EqualError(t, fileCreator.EncodeString("junk"), "error while writing data to the snapshot file: "+dataFilePath+": error-from-EncodeString")

	// error propagation from Done function
	fileCreator.file.Close()
	_, err = fileCreator.Done()
	require.Contains(t, err.Error(), "error while flushing to the snapshot file: "+dataFilePath)

	// error propagation from Close function
	require.Contains(t, fileCreator.Close().Error(), "error while closing the snapshot file: "+dataFilePath)
}

// TestFileReaderErrorPropagation verifies that OpenFile and each Decode API wrap
// and surface errors from the underlying file with the expected messages.
func TestFileReaderErrorPropagation(t *testing.T) {
	testPath := testPath(t)
	defer os.RemoveAll(testPath)

	// non-existent-file causes an error
	nonExistentFile := path.Join(testPath, "non-existent-file")
	_, err := OpenFile(nonExistentFile, byte(1))
	require.Contains(t, err.Error(), "error while opening the snapshot file: "+nonExistentFile)

	// an empty-file causes an error
	emptyFile := path.Join(testPath, "empty-file")
	f, err := os.Create(emptyFile)
	require.NoError(t, err)
	f.Close()
	emptyFileReader, err := OpenFile(emptyFile, byte(1))
	require.Contains(t, err.Error(), "error while reading from the snapshot file: "+emptyFile)
	defer emptyFileReader.Close()

	// a file with mismatched format info causes an error
	unexpectedFormatFile := path.Join(testPath, "wrong-data-format-file")
	fw, err := CreateFile(unexpectedFormatFile, byte(1), sha256.New())
	// the error from CreateFile was previously unchecked; verify creation succeeded
	require.NoError(t, err)
	require.NoError(t, fw.EncodeString("Hello there"))
	_, err = fw.Done()
	require.NoError(t, err)
	unexpectedFormatFileReader, err := OpenFile(unexpectedFormatFile, byte(2))
	require.EqualError(t, err, "unexpected data format: 1")
	defer unexpectedFormatFileReader.Close()

	// decodeMethodsErrors - mimic errors by closing the underlying file
	closedFile := path.Join(testPath, "closed-file")
	f, err = os.Create(closedFile)
	require.NoError(t, err)
	require.NoError(t, f.Close())

	closedFileReader := &FileReader{
		file:      f,
		bufReader: bufio.NewReader(f),
	}
	_, err = closedFileReader.DecodeUVarInt()
	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
	_, err = closedFileReader.DecodeBytes()
	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
	_, err = closedFileReader.DecodeString()
	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
	err = closedFileReader.DecodeProtoMessage(&common.BlockchainInfo{})
	require.Contains(t, err.Error(), "error while reading from snapshot file: "+closedFile)
	err = closedFileReader.Close()
	require.Contains(t, err.Error(), "error while closing the snapshot file: "+closedFile)
}

// computeSha256 returns the sha256 digest of the entire contents of the given file
func computeSha256(t *testing.T, file string) []byte {
	contents, err := ioutil.ReadFile(file)
	require.NoError(t, err)
	digest := sha256.Sum256(contents)
	return digest[:]
}

// testPath creates and returns a fresh temporary directory for a test to use
func testPath(t *testing.T) string {
	dir, err := ioutil.TempDir("", "test-file-encoder-")
	require.NoError(t, err)
	return dir
}

type errorCausingWriter struct {
err error
}

func (w *errorCausingWriter) Write(p []byte) (n int, err error) { return 0, w.err }

0 comments on commit e12530c

Please sign in to comment.