Skip to content

Commit

Permalink
Merge "FAB-1140 Ledger History Database framework"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Dec 13, 2016
2 parents 679f635 + d18aa98 commit 4a29b63
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 51 deletions.
105 changes: 97 additions & 8 deletions core/ledger/history/couchdb_histmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,125 @@ limitations under the License.
package history

import (
"bytes"
"strconv"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/protos/common"
putils "github.com/hyperledger/fabric/protos/utils"
logging "github.com/op/go-logging"
)

var logger = logging.MustGetLogger("txhistorymgmt")
var logger = logging.MustGetLogger("history")

// CouchDBHistMgr a simple implementation of interface `histmgmt.TxHistMgr'.
// TODO This implementation does not currently use a lock but may need one to insure query's are consistent
// CouchDBHistMgr a simple implementation of interface `histmgmt.HistMgr'.
// TODO This implementation does not currently use a lock but may need one to ensure query's are consistent
type CouchDBHistMgr struct {
couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB
}

// NewCouchDBHistMgr constructs a new `CouchDBTxHistMgr`
// NewCouchDBHistMgr constructs a new `CouchDB HistMgr`
func NewCouchDBHistMgr(couchDBConnectURL string, dbName string, id string, pw string) *CouchDBHistMgr {

//TODO locking has not been implemented but may need some sort of locking to insure queries are valid data.

couchDB, err := couchdb.CreateCouchDBConnectionAndDB(couchDBConnectURL, dbName, id, pw)
if err != nil {
logger.Errorf("Error during NewCouchDBHistMgr(): %s\n", err.Error())
logger.Errorf("===HISTORYDB=== Error during NewCouchDBHistMgr(): %s\n", err.Error())
return nil
}

// db and stateIndexCF will not be used for CouchDB. TODO to cleanup
return &CouchDBHistMgr{couchDB: couchDB}
}

// Commit implements method in interface `txhistorymgmt.TxMgr`
// Commit implements method in interface `histmgmt.HistMgr`
// This writes to a separate history database.
func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error {
logger.Debugf("===HISTORYDB=== Entering CouchDBTxHistMgr.Commit()")
logger.Debugf("===HISTORYDB=== Entering CouchDBHistMgr.Commit()")

//Get the blocknumber off of the header
blockNo := block.Header.Number
//Set the starting tranNo to 0
var tranNo uint64

logger.Debugf("===HISTORYDB=== Updating history for blockNo: %v with [%d] transactions",
blockNo, len(block.Data.Data))
for _, envBytes := range block.Data.Data {
tranNo++
logger.Debugf("===HISTORYDB=== Updating history for tranNo: %v", tranNo)

// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return err
}

//preparation for extracting RWSet from transaction
txRWSet := &txmgmt.TxReadWriteSet{}

// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.Unmarshal(respPayload.Results); err != nil {
return err
}

//Transactions that have data that is not JSON such as binary data,
// the write value will not write to history database.
//These types of transactions will have the key written to the history
// database to support history key scans. We do not write the binary
// value to CouchDB since the purpose of the history database value is
// for query andbinary data can not be queried.
for _, nsRWSet := range txRWSet.NsRWs {
ns := nsRWSet.NameSpace

for _, kvWrite := range nsRWSet.Writes {
writeKey := kvWrite.Key
writeValue := kvWrite.Value
compositeKey := constructCompositeKey(ns, writeKey, blockNo, tranNo)
var bytesDoc []byte

logger.Debugf("===HISTORYDB=== ns (namespace or cc id) = %v, writeKey: %v, compositeKey: %v, writeValue = %v",
ns, writeKey, compositeKey, writeValue)

if couchdb.IsJSON(string(writeValue)) {
//logger.Debugf("===HISTORYDB=== yes JSON store writeValue = %v", string(writeValue))
bytesDoc = writeValue
} else {
//For data that is not in JSON format only store the key
//logger.Debugf("===HISTORYDB=== not JSON only store key")
bytesDoc = []byte(`{}`)
}

// SaveDoc using couchdb client and use JSON format
rev, err := histmgr.couchDB.SaveDoc(compositeKey, "", bytesDoc, nil)
if err != nil {
logger.Errorf("===HISTORYDB=== Error during Commit(): %s\n", err.Error())
return err
}
if rev != "" {
logger.Debugf("===HISTORYDB=== Saved document revision number: %s\n", rev)
}

}
}

}
return nil
}

func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string {
//History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id

// TODO - We will likely want sortable varint encoding, rather then a simple number, in order to support sorted key scans
var buffer bytes.Buffer
buffer.WriteString(ns)
buffer.WriteByte(0)
buffer.WriteString(key)
buffer.WriteByte(0)
buffer.WriteString(strconv.Itoa(int(blocknum)))
buffer.WriteByte(0)
buffer.WriteString(strconv.Itoa(int(trannum)))

return buffer.String()
}
60 changes: 21 additions & 39 deletions core/ledger/history/couchdb_histmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,35 @@ package history

import (
"fmt"
"os"
"testing"

"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
)

//Complex setup to test the use of couch in ledger
type testEnvCouch struct {
couchDBPath string
couchDBAddress string
couchDatabaseName string
couchUsername string
couchPassword string
}

func newTestEnvCouch(t testing.TB, dbPath string, dbName string) *testEnvCouch {

couchDBDef := ledgerconfig.GetCouchDBDefinition()
os.RemoveAll(dbPath)

return &testEnvCouch{
couchDBPath: dbPath,
couchDBAddress: couchDBDef.URL,
couchDatabaseName: dbName,
couchUsername: couchDBDef.Username,
couchPassword: couchDBDef.Password,
}
}

func (env *testEnvCouch) cleanup() {
os.RemoveAll(env.couchDBPath)
//create a new connection
couchDB, _ := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword)
//drop the test database if it already existed
couchDB.DropDatabase()
}
/*
Note that these test are only run if HistoryDB is explitily enabled
otherwise HistoryDB may not be installed and all the tests would fail
*/

// couchdb_test.go tests couchdb functions already. This test just tests that a CouchDB history database is auto-created
// upon creating a new history transaction manager
// Note that the couchdb_test.go tests couchdb functions already. This test just tests that a
// CouchDB history database is auto-created upon creating a new history manager
func TestHistoryDatabaseAutoCreate(t *testing.T) {

//call a helper method to load the core.yaml
testutil.SetupCoreYAMLConfig("./../../../peer")
logger.Debugf("===HISTORYDB=== TestHistoryDatabaseAutoCreate IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n",
ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled())

//Only run the tests if CouchDB is explitily enabled in the code,
//otherwise CouchDB may not be installed and all the tests would fail
//TODO replace this with external config property rather than config within the code
if ledgerconfig.IsCouchDBEnabled() == true {
if ledgerconfig.IsHistoryDBEnabled() == true {

env := newTestEnvCouch(t, "/tmp/tests/ledger/history", "history-test")
env := newTestEnvHistoryCouchDB(t, "history-test")
env.cleanup() //cleanup at the beginning to ensure the database doesn't exist already
defer env.cleanup() //and cleanup at the end

logger.Debugf("===HISTORYDB=== env.couchDBAddress: %v , env.couchDatabaseName: %v env.couchUsername: %v env.couchPassword: %v\n",
env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword)

histMgr := NewCouchDBHistMgr(
env.couchDBAddress, //couchDB Address
env.couchDatabaseName, //couchDB db name
Expand All @@ -98,5 +72,13 @@ func TestHistoryDatabaseAutoCreate(t *testing.T) {
testutil.AssertEquals(t, dbResp2.DbName, env.couchDatabaseName)

}
}

func TestConstructCompositeKey(t *testing.T) {
compositeKey := constructCompositeKey("ns1", "key1", 1, 1)

var compositeKeySep = []byte{0x00}
var strKeySep = string(compositeKeySep)

testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1")
}
4 changes: 2 additions & 2 deletions core/ledger/history/histmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package history

import "github.com/hyperledger/fabric/protos/common"

// TxHistMgr - an interface that a transaction history manager should implement
type TxHistMgr interface {
// HistMgr - an interface that a history manager should implement
type HistMgr interface {
Commit(block *common.Block) error
}
54 changes: 54 additions & 0 deletions core/ledger/history/pkg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package history

import (
"testing"

"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
)

//Complex setup to test the use of couch in ledger
type testEnvHistoryCouchDB struct {
couchDBAddress string
couchDatabaseName string
couchUsername string
couchPassword string
}

func newTestEnvHistoryCouchDB(t testing.TB, dbName string) *testEnvHistoryCouchDB {

couchDBDef := ledgerconfig.GetCouchDBDefinition()

return &testEnvHistoryCouchDB{
couchDBAddress: couchDBDef.URL,
couchDatabaseName: dbName,
couchUsername: couchDBDef.Username,
couchPassword: couchDBDef.Password,
}
}

func (env *testEnvHistoryCouchDB) cleanup() {

//create a new connection
couchDB, err := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword)
if err == nil {
//drop the test database if it already existed
couchDB.DropDatabase()
}
}
31 changes: 29 additions & 2 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/core/ledger/history"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/lockbasedtxmgmt"
Expand Down Expand Up @@ -60,6 +61,7 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf {
type KVLedger struct {
blockStore blkstorage.BlockStore
txtmgmt txmgmt.TxMgr
historymgmt history.HistMgr
pendingBlockToCommit *common.Block
}

Expand All @@ -77,7 +79,10 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig)

//State and History database managers
var txmgmt txmgmt.TxMgr
var historymgmt history.HistMgr

if ledgerconfig.IsCouchDBEnabled() == true {
//By default we can talk to CouchDB with empty id and pw (""), or you can add your own id and password to talk to a secured CouchDB
logger.Debugf("===COUCHDB=== NewKVLedger() Using CouchDB instead of RocksDB...hardcoding and passing connection config for now")
Expand All @@ -94,14 +99,26 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
// Fall back to using RocksDB lockbased transaction manager
txmgmt = lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath})
}
l := &KVLedger{blockStore, txmgmt, nil}

if ledgerconfig.IsHistoryDBEnabled() == true {
logger.Debugf("===HISTORYDB=== NewKVLedger() Using CouchDB for transaction history database")

couchDBDef := ledgerconfig.GetCouchDBDefinition()

historymgmt = history.NewCouchDBHistMgr(
couchDBDef.URL, //couchDB connection URL
"system_history", //couchDB db name matches ledger name, TODO for now use system_history ledger, eventually allow passing in subledger name
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
}

l := &KVLedger{blockStore, txmgmt, historymgmt, nil}

if err := recoverStateDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}

return l, nil

}

//Recover the state database by recommitting last valid blocks
Expand Down Expand Up @@ -220,6 +237,16 @@ func (l *KVLedger) Commit() error {
if err := l.txtmgmt.Commit(); err != nil {
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
}

//TODO future will want to run async with state db writes. History needs to wait for chain (FSBlock) to write but not the state db
logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled())
if ledgerconfig.IsHistoryDBEnabled() == true {
logger.Debugf("Committing transactions to history database")
if err := l.historymgmt.Commit(l.pendingBlockToCommit); err != nil {
panic(fmt.Errorf(`Error during commit to txthistory:%s`, err))
}
}

l.pendingBlockToCommit = nil
return nil
}
Expand Down
Loading

0 comments on commit 4a29b63

Please sign in to comment.