Skip to content

Commit

Permalink
Merge "FAB-1291: Couch support for doing a savepoint."
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Dec 8, 2016
2 parents 5c9fcc3 + 0183483 commit 87a0ce8
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 3 deletions.
30 changes: 30 additions & 0 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,33 @@ func TestDatabaseAutoCreate(t *testing.T) {
}

}

//TestSavepoint tests the recordSavepoint and GetBlockNumfromSavepoint methods for recording and reading a savepoint document
func TestSavepoint(t *testing.T) {

//Only run the tests if CouchDB is explitily enabled in the code,
//otherwise CouchDB may not be installed and all the tests would fail
//TODO replace this with external config property rather than config within the code
if ledgerconfig.IsCouchDBEnabled() == true {

env := newTestEnv(t)

txMgr := NewCouchDBTxMgr(env.conf,
env.couchDBAddress, //couchDB Address
env.couchDatabaseName, //couchDB db name
env.couchUsername, //enter couchDB id
env.couchPassword) //enter couchDB pw

// record savepoint
txMgr.blockNum = 5
err := txMgr.recordSavepoint()
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))

// read the savepoint
blockNum, err := txMgr.GetBlockNumFromSavepoint()
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))
testutil.AssertEquals(t, txMgr.blockNum, blockNum)

txMgr.Shutdown()
}
}
88 changes: 88 additions & 0 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package couchdbtxmgmt

import (
"encoding/json"
"errors"
"sync"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -48,6 +50,15 @@ type updateSet struct {
m map[string]*versionedValue
}

// Savepoint docid (key) for couchdb
const savepointDocID = "statedb_savepoint"

// Savepoint data for couchdb
type couchSavepointData struct {
BlockNum uint64 `json:"BlockNum"`
UpdateSeq string `json:"UpdateSeq"`
}

func newUpdateSet() *updateSet {
return &updateSet{make(map[string]*versionedValue)}
}
Expand All @@ -72,6 +83,7 @@ type CouchDBTxMgr struct {
updateSet *updateSet
commitRWLock sync.RWMutex
couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB
blockNum uint64 // block number corresponding to updateSet
}

// CouchConnection provides connection info for CouchDB
Expand Down Expand Up @@ -119,6 +131,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
for txIndex, envBytes := range block.Data.Data {
// extract actions from the envelope message
Expand Down Expand Up @@ -159,6 +172,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
Transaction: &pb.Transaction{ /* FIXME */ }, Cause: pb.InvalidTransaction_RWConflictDuringCommit})
}
}

logger.Debugf("===COUCHDB=== Exiting CouchDBTxMgr.ValidateAndPrepare()")
return block, invalidTxs, nil
}
Expand Down Expand Up @@ -258,13 +272,87 @@ func (txmgr *CouchDBTxMgr) Commit() error {

}

// Record a savepoint
err := txmgr.recordSavepoint()
if err != nil {
logger.Errorf("===COUCHDB=== Error during recordSavepoint: %s\n", err.Error())
return err
}

logger.Debugf("===COUCHDB=== Exiting CouchDBTxMgr.Commit()")
return nil
}

// recordSavepoint Record a savepoint in statedb.
// Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed.
// Hence we need to fence the savepoint with sync. So ensure_full_commit is called before AND after writing savepoint document
// TODO: Optimization - merge 2nd ensure_full_commit with savepoint by using X-Couch-Full-Commit header
func (txmgr *CouchDBTxMgr) recordSavepoint() error {
var err error
var savepointDoc couchSavepointData
// ensure full commit to flush all changes until now to disk
dbResponse, err := txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
}

// construct savepoint document
// UpdateSeq would be useful if we want to get all db changes since a logical savepoint
dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo()
if err != nil {
logger.Errorf("====COUCHDB==== Failed to get DB info %s\n", err.Error())
return err
}
savepointDoc.BlockNum = txmgr.blockNum
savepointDoc.UpdateSeq = dbInfo.UpdateSeq

savepointDocJSON, err := json.Marshal(savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to create savepoint data %s\n", err.Error())
return err
}

// SaveDoc using couchdb client and use JSON format
_, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil)
if err != nil {
logger.Errorf("====CouchDB==== Failed to save the savepoint to DB %s\n", err.Error())
}

// ensure full commit to flush savepoint to disk
dbResponse, err = txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
}
return nil
}

// GetBlockNumFromSavepoint Reads the savepoint from database and returns the corresponding block number.
// If no savepoint is found, it returns 0
func (txmgr *CouchDBTxMgr) GetBlockNumFromSavepoint() (uint64, error) {
var err error
savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID)
if err != nil {
// TODO: differentiate between 404 and some other error code
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
return 0, err
}

savepointDoc := &couchSavepointData{}
err = json.Unmarshal(savepointJSON, &savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
return 0, err
}

return savepointDoc.BlockNum, nil
}

// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *CouchDBTxMgr) Rollback() {
txmgr.updateSet = nil
txmgr.blockNum = 0
}

func (txmgr *CouchDBTxMgr) getCommitedVersion(ns string, key string) (*version.Height, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package lockbasedtxmgmt

import (
"fmt"
"math"
"testing"

"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -67,9 +68,13 @@ func TestTxSimulatorWithExistingData(t *testing.T) {
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
txMgr.blockNum = math.MaxUint64
err = txMgr.Commit()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))

blockNum, err := txMgr.GetBlockNumFromSavepoint()
testutil.AssertEquals(t, blockNum, txMgr.blockNum)

// simulate tx2 that make changes to existing data
s2, _ := txMgr.NewTxSimulator()
value, _ := s2.GetState("ns1", "key1")
Expand Down Expand Up @@ -258,7 +263,7 @@ func testIterator(t *testing.T, numKeys int, startKeyNum int, endKeyNum int) {
keyNum := begin + count
k := kv.(*ledger.KV).Key
v := kv.(*ledger.KV).Value
t.Logf("Retrieved k=%s, v=%s", k, v)
t.Logf("Retrieved k=%s, v=%s at count=%d start=%s end=%s", k, v, count, startKey, endKey)
testutil.AssertEquals(t, k, createTestKey(keyNum))
testutil.AssertEquals(t, v, createTestValue(keyNum))
count++
Expand Down
31 changes: 31 additions & 0 deletions core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package lockbasedtxmgmt

import (
"bytes"
"encoding/binary"
"reflect"
"sync"

"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -50,6 +52,9 @@ type updateSet struct {
m map[string]*versionedValue
}

// savepoint key
const savepointKey = "savepoint"

func newUpdateSet() *updateSet {
return &updateSet{make(map[string]*versionedValue)}
}
Expand All @@ -73,6 +78,7 @@ type LockBasedTxMgr struct {
db *db.DB
updateSet *updateSet
commitRWLock sync.RWMutex
blockNum uint64
}

// NewLockBasedTxMgr constructs a `LockBasedTxMgr`
Expand Down Expand Up @@ -102,6 +108,7 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bl
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
for txIndex, envBytes := range block.Data.Data {
// extract actions from the envelope message
Expand Down Expand Up @@ -203,6 +210,15 @@ func (txmgr *LockBasedTxMgr) Commit() error {
batch.Put([]byte(k), encodeValue(v.value, v.version))
}
}

// record the savepoint along with batch
if txmgr.blockNum != 0 {
savepointValue := make([]byte, reflect.TypeOf(txmgr.blockNum).Size())
binary.LittleEndian.PutUint64(savepointValue, txmgr.blockNum)
// Need a composite key for iterator to function correctly - use separator itself as special/hidden namespace
batch.Put(constructCompositeKey(string(compositeKeySep), savepointKey), savepointValue)
}

txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
defer func() { txmgr.updateSet = nil }()
Expand All @@ -212,9 +228,24 @@ func (txmgr *LockBasedTxMgr) Commit() error {
return nil
}

// GetBlockNumFromSavepoint returns the block num recorded in savepoint,
// returns 0 if NO savepoint is found
func (txmgr *LockBasedTxMgr) GetBlockNumFromSavepoint() (uint64, error) {
var blockNum uint64
savepointValue, err := txmgr.db.Get(constructCompositeKey(string(compositeKeySep), savepointKey))
if err != nil {
return 0, err
}

// savepointValue is not encoded with version
blockNum = binary.LittleEndian.Uint64(savepointValue)
return blockNum, nil
}

// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Rollback() {
txmgr.updateSet = nil
txmgr.blockNum = 0
}

func (txmgr *LockBasedTxMgr) getCommitedVersion(ns string, key string) (*version.Height, error) {
Expand Down
36 changes: 34 additions & 2 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,38 @@ func (dbclient *CouchDBConnectionDef) DropDatabase() (*DBOperationResponse, erro

}

// EnsureFullCommit calls _ensure_full_commit for explicit fsync
func (dbclient *CouchDBConnectionDef) EnsureFullCommit() (*DBOperationResponse, error) {

logger.Debugf("===COUCHDB=== Entering EnsureFullCommit()")

url := fmt.Sprintf("%s/%s/_ensure_full_commit", dbclient.URL, dbclient.Database)

resp, _, err := dbclient.handleRequest(http.MethodPost, url, nil, "", "")
if err != nil {
logger.Errorf("====COUCHDB==== Failed to invoke _ensure_full_commit Error: %s\n", err.Error())
return nil, err
}
defer resp.Body.Close()

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)

if dbResponse.Ok == true {
logger.Debugf("===COUCHDB=== _ensure_full_commit database %s ", dbclient.Database)
}

logger.Debugf("===COUCHDB=== Exiting EnsureFullCommit()")

if dbResponse.Ok == true {

return dbResponse, nil

}

return dbResponse, fmt.Errorf("Error syncing database")
}

//SaveDoc method provides a function to save a document, id and byte array
func (dbclient *CouchDBConnectionDef) SaveDoc(id string, rev string, bytesDoc []byte, attachments []Attachment) (string, error) {

Expand Down Expand Up @@ -497,7 +529,7 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.
}

//add content header for PUT
if method == http.MethodPut {
if method == http.MethodPut || method == http.MethodPost {

//If the multipartBoundary is not set, then this is a JSON and content-type should be set
//to application/json. Else, this is contains an attachment and needs to be multipart
Expand All @@ -514,7 +546,7 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.
}

//add content header for PUT
if method == http.MethodPut {
if method == http.MethodPut || method == http.MethodPost {
req.Header.Set("Accept", "application/json")
}

Expand Down

0 comments on commit 87a0ce8

Please sign in to comment.