From c497883bbdc676ae2cf9fb4cdb09eb082fdc85e9 Mon Sep 17 00:00:00 2001 From: Balaji Viswanathan Date: Fri, 16 Dec 2016 14:29:24 +0530 Subject: [PATCH] FAB-1425: Record savepoint for historyDB At time of commit() record a savepoint in historydb to store and help in recovery. Removed extra fsync. Returning err or SaveDoc error Change-Id: I93a2ab101c6bcde58b5e3050f86c621e9d13157d Signed-off-by: Balaji Viswanathan --- core/ledger/history/couchdb_histmgr.go | 79 ++++++++++++++++++++- core/ledger/history/couchdb_histmgr_test.go | 34 +++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/core/ledger/history/couchdb_histmgr.go b/core/ledger/history/couchdb_histmgr.go index 6bb95318cab..8ebee247b07 100644 --- a/core/ledger/history/couchdb_histmgr.go +++ b/core/ledger/history/couchdb_histmgr.go @@ -18,6 +18,8 @@ package history import ( "bytes" + "encoding/json" + "fmt" "strconv" "github.com/hyperledger/fabric/core/ledger" @@ -28,6 +30,15 @@ import ( logging "github.com/op/go-logging" ) +// Savepoint docid (key) for couchdb +const savepointDocID = "histdb_savepoint" + +// Savepoint data for couchdb +type couchSavepointData struct { + BlockNum uint64 `json:"BlockNum"` + UpdateSeq string `json:"UpdateSeq"` +} + var logger = logging.MustGetLogger("history") var compositeKeySep = []byte{0x00} @@ -125,10 +136,55 @@ func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error { if rev != "" { logger.Debugf("===HISTORYDB=== Saved document revision number: %s\n", rev) } - } } + } + + // Record a savepoint + err := histmgr.recordSavepoint(blockNo) + if err != nil { + logger.Debugf("===COUCHDB=== Error during recordSavepoint: %s\n", err) + return err + } + + return nil +} + +// recordSavepoint Record a savepoint in historydb. +// Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed. +// Hence we need to fence the savepoint with sync. So ensure_full_commit is called before AND after writing savepoint document +// TODO: Optimization - merge 2nd ensure_full_commit with savepoint by using X-Couch-Full-Commit header +func (txmgr *CouchDBHistMgr) recordSavepoint(blockNo uint64) error { + var err error + var savepointDoc couchSavepointData + // ensure full commit to flush all changes until now to disk + dbResponse, err := txmgr.couchDB.EnsureFullCommit() + if err != nil || dbResponse.Ok != true { + logger.Debugf("====COUCHDB==== Failed to perform full commit\n") + return fmt.Errorf("Failed to perform full commit. Err: %s", err) + } + + // construct savepoint document + // UpdateSeq would be useful if we want to get all db changes since a logical savepoint + dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo() + if err != nil { + logger.Debugf("====COUCHDB==== Failed to get DB info %s\n", err) + return err + } + savepointDoc.BlockNum = blockNo + savepointDoc.UpdateSeq = dbInfo.UpdateSeq + savepointDocJSON, err := json.Marshal(savepointDoc) + if err != nil { + logger.Debugf("====COUCHDB==== Failed to create savepoint data %s\n", err) + return err + } + + // SaveDoc using couchdb client and use JSON format + _, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil) + if err != nil { + logger.Debugf("====CouchDB==== Failed to save the savepoint to DB %s\n", err) + return err } return nil } @@ -149,6 +205,27 @@ func (histmgr *CouchDBHistMgr) getTransactionsForNsKey(namespace string, key str return newHistScanner(compositeStartKey, *queryResult), nil } +// GetBlockNumFromSavepoint Reads the savepoint from database and returns the corresponding block number. +// If no savepoint is found, it returns 0 +func (txmgr *CouchDBHistMgr) GetBlockNumFromSavepoint() (uint64, error) { + var err error + savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID) + if err != nil { + // TODO: differentiate between 404 and some other error code + logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err) + return 0, err + } + + savepointDoc := &couchSavepointData{} + err = json.Unmarshal(savepointJSON, &savepointDoc) + if err != nil { + logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err) + return 0, err + } + + return savepointDoc.BlockNum, nil +} + func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string { //History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id diff --git a/core/ledger/history/couchdb_histmgr_test.go b/core/ledger/history/couchdb_histmgr_test.go index 29d1fde255d..90bae2a14d6 100644 --- a/core/ledger/history/couchdb_histmgr_test.go +++ b/core/ledger/history/couchdb_histmgr_test.go @@ -83,6 +83,40 @@ func TestConstructCompositeKey(t *testing.T) { testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1") } +//TestSavepoint tests the recordSavepoint and GetBlockNumfromSavepoint methods for recording and reading a savepoint document +func TestSavepoint(t *testing.T) { + + if ledgerconfig.IsHistoryDBEnabled() == true { + + env := newTestEnvHistoryCouchDB(t, "history-test") + env.cleanup() //cleanup at the beginning to ensure the database doesn't exist already + defer env.cleanup() //and cleanup at the end + + logger.Debugf("===HISTORYDB=== env.couchDBAddress: %v , env.couchDatabaseName: %v env.couchUsername: %v env.couchPassword: %v\n", + env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) + + histMgr := NewCouchDBHistMgr( + env.couchDBAddress, //couchDB Address + env.couchDatabaseName, //couchDB db name + env.couchUsername, //enter couchDB id + env.couchPassword) //enter couchDB pw + + // read the savepoint + blockNum, err := histMgr.GetBlockNumFromSavepoint() + testutil.AssertEquals(t, blockNum, 0) + + // record savepoint + blockNo := uint64(5) + err = histMgr.recordSavepoint(blockNo) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data")) + + // read the savepoint + blockNum, err = histMgr.GetBlockNumFromSavepoint() + testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data")) + testutil.AssertEquals(t, blockNo, blockNum) + } +} + //History Database commit and read is being tested with kv_ledger_test.go. //This test will push some of the testing down into history itself func TestHistoryDatabaseCommit(t *testing.T) {