
Commit

Merge "[FAB-15689] Check decoding error" into release-1.4
Jason Yellick authored and Gerrit Code Review committed Jul 12, 2019
2 parents 2d516a4 + 8150222 commit bbaed63
Showing 13 changed files with 122 additions and 53 deletions.
@@ -184,7 +184,10 @@ func (historyDB *historyDB) GetLastSavepoint() (*version.Height, error) {
     if err != nil || versionBytes == nil {
         return nil, err
     }
-    height, _ := version.NewHeightFromBytes(versionBytes)
+    height, _, err := version.NewHeightFromBytes(versionBytes)
+    if err != nil {
+        return nil, err
+    }
     return height, nil
 }

@@ -42,13 +42,16 @@ func decodeVersionAndMetadata(encodedstr string) (*version.Height, []byte, error
     if err = proto.Unmarshal(versionFieldBytes, versionFieldMsg); err != nil {
         return nil, nil, err
     }
-    ver, _ := version.NewHeightFromBytes(versionFieldMsg.VersionBytes)
+    ver, _, err := version.NewHeightFromBytes(versionFieldMsg.VersionBytes)
+    if err != nil {
+        return nil, nil, err
+    }
     return ver, versionFieldMsg.Metadata, nil
 }

 // encodeVersionOldFormat return string representation of version
-// With the intorduction of metadata feature, we change the encoding (see function below). However, we retain
-// this funtion for test so as to make sure that we can decode old format and support mixed formats present
+// With the introduction of metadata feature, we change the encoding (see function below). However, we retain
+// this function for test so as to make sure that we can decode old format and support mixed formats present
 // in a statedb. This function should be used only in tests to generate the encoding in old format
 func encodeVersionOldFormat(version *version.Height) string {
     return fmt.Sprintf("%v:%v", version.BlockNum, version.TxNum)
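
For readers who have not seen the pre-metadata CouchDB value encoding, the sketch below (an illustration, not part of this commit) shows the plain "blockNum:txNum" string that encodeVersionOldFormat emits; the package path is the version package changed later in this diff.

package main

// Illustration only: mirrors encodeVersionOldFormat from the diff above,
// which serializes a height as the plain string "<blockNum>:<txNum>".
import (
    "fmt"

    "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
)

func main() {
    h := version.NewHeight(10, 5)
    oldFormat := fmt.Sprintf("%v:%v", h.BlockNum, h.TxNum)
    fmt.Println(oldFormat) // prints "10:5"
}
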
@@ -208,7 +208,10 @@ func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
     if versionBytes == nil {
         return nil, nil
     }
-    version, _ := version.NewHeightFromBytes(versionBytes)
+    version, _, err := version.NewHeightFromBytes(versionBytes)
+    if err != nil {
+        return nil, err
+    }
     return version, nil
 }

21 changes: 15 additions & 6 deletions core/ledger/kvledger/txmgmt/statedb/stateleveldb/value_encoding.go
@@ -33,15 +33,21 @@ func encodeValue(v *statedb.VersionedValue) ([]byte, error) {
 // or the new (v1.3 and later) encoding that supports metadata.
 func decodeValue(encodedValue []byte) (*statedb.VersionedValue, error) {
     if oldFormatEncoding(encodedValue) {
-        val, ver := decodeValueOldFormat(encodedValue)
+        val, ver, err := decodeValueOldFormat(encodedValue)
+        if err != nil {
+            return nil, err
+        }
         return &statedb.VersionedValue{Version: ver, Value: val, Metadata: nil}, nil
     }
     msg := &msgs.VersionedValueProto{}
     err := proto.Unmarshal(encodedValue[1:], msg)
     if err != nil {
         return nil, err
     }
-    ver, _ := version.NewHeightFromBytes(msg.VersionBytes)
+    ver, _, err := version.NewHeightFromBytes(msg.VersionBytes)
+    if err != nil {
+        return nil, err
+    }
     val := msg.Value
     metadata := msg.Metadata
     // protobuf always makes an empty byte array as nil
@@ -53,7 +59,7 @@ func decodeValue(encodedValue []byte) (*statedb.VersionedValue, error) {

 // encodeValueOldFormat appends the value to the version, allows storage of version and value in binary form.
 // With the introduction of metadata feature in v1.3, we change the encoding (see function below). However, we retain
-// this funtion for test so as to make sure that we can decode old format and support mixed formats present
+// this function for test so as to make sure that we can decode old format and support mixed formats present
 // in a statedb. This function should be used only in tests to generate the encoding in old format
 func encodeValueOldFormat(value []byte, version *version.Height) []byte {
     encodedValue := version.ToBytes()
@@ -69,10 +75,13 @@ func encodeValueOldFormat(value []byte, version *version.Height) []byte {
 // should not be used directly or in a tests. The function 'decodeValue' should be used
 // for all decodings - which is expected to detect the encoded format and direct the call
 // to this function for decoding the values encoded in the old format
-func decodeValueOldFormat(encodedValue []byte) ([]byte, *version.Height) {
-    height, n := version.NewHeightFromBytes(encodedValue)
+func decodeValueOldFormat(encodedValue []byte) ([]byte, *version.Height, error) {
+    height, n, err := version.NewHeightFromBytes(encodedValue)
+    if err != nil {
+        return nil, nil, err
+    }
     value := encodedValue[n:]
-    return value, height
+    return value, height, nil
 }

 // oldFormatEncoding checks whether the value is encoded using the old (pre-v1.3) format
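
A hypothetical in-package test (not included in this commit) can confirm the new three-value contract of decodeValueOldFormat by round-tripping a value through the old-format helpers; it relies on the documented behaviour above that encodeValueOldFormat simply appends the value bytes after the serialized height.

package stateleveldb

import (
    "testing"

    "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
    "github.com/stretchr/testify/assert"
)

// Sketch only: round-trips a value through the old (pre-v1.3) encoding
// helpers now that decodeValueOldFormat reports decoding errors.
func TestOldFormatRoundTripSketch(t *testing.T) {
    h := version.NewHeight(1, 2)
    encoded := encodeValueOldFormat([]byte("value1"), h)

    val, ver, err := decodeValueOldFormat(encoded)
    assert.NoError(t, err) // a corrupted height prefix would surface here
    assert.Equal(t, []byte("value1"), val)
    assert.Equal(t, h, ver)
}
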
14 changes: 10 additions & 4 deletions core/ledger/kvledger/txmgmt/version/version.go
@@ -34,10 +34,16 @@ func NewHeight(blockNum, txNum uint64) *Height {
 }

 // NewHeightFromBytes constructs a new instance of Height from serialized bytes
-func NewHeightFromBytes(b []byte) (*Height, int) {
-    blockNum, n1, _ := util.DecodeOrderPreservingVarUint64(b)
-    txNum, n2, _ := util.DecodeOrderPreservingVarUint64(b[n1:])
-    return NewHeight(blockNum, txNum), n1 + n2
+func NewHeightFromBytes(b []byte) (*Height, int, error) {
+    blockNum, n1, err := util.DecodeOrderPreservingVarUint64(b)
+    if err != nil {
+        return nil, -1, err
+    }
+    txNum, n2, err := util.DecodeOrderPreservingVarUint64(b[n1:])
+    if err != nil {
+        return nil, -1, err
+    }
+    return NewHeight(blockNum, txNum), n1 + n2, nil
 }

 // ToBytes serializes the Height
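
This signature change is the heart of the commit: NewHeightFromBytes now surfaces a failed DecodeOrderPreservingVarUint64 call instead of discarding it, and every caller shown in this diff propagates the error. A minimal caller sketch (not taken from the repository) looks like this:

package main

import (
    "fmt"

    "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
)

func main() {
    h := version.NewHeight(10, 100)
    b := h.ToBytes()

    // New contract: check the error instead of discarding it.
    h2, n, err := version.NewHeightFromBytes(b)
    if err != nil {
        panic(err) // a truncated or corrupted encoding is reported here
    }
    fmt.Printf("decoded block=%d tx=%d from %d bytes\n", h2.BlockNum, h2.TxNum, n)
}
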
6 changes: 4 additions & 2 deletions core/ledger/kvledger/txmgmt/version/version_test.go
@@ -25,7 +25,8 @@ import (
 func TestVersionSerialization(t *testing.T) {
     h1 := NewHeight(10, 100)
     b := h1.ToBytes()
-    h2, n := NewHeightFromBytes(b)
+    h2, n, err := NewHeightFromBytes(b)
+    assert.NoError(t, err)
     assert.Equal(t, h1, h2)
     assert.Len(t, b, n)
 }
@@ -46,7 +47,8 @@ func TestVersionExtraBytes(t *testing.T) {
     h1 := NewHeight(10, 100)
     b := h1.ToBytes()
     b1 := append(b, extraBytes...)
-    h2, n := NewHeightFromBytes(b1)
+    h2, n, err := NewHeightFromBytes(b1)
+    assert.NoError(t, err)
     assert.Equal(t, h1, h2)
     assert.Len(t, b, n)
     assert.Equal(t, extraBytes, b1[n:])
18 changes: 12 additions & 6 deletions core/ledger/pvtdatastorage/kv_encoding.go
@@ -73,9 +73,12 @@ func encodeExpiryValue(expiryData *ExpiryData) ([]byte, error) {
     return proto.Marshal(expiryData)
 }

-func decodeExpiryKey(expiryKeyBytes []byte) *expiryKey {
-    height, _ := version.NewHeightFromBytes(expiryKeyBytes[1:])
-    return &expiryKey{expiringBlk: height.BlockNum, committingBlk: height.TxNum}
+func decodeExpiryKey(expiryKeyBytes []byte) (*expiryKey, error) {
+    height, _, err := version.NewHeightFromBytes(expiryKeyBytes[1:])
+    if err != nil {
+        return nil, err
+    }
+    return &expiryKey{expiringBlk: height.BlockNum, committingBlk: height.TxNum}, nil
 }

 func decodeExpiryValue(expiryValueBytes []byte) (*ExpiryData, error) {
@@ -84,15 +87,18 @@ func decodeExpiryValue(expiryValueBytes []byte) (*ExpiryData, error) {
     return expiryData, err
 }

-func decodeDatakey(datakeyBytes []byte) *dataKey {
-    v, n := version.NewHeightFromBytes(datakeyBytes[1:])
+func decodeDatakey(datakeyBytes []byte) (*dataKey, error) {
+    v, n, err := version.NewHeightFromBytes(datakeyBytes[1:])
+    if err != nil {
+        return nil, err
+    }
     blkNum := v.BlockNum
     tranNum := v.TxNum
     remainingBytes := datakeyBytes[n+1:]
     nilByteIndex := bytes.IndexByte(remainingBytes, nilByte)
     ns := string(remainingBytes[:nilByteIndex])
     coll := string(remainingBytes[nilByteIndex+1:])
-    return &dataKey{nsCollBlk{ns, coll, blkNum}, tranNum}
+    return &dataKey{nsCollBlk{ns, coll, blkNum}, tranNum}, nil
 }

 func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, error) {
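
Because decodeExpiryKey skips the one-byte key prefix and reinterprets the encoded height as (expiringBlk, committingBlk), a hypothetical in-package test (not part of this commit) can exercise the new error-returning signature by assembling such a key by hand; the zero prefix byte below is an arbitrary placeholder, since the decoder never inspects it.

package pvtdatastorage

import (
    "testing"

    "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
    "github.com/stretchr/testify/assert"
)

// Sketch only: builds an expiry key as <prefix byte><encoded height> and
// decodes it with the updated decodeExpiryKey.
func TestDecodeExpiryKeySketch(t *testing.T) {
    keyBytes := append([]byte{0x00}, version.NewHeight(8, 3).ToBytes()...)

    expKey, err := decodeExpiryKey(keyBytes)
    assert.NoError(t, err)
    assert.Equal(t, uint64(8), expKey.expiringBlk)   // height.BlockNum
    assert.Equal(t, uint64(3), expKey.committingBlk) // height.TxNum
}
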
3 changes: 2 additions & 1 deletion core/ledger/pvtdatastorage/kv_encoding_test.go
@@ -16,7 +16,8 @@ import (

 func TestDataKeyEncoding(t *testing.T) {
     dataKey1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns1", coll: "coll1", blkNum: 2}, txNum: 5}
-    datakey2 := decodeDatakey(encodeDataKey(dataKey1))
+    datakey2, err := decodeDatakey(encodeDataKey(dataKey1))
+    assert.NoError(t, err)
     assert.Equal(t, dataKey1, datakey2)
 }

20 changes: 15 additions & 5 deletions core/ledger/pvtdatastorage/store_impl.go
@@ -169,7 +169,7 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvt
     }
     expectedBlockNum := s.nextBlockNum()
     if expectedBlockNum != blockNum {
-        return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, recived block number=%d", expectedBlockNum, blockNum)}
+        return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, received block number=%d", expectedBlockNum, blockNum)}
     }

     batch := leveldbhelper.NewUpdateBatch()
@@ -241,7 +241,7 @@ func (s *store) Commit() error {
 // would have exact same entries and will overwrite those. This also leaves the
 // existing expiry entires as is because, most likely they will also get overwritten
 // per new data entries. Even if some of the expiry entries does not get overwritten,
-// (beacuse of some data may be missing next time), the additional expiry entries are just
+// (because of some data may be missing next time), the additional expiry entries are just
 // a Noop
 func (s *store) Rollback() error {
     if !s.batchPending {
@@ -626,11 +626,18 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil

     for itr.Next() {
         dataKeyBytes := itr.Key()
-        if v11Format(dataKeyBytes) {
+        v11Fmt, err := v11Format(dataKeyBytes)
+        if err != nil {
+            return nil, err
+        }
+        if v11Fmt {
             return v11RetrievePvtdata(itr, filter)
         }
         dataValueBytes := itr.Value()
-        dataKey := decodeDatakey(dataKeyBytes)
+        dataKey, err := decodeDatakey(dataKeyBytes)
+        if err != nil {
+            return nil, err
+        }
         expired, err := isExpired(dataKey.nsCollBlk, s.btlPolicy, lastCommittedBlock)
         if err != nil {
             return nil, err
@@ -821,7 +828,10 @@ func (s *store) retrieveExpiryEntries(minBlkNum, maxBlkNum uint64) ([]*expiryEnt
     for itr.Next() {
         expiryKeyBytes := itr.Key()
         expiryValueBytes := itr.Value()
-        expiryKey := decodeExpiryKey(expiryKeyBytes)
+        expiryKey, err := decodeExpiryKey(expiryKeyBytes)
+        if err != nil {
+            return nil, err
+        }
         expiryValue, err := decodeExpiryValue(expiryValueBytes)
         if err != nil {
             return nil, err
25 changes: 17 additions & 8 deletions core/ledger/pvtdatastorage/v11.go
@@ -14,15 +14,22 @@ import (
     "github.com/hyperledger/fabric/protos/ledger/rwset"
 )

-func v11Format(datakeyBytes []byte) bool {
-    _, n := version.NewHeightFromBytes(datakeyBytes[1:])
+func v11Format(datakeyBytes []byte) (bool, error) {
+    _, n, err := version.NewHeightFromBytes(datakeyBytes[1:])
+    if err != nil {
+        return false, err
+    }
     remainingBytes := datakeyBytes[n+1:]
-    return len(remainingBytes) == 0
+    return len(remainingBytes) == 0, err
 }

-func v11DecodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) {
-    height, _ := version.NewHeightFromBytes(key[1:])
-    return height.BlockNum, height.TxNum
+// v11DecodePK returns block number, tx number, and error.
+func v11DecodePK(key blkTranNumKey) (uint64, uint64, error) {
+    height, _, err := version.NewHeightFromBytes(key[1:])
+    if err != nil {
+        return 0, 0, err
+    }
+    return height.BlockNum, height.TxNum, nil
 }

 func v11DecodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) {
@@ -48,9 +55,11 @@ func v11RetrievePvtdata(itr *leveldbhelper.Iterator, filter ledger.PvtNsCollFilt
 }

 func v11DecodeKV(k, v []byte, filter ledger.PvtNsCollFilter) (*ledger.TxPvtData, error) {
-    bNum, tNum := v11DecodePK(k)
+    bNum, tNum, err := v11DecodePK(k)
+    if err != nil {
+        return nil, err
+    }
     var pvtWSet *rwset.TxPvtReadWriteSet
-    var err error
     if pvtWSet, err = v11DecodePvtRwSet(v); err != nil {
         return nil, err
     }
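
Both helpers above lean on the same layout assumption: a v1.1 private-data key is one prefix byte followed only by the encoded (block, tx) height, while newer keys carry namespace and collection bytes after the height. The sketch below is not part of the commit and assumes blkTranNumKey is a []byte-based type, as its slicing in the diff suggests:

package pvtdatastorage

import (
    "testing"

    "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
    "github.com/stretchr/testify/assert"
)

// Sketch only: a key that ends right after the height is classified as
// v1.1 format, and v11DecodePK recovers the block and tx numbers from it.
func TestV11KeySketch(t *testing.T) {
    key := append([]byte{0x00}, version.NewHeight(4, 7).ToBytes()...)

    isV11, err := v11Format(key)
    assert.NoError(t, err)
    assert.True(t, isV11)

    blkNum, txNum, err := v11DecodePK(key)
    assert.NoError(t, err)
    assert.Equal(t, uint64(4), blkNum)
    assert.Equal(t, uint64(7), txNum)
}
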
28 changes: 20 additions & 8 deletions core/transientstore/store.go
@@ -271,13 +271,16 @@ func (s *store) PurgeByTxids(txids []string) error {
     // Get all txid and uuid from above result and remove it from transient store (both
     // write set and the corresponding indexes.
     for iter.Next() {
-        // For each entry, remove the private read-write set and correponding indexes
+        // For each entry, remove the private read-write set and corresponding indexes

         // Remove private write set
         compositeKeyPurgeIndexByTxid := iter.Key()
         // Note: We can create compositeKeyPvtRWSet by just replacing the prefix of compositeKeyPurgeIndexByTxid
         // with prwsetPrefix. For code readability and to be expressive, we split and create again.
-        uuid, blockHeight := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
+        uuid, blockHeight, err := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
+        if err != nil {
+            return err
+        }
         compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
         dbBatch.Delete(compositeKeyPvtRWSet)

@@ -315,11 +318,14 @@ func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
     // Get all txid and uuid from above result and remove it from transient store (both
     // write set and the corresponding index.
     for iter.Next() {
-        // For each entry, remove the private read-write set and correponding indexes
+        // For each entry, remove the private read-write set and corresponding indexes

         // Remove private write set
         compositeKeyPurgeIndexByHeight := iter.Key()
-        txid, uuid, blockHeight := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
+        txid, uuid, blockHeight, err := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
+        if err != nil {
+            return err
+        }
         logger.Debugf("Purging from transient store private data simulated at block [%d]: txid [%s] uuid [%s]", blockHeight, txid, uuid)

         compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
@@ -349,8 +355,8 @@ func (s *store) GetMinTransientBlkHt() (uint64, error) {
     // Fetch the minimum transient block height
     if iter.Next() {
         dbKey := iter.Key()
-        _, _, blockHeight := splitCompositeKeyOfPurgeIndexByHeight(dbKey)
-        return blockHeight, nil
+        _, _, blockHeight, err := splitCompositeKeyOfPurgeIndexByHeight(dbKey)
+        return blockHeight, err
     }
     // Returning an error may not be the right thing to do here. May be
     // return a bool. -1 is not possible due to unsigned int as first
@@ -371,7 +377,10 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
     }
     dbKey := scanner.dbItr.Key()
     dbVal := scanner.dbItr.Value()
-    _, blockHeight := splitCompositeKeyOfPvtRWSet(dbKey)
+    _, blockHeight, err := splitCompositeKeyOfPvtRWSet(dbKey)
+    if err != nil {
+        return nil, err
+    }

     txPvtRWSet := &rwset.TxPvtReadWriteSet{}
     if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
@@ -394,7 +403,10 @@ func (scanner *RwsetScanner) NextWithConfig() (*EndorserPvtSimulationResultsWith
     }
     dbKey := scanner.dbItr.Key()
     dbVal := scanner.dbItr.Value()
-    _, blockHeight := splitCompositeKeyOfPvtRWSet(dbKey)
+    _, blockHeight, err := splitCompositeKeyOfPvtRWSet(dbKey)
+    if err != nil {
+        return nil, err
+    }

     txPvtRWSet := &rwset.TxPvtReadWriteSet{}
     filteredTxPvtRWSet := &rwset.TxPvtReadWriteSet{}
15 changes: 9 additions & 6 deletions core/transientstore/store_helper.go
@@ -78,21 +78,24 @@ func createCompositeKeyForPurgeIndexByHeight(blockHeight uint64, txid string, uu

 // splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~uuid~blockHeight)
 // into uuid and blockHeight.
-func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, blockHeight uint64) {
+func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, blockHeight uint64, err error) {
     return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:])
 }

 // splitCompositeKeyOfPurgeIndexByTxid splits the compositeKey (<purgeIndexByTxidPrefix>~txid~uuid~blockHeight)
 // into uuid and blockHeight.
-func splitCompositeKeyOfPurgeIndexByTxid(compositeKey []byte) (uuid string, blockHeight uint64) {
+func splitCompositeKeyOfPurgeIndexByTxid(compositeKey []byte) (uuid string, blockHeight uint64, err error) {
     return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:])
 }

 // splitCompositeKeyOfPurgeIndexByHeight splits the compositeKey (<purgeIndexByHeightPrefix>~blockHeight~txid~uuid)
 // into txid, uuid and blockHeight.
-func splitCompositeKeyOfPurgeIndexByHeight(compositeKey []byte) (txid string, uuid string, blockHeight uint64) {
+func splitCompositeKeyOfPurgeIndexByHeight(compositeKey []byte) (txid string, uuid string, blockHeight uint64, err error) {
     var n int
-    blockHeight, n, _ = util.DecodeOrderPreservingVarUint64(compositeKey[2:])
+    blockHeight, n, err = util.DecodeOrderPreservingVarUint64(compositeKey[2:])
+    if err != nil {
+        return
+    }
     splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep})
     txid = string(splits[0])
     uuid = string(splits[1])
@@ -101,12 +104,12 @@ func splitCompositeKeyOfPurgeIndexByHeight(compositeKey []byte) (txid string, uu

 // splitCompositeKeyWithoutPrefixForTxid splits the composite key txid~uuid~blockHeight into
 // uuid and blockHeight
-func splitCompositeKeyWithoutPrefixForTxid(compositeKey []byte) (uuid string, blockHeight uint64) {
+func splitCompositeKeyWithoutPrefixForTxid(compositeKey []byte) (uuid string, blockHeight uint64, err error) {
     // skip txid as all functions which requires split of composite key already has it
     firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep)
     secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1
     uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
-    blockHeight, _, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
+    blockHeight, _, err = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
     return
 }

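
To see the split helpers and their new error return against the composite-key layouts described in the comments above, a hypothetical in-package test (not part of this commit) can pair them with createCompositeKeyForPvtRWSet, whose (txid, uuid, blockHeight) parameters and []byte result are inferred from its call sites earlier in this diff:

package transientstore

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

// Sketch only: round-trips <prwsetPrefix>~txid~uuid~blockHeight through the
// create/split helpers; the split intentionally drops the txid, which every
// caller already holds.
func TestSplitCompositeKeyOfPvtRWSetSketch(t *testing.T) {
    key := createCompositeKeyForPvtRWSet("txid-1", "uuid-1", 25)

    uuid, blockHeight, err := splitCompositeKeyOfPvtRWSet(key)
    assert.NoError(t, err)
    assert.Equal(t, "uuid-1", uuid)
    assert.Equal(t, uint64(25), blockHeight)
}
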