Skip to content

Commit

Permalink
Merge "This commit changes the versioning scheme for the keys"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Dec 6, 2016
2 parents 3ea19f3 + 836fdc6 commit ede30a4
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 95 deletions.
39 changes: 14 additions & 25 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/op/go-logging"

"github.com/hyperledger/fabric/core/ledger/kvledger/version"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
putils "github.com/hyperledger/fabric/protos/utils"
Expand All @@ -40,7 +41,7 @@ type Conf struct {

type versionedValue struct {
value []byte
version uint64
version *version.Height
}

type updateSet struct {
Expand Down Expand Up @@ -119,7 +120,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
var valid bool
txmgr.updateSet = newUpdateSet()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
for _, envBytes := range block.Data.Data {
for txIndex, envBytes := range block.Data.Data {
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
Expand Down Expand Up @@ -150,7 +151,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
}

if valid {
if err := txmgr.addWriteSetToBatch(txRWSet); err != nil {
if err := txmgr.addWriteSetToBatch(txRWSet, version.NewHeight(block.Header.Number, uint64(txIndex+1))); err != nil {
return nil, nil, err
}
} else {
Expand All @@ -170,7 +171,7 @@ func (txmgr *CouchDBTxMgr) Shutdown() {
func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, error) {

var err error
var currentVersion uint64
var currentVersion *version.Height

for _, nsRWSet := range txRWSet.NsRWs {
ns := nsRWSet.NameSpace
Expand All @@ -182,7 +183,7 @@ func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, err
if currentVersion, err = txmgr.getCommitedVersion(ns, kvRead.Key); err != nil {
return false, err
}
if currentVersion != kvRead.Version {
if !version.AreSame(currentVersion, kvRead.Version) {
logger.Debugf("Version mismatch for key [%s:%s]. Current version = [%d], Version in readSet [%d]",
ns, kvRead.Key, currentVersion, kvRead.Version)
return false, nil
Expand All @@ -192,27 +193,15 @@ func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, err
return true, nil
}

func (txmgr *CouchDBTxMgr) addWriteSetToBatch(txRWSet *txmgmt.TxReadWriteSet) error {
var err error
var currentVersion uint64

func (txmgr *CouchDBTxMgr) addWriteSetToBatch(txRWSet *txmgmt.TxReadWriteSet, txHeight *version.Height) error {
if txmgr.updateSet == nil {
txmgr.updateSet = newUpdateSet()
}
for _, nsRWSet := range txRWSet.NsRWs {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.Writes {
compositeKey := constructCompositeKey(ns, kvWrite.Key)
versionedVal := txmgr.updateSet.get(compositeKey)
if versionedVal != nil {
currentVersion = versionedVal.version
} else {
currentVersion, err = txmgr.getCommitedVersion(ns, kvWrite.Key)
if err != nil {
return err
}
}
txmgr.updateSet.add(compositeKey, &versionedValue{kvWrite.Value, currentVersion + 1})
txmgr.updateSet.add(compositeKey, &versionedValue{kvWrite.Value, txHeight})
}
}
return nil
Expand Down Expand Up @@ -278,16 +267,16 @@ func (txmgr *CouchDBTxMgr) Rollback() {
txmgr.updateSet = nil
}

func (txmgr *CouchDBTxMgr) getCommitedVersion(ns string, key string) (uint64, error) {
func (txmgr *CouchDBTxMgr) getCommitedVersion(ns string, key string) (*version.Height, error) {
var err error
var version uint64
var version *version.Height
if _, version, err = txmgr.getCommittedValueAndVersion(ns, key); err != nil {
return 0, err
return nil, err
}
return version, nil
}

func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([]byte, uint64, error) {
func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([]byte, *version.Height, error) {

compositeKey := constructCompositeKey(ns, key)

Expand All @@ -302,8 +291,8 @@ func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([
}
}

var version uint64 = 1 //TODO - version hardcoded to 1 is a temporary value for the prototype
return docBytes, version, nil
ver := version.NewHeight(1, 1) //TODO - version hardcoded to 1 is a temporary value for the prototype
return docBytes, ver, nil
}

func encodeValue(value []byte, version uint64) []byte {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/version"
"github.com/hyperledger/fabric/core/ledger/testutil"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func TestTxSimulatorWithExistingData(t *testing.T) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))

Expand All @@ -82,7 +83,7 @@ func TestTxSimulatorWithExistingData(t *testing.T) {
txRWSet = s2.(*LockBasedTxSimulator).getTxReadWriteSet()
isValid, err = txMgr.validateTx(txRWSet)
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(2, 1))
txMgr.Commit()

// simulate tx3
Expand All @@ -95,11 +96,9 @@ func TestTxSimulatorWithExistingData(t *testing.T) {

// verify the versions of keys in persistence
ver, _ := txMgr.getCommitedVersion("ns1", "key1")
testutil.AssertEquals(t, ver, uint64(2))
testutil.AssertEquals(t, ver, version.NewHeight(2, 1))
ver, _ = txMgr.getCommitedVersion("ns1", "key2")
testutil.AssertEquals(t, ver, uint64(1))
ver, _ = txMgr.getCommitedVersion("ns2", "key3")
testutil.AssertEquals(t, ver, uint64(2))
testutil.AssertEquals(t, ver, version.NewHeight(1, 1))
}

func TestTxValidation(t *testing.T) {
Expand All @@ -120,7 +119,7 @@ func TestTxValidation(t *testing.T) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))

Expand Down Expand Up @@ -161,7 +160,7 @@ func TestTxValidation(t *testing.T) {
isValid, err = txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(2, 1))
txMgr.Commit()

//RWSet for tx3 and tx4 should not be invalid now
Expand All @@ -183,11 +182,11 @@ func TestTxValidation(t *testing.T) {
}

func TestEncodeDecodeValueAndVersion(t *testing.T) {
testValueAndVersionEncodeing(t, []byte("value1"), uint64(1))
testValueAndVersionEncodeing(t, nil, uint64(2))
testValueAndVersionEncodeing(t, []byte("value1"), version.NewHeight(1, 2))
testValueAndVersionEncodeing(t, []byte{}, version.NewHeight(50, 50))
}

func testValueAndVersionEncodeing(t *testing.T, value []byte, version uint64) {
func testValueAndVersionEncodeing(t *testing.T, value []byte, version *version.Height) {
encodedValue := encodeValue(value, version)
val, ver := decodeValue(encodedValue)
testutil.AssertEquals(t, val, value)
Expand Down Expand Up @@ -221,7 +220,7 @@ func testIterator(t *testing.T, numKeys int, startKeyNum int, endKeyNum int) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, "")
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, "")

Expand Down Expand Up @@ -286,7 +285,7 @@ func TestIteratorWithDeletes(t *testing.T) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, "")
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, "")

Expand All @@ -298,7 +297,7 @@ func TestIteratorWithDeletes(t *testing.T) {
isValid, err = txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, "")
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(2, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, "")

Expand Down Expand Up @@ -332,7 +331,7 @@ func TestTxValidationWithItr(t *testing.T) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, "")
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, "")

Expand Down Expand Up @@ -364,7 +363,7 @@ func TestTxValidationWithItr(t *testing.T) {
isValid, err = txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(2, 1))
txMgr.Commit()

//RWSet tx3 should not be invalid now
Expand Down Expand Up @@ -399,7 +398,7 @@ func TestGetSetMultipeKeys(t *testing.T) {
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, "")
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, "")

Expand Down
Loading

0 comments on commit ede30a4

Please sign in to comment.