Skip to content

Commit

Permalink
Merge "[FAB-5907] coordinator and transient decoupling"
Browse files Browse the repository at this point in the history
  • Loading branch information
manish-sethi authored and Gerrit Code Review committed Aug 29, 2017
2 parents 20b079b + fad6ca2 commit 297d393
Show file tree
Hide file tree
Showing 30 changed files with 517 additions and 367 deletions.
2 changes: 0 additions & 2 deletions common/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Ledger interface {
GetBlocksIterator(startBlockNumber uint64) (ResultsIterator, error)
// Close closes the ledger
Close()
// Commit adds a new block
Commit(block *common.Block) error
}

// ResultsIterator - an iterator for query result set
Expand Down
4 changes: 3 additions & 1 deletion core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ func endTxSimulation(chainID string, ccid *pb.ChaincodeID, txsim ledger.TxSimula
//see comment on _commitLock_
_commitLock_.Lock()
defer _commitLock_.Unlock()
if err := lgr.Commit(block); err != nil {
if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{
Block: block,
}); err != nil {
return err
}
}
Expand Down
3 changes: 0 additions & 3 deletions core/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// change
type Committer interface {

// Commit block to the ledger
Commit(block *common.Block) error

// CommitWithPvtData block and private data into the ledger
CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error

Expand Down
5 changes: 4 additions & 1 deletion core/committer/txvalidator/txvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/sysccprovider"
ledger2 "github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/util"
ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
Expand Down Expand Up @@ -153,7 +154,9 @@ func TestNewTxValidator_DuplicateTransactions(t *testing.T) {
// Initialize metadata
utils.InitBlockMetadata(block)
// Commit block to the ledger
ledger.Commit(block)
ledger.CommitWithPvtData(&ledger2.BlockAndPvtData{
Block: block,
})

// Validation should invalidate transaction,
// because it's already committed
Expand Down
4 changes: 3 additions & 1 deletion core/committer/txvalidator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func putCCInfoWithVSCCAndVer(theLedger ledger.PeerLedger, ccname, vscc, ver stri
pubSimulationBytes, err := simRes.GetPubSimulationBytes()
assert.NoError(t, err)
block0 := testutil.ConstructBlock(t, 1, []byte("hash"), [][]byte{pubSimulationBytes}, true)
err = theLedger.Commit(block0)
err = theLedger.CommitWithPvtData(&ledger.BlockAndPvtData{
Block: block0,
})
assert.NoError(t, err)
}

Expand Down
37 changes: 27 additions & 10 deletions core/endorser/endorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,24 @@ var endorserLogger = flogging.MustGetLogger("endorser")
// The Jira issue that documents Endorser flow along with its relationship to
// the lifecycle chaincode - https://jira.hyperledger.org/browse/FAB-181

type privateDataDistributor func(channel string, txID string, privateData []byte) error

// Endorser provides the Endorser service ProcessProposal
type Endorser struct {
policyChecker policy.PolicyChecker
policyChecker policy.PolicyChecker
distributePrivateData privateDataDistributor
}

// NewEndorserServer creates and returns a new Endorser server instance.
func NewEndorserServer() pb.EndorserServer {
e := new(Endorser)
e.policyChecker = policy.NewPolicyChecker(
peer.NewChannelPolicyManagerGetter(),
mgmt.GetLocalMSP(),
mgmt.NewLocalMSPPrincipalGetter(),
)

func NewEndorserServer(privDist privateDataDistributor) pb.EndorserServer {
e := &Endorser{
distributePrivateData: privDist,
policyChecker: policy.NewPolicyChecker(
peer.NewChannelPolicyManagerGetter(),
mgmt.GetLocalMSP(),
mgmt.NewLocalMSPPrincipalGetter(),
),
}
return e
}

Expand Down Expand Up @@ -257,6 +261,7 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st
//---3. execute the proposal and get simulation results
var simResult *ledger.TxSimulationResults
var pubSimResBytes []byte
var prvtSimResBytes []byte
var res *pb.Response
var ccevent *pb.ChaincodeEvent
res, ccevent, err = e.callChaincode(ctx, chainID, version, txid, signedProp, prop, cis, cid, txsim)
Expand All @@ -270,6 +275,16 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st
return nil, nil, nil, nil, err
}

if prvtSimResBytes, err = simResult.GetPvtSimulationBytes(); err != nil {
return nil, nil, nil, nil, err
}

if len(prvtSimResBytes) > 0 {
if err := e.distributePrivateData(chainID, txid, prvtSimResBytes); err != nil {
return nil, nil, nil, nil, err
}
}

if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -541,7 +556,9 @@ func (e *Endorser) commitTxSimulation(proposal *pb.Proposal, chainID string, sig
block := common.NewBlock(blockNumber, []byte{})
block.Data.Data = [][]byte{txBytes}
block.Header.DataHash = block.Data.Hash()
if err = lgr.Commit(block); err != nil {
if err = lgr.CommitWithPvtData(&ledger.BlockAndPvtData{
Block: block,
}); err != nil {
return err
}

Expand Down
4 changes: 3 additions & 1 deletion core/endorser/endorser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,9 @@ func TestMain(m *testing.M) {
return
}

endorserServer = NewEndorserServer()
endorserServer = NewEndorserServer(func(channel string, txID string, privateData []byte) error {
return nil
})

// setup the MSP manager so that we can sign/verify
err = msptesttools.LoadMSPSetupForTesting()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ func newTestHistoryEnv(t *testing.T) *levelDBLockBasedHistoryEnv {
testDB := testDBEnv.GetDBHandle(testLedgerID)

testTStoreEnv := transientstore.NewTestStoreEnv(t)
testTransientStore, err := testTStoreEnv.TestStoreProvider.OpenStore(testLedgerID)
testutil.AssertNoError(t, err, "")

txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDB, testTransientStore)
txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDB)
testHistoryDBProvider := NewHistoryDBProvider()
testHistoryDB, err := testHistoryDBProvider.GetDBHandle("TestHistoryDB")
testutil.AssertNoError(t, err, "")
Expand Down
28 changes: 7 additions & 21 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,25 @@ var logger = flogging.MustGetLogger("kvledger")
// KVLedger provides an implementation of `ledger.PeerLedger`.
// This implementation provides a key-value based data model
type kvLedger struct {
ledgerID string
blockStore *ledgerstorage.Store
txtmgmt txmgr.TxMgr
historyDB historydb.HistoryDB
transientStore transientstore.Store
ledgerID string
blockStore *ledgerstorage.Store
txtmgmt txmgr.TxMgr
historyDB historydb.HistoryDB
}

// NewKVLedger constructs new `KVLedger`
func newKVLedger(ledgerID string, blockStore *ledgerstorage.Store,
versionedDB privacyenabledstate.DB, historyDB historydb.HistoryDB,
transientStore transientstore.Store) (*kvLedger, error) {
versionedDB privacyenabledstate.DB, historyDB historydb.HistoryDB) (*kvLedger, error) {

logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)

//Initialize transaction manager using state database
var txmgmt txmgr.TxMgr
txmgmt = pvtdatatxmgr.NewLockbasedTxMgr(versionedDB, transientStore)
txmgmt = pvtdatatxmgr.NewLockbasedTxMgr(versionedDB)

// Create a kvLedger for this chain/ledger, which encasulates the underlying
// id store, blockstore, txmgr (state database), history database
l := &kvLedger{ledgerID, blockStore, txmgmt, historyDB, transientStore}
l := &kvLedger{ledgerID, blockStore, txmgmt, historyDB}

//Recover both state DB and history DB if they are out of sync with block storage
if err := l.recoverDBs(); err != nil {
Expand Down Expand Up @@ -210,17 +208,6 @@ func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error
return l.historyDB.NewHistoryQueryExecutor(l.blockStore)
}

// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
// TODO when we move the transient store outside the ledger, the commiter would invoke function `CommitWithPvtData` and this
// function will be removed
func (l *kvLedger) Commit(block *common.Block) error {
pvtdata, err := retrievePrivateData(l.transientStore, block)
if err != nil {
return err
}
return l.CommitWithPvtData(&ledger.BlockAndPvtData{Block: block, BlockPvtData: pvtdata})
}

// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error {
var err error
Expand Down Expand Up @@ -282,7 +269,6 @@ func (l *kvLedger) PrivateDataMinBlockNum() (uint64, error) {
func (l *kvLedger) Close() {
l.blockStore.Shutdown()
l.txtmgmt.Shutdown()
l.transientStore.Shutdown()
}

// retrievePrivateData retrieves the pvt data from the transient store for committing it into the
Expand Down
33 changes: 13 additions & 20 deletions core/ledger/kvledger/kv_ledger_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgerstorage"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/syndtr/goleveldb/leveldb"
Expand All @@ -49,11 +48,10 @@ var (

// Provider implements interface ledger.PeerLedgerProvider
type Provider struct {
idStore *idStore
ledgerStoreProvider *ledgerstorage.Provider
vdbProvider privacyenabledstate.DBProvider
historydbProvider historydb.HistoryDBProvider
transientStoreProvider transientstore.StoreProvider
idStore *idStore
ledgerStoreProvider *ledgerstorage.Provider
vdbProvider privacyenabledstate.DBProvider
historydbProvider historydb.HistoryDBProvider
}

// NewProvider instantiates a new Provider.
Expand All @@ -73,14 +71,14 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
return nil, err
}
// Initialize the transient store (temporary storage of private rwset)
transientStoreProvider := transientstore.NewCustomPathStoreProvider(ledgerconfig.GetTransientStorePath())
//transientStoreProvider := transientstore.NewCustomPathStoreProvider(ledgerconfig.GetTransientStorePath())

// Initialize the history database (index for history of values by key)
var historydbProvider historydb.HistoryDBProvider
historydbProvider = historyleveldb.NewHistoryDBProvider()

logger.Info("ledger provider Initialized")
provider := &Provider{idStore, ledgerStoreProvider, vdbProvider, historydbProvider, transientStoreProvider}
provider := &Provider{idStore, ledgerStoreProvider, vdbProvider, historydbProvider}
provider.recoverUnderConstructionLedger()
return provider, nil
}
Expand All @@ -105,19 +103,21 @@ func (provider *Provider) Create(genesisBlock *common.Block) (ledger.PeerLedger,
if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil {
return nil, err
}
ledger, err := provider.openInternal(ledgerID)
lgr, err := provider.openInternal(ledgerID)
if err != nil {
logger.Errorf("Error in opening a new empty ledger. Unsetting under construction flag. Err: %s", err)
panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID)
panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
return nil, err
}
if err := ledger.Commit(genesisBlock); err != nil {
ledger.Close()
if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{
Block: genesisBlock,
}); err != nil {
lgr.Close()
return nil, err
}
panicOnErr(provider.idStore.createLedgerID(ledgerID, genesisBlock), "Error while marking ledger as created")
return ledger, nil
return lgr, nil
}

// Open implements the corresponding method from interface ledger.PeerLedgerProvider
Expand Down Expand Up @@ -153,15 +153,9 @@ func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, erro
return nil, err
}

// Get the transient store for a chain/ledger
transientStore, err := provider.transientStoreProvider.OpenStore(ledgerID)
if err != nil {
return nil, err
}

// Create a kvLedger for this chain/ledger, which encasulates the underlying data stores
// (id store, blockstore, state database, history database)
l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB, transientStore)
l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB)
if err != nil {
return nil, err
}
Expand All @@ -184,7 +178,6 @@ func (provider *Provider) Close() {
provider.ledgerStoreProvider.Close()
provider.vdbProvider.Close()
provider.historydbProvider.Close()
provider.transientStoreProvider.Close()
}

// recoverUnderConstructionLedger checks whether the under construction flag is set - this would be the case
Expand Down
14 changes: 7 additions & 7 deletions core/ledger/kvledger/kv_ledger_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
ledgerproto "github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestRecovery(t *testing.T) {
// now create the genesis block
genesisBlock, _ := configtxtest.MakeGenesisBlock(constructTestLedgerID(1))
ledger, err := provider.(*Provider).openInternal(constructTestLedgerID(1))
ledger.Commit(genesisBlock)
ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: genesisBlock})
ledger.Close()

// Case 1: assume a crash happens, force underconstruction flag to be set to simulate
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) {
defer env.cleanup()
numLedgers := 10
provider, _ := NewProvider()
ledgers := make([]ledger.PeerLedger, numLedgers)
ledgers := make([]ledgerproto.PeerLedger, numLedgers)
for i := 0; i < numLedgers; i++ {
bg, gb := testutil.NewBlockGenerator(t, constructTestLedgerID(i), false)
l, err := provider.Create(gb)
Expand All @@ -143,7 +143,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) {
testutil.AssertNoError(t, err, "")
pubSimBytes, _ := res.GetPubSimulationBytes()
b := bg.NextBlock([][]byte{pubSimBytes})
err = l.Commit(b)
err = l.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: b})
l.Close()
testutil.AssertNoError(t, err, "")
}
Expand All @@ -152,7 +152,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) {

provider, _ = NewProvider()
defer provider.Close()
ledgers = make([]ledger.PeerLedger, numLedgers)
ledgers = make([]ledgerproto.PeerLedger, numLedgers)
for i := 0; i < numLedgers; i++ {
l, err := provider.Open(constructTestLedgerID(i))
testutil.AssertNoError(t, err, "")
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestLedgerBackup(t *testing.T) {
simRes, _ := simulator.GetTxSimulationResults()
pubSimBytes, _ := simRes.GetPubSimulationBytes()
block1 := bg.NextBlock([][]byte{pubSimBytes})
ledger.Commit(block1)
ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: block1})

txid = util.GenerateUUID()
simulator, _ = ledger.NewTxSimulator(txid)
Expand All @@ -202,7 +202,7 @@ func TestLedgerBackup(t *testing.T) {
simRes, _ = simulator.GetTxSimulationResults()
pubSimBytes, _ = simRes.GetPubSimulationBytes()
block2 := bg.NextBlock([][]byte{pubSimBytes})
ledger.Commit(block2)
ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: block2})

ledger.Close()
provider.Close()
Expand Down
Loading

0 comments on commit 297d393

Please sign in to comment.