Skip to content

Commit

Permalink
Couchdb indexes during bootstrap from a sanpshot for legacy chaincodes
Browse files Browse the repository at this point in the history
This commit creates the couchdb indexes for legacy chaincodes
when creating a channel from a snapshot. This is performed for the legacy
chaincodes that are defined (and not yet overridden by the new chaincode lifecycle),
as per the snaphot files for statedb data and are installed on the peer.

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and mastersingh24 committed Oct 6, 2020
1 parent 5455e2f commit 9ab2d38
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 41 deletions.
14 changes: 13 additions & 1 deletion core/ledger/cceventmgmt/mgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestCCEventMgmt(t *testing.T) {
eventMgr.Register("channel1", handler3)
eventMgr.Register("channel2", handler3)

cc1ExpectedEvent := &mockEvent{cc1Def, cc1DBArtifactsTar}
cc2ExpectedEvent := &mockEvent{cc2Def, cc2DBArtifactsTar}
_ = cc2ExpectedEvent
cc3ExpectedEvent := &mockEvent{cc3Def, cc3DBArtifactsTar}

// Deploy cc3 on chain1 - handler1 and handler3 should receive event because cc3 is being deployed only on chain1
Expand Down Expand Up @@ -95,6 +95,18 @@ func TestCCEventMgmt(t *testing.T) {
)
eventMgr.ChaincodeInstallDone(true)
require.NotContains(t, handler1.eventsRecieved, cc2ExpectedEvent)

mockListener := &mockHandler{}
require.NoError(t,
mgr.RegisterAndInvokeFor([]*ChaincodeDefinition{cc1Def, cc2Def, cc3Def},
"test-ledger", mockListener,
),
)
require.Contains(t, mockListener.eventsRecieved, cc1ExpectedEvent)
require.Contains(t, mockListener.eventsRecieved, cc3ExpectedEvent)
require.NotContains(t, mockListener.eventsRecieved, cc2ExpectedEvent)
require.Equal(t, 2, mockListener.doneRecievedCount)
require.Contains(t, mgr.ccLifecycleListeners["test-ledger"], mockListener)
}

func TestLSCCListener(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions core/ledger/cceventmgmt/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ func (m *Mgr) Register(ledgerid string, l ChaincodeLifecycleEventListener) {
m.ccLifecycleListeners[ledgerid] = append(m.ccLifecycleListeners[ledgerid], l)
}

// RegisterAndInvokeFor registers the listener and in addition invokes the listener for each chaincode that is present in the supplied
// list of legacyChaincodes and is installed on the peer
func (m *Mgr) RegisterAndInvokeFor(legacyChaincodes []*ChaincodeDefinition, ledgerid string, l ChaincodeLifecycleEventListener) error {
m.rwlock.Lock()
defer m.rwlock.Unlock()
m.ccLifecycleListeners[ledgerid] = append(m.ccLifecycleListeners[ledgerid], l)
for _, chaincodeDefinition := range legacyChaincodes {
installed, dbArtifacts, err := m.infoProvider.RetrieveChaincodeArtifacts(chaincodeDefinition)
if err != nil {
return err
}
if !installed {
continue
}
if err := l.HandleChaincodeDeploy(chaincodeDefinition, dbArtifacts); err != nil {
return err
}
l.ChaincodeDeployDone(true)
}
return nil
}

// HandleChaincodeDeploy is expected to be invoked when a chaincode is deployed via a deploy transaction
// The `chaincodeDefinitions` parameter contains all the chaincodes deployed in a block
// We need to store the last received `chaincodeDefinitions` because this function is expected to be invoked
Expand Down
92 changes: 82 additions & 10 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,19 @@ func newKVLedger(initializer *lgrInitializer) (*kvLedger, error) {
}
l.isPvtstoreAheadOfBlkstore.Store(isAhead)

// TODO Move the function `GetChaincodeEventListener` to ledger interface and
// this functionality of registering for events to ledgermgmt package so that this
// is reused across other future ledger implementations
ccEventListener := initializer.stateDB.GetChaincodeEventListener()
logger.Debugf("Register state db for chaincode lifecycle events: %t", ccEventListener != nil)
if ccEventListener != nil {
cceventmgmt.GetMgr().Register(ledgerID, ccEventListener)
initializer.ccLifecycleEventProvider.RegisterListener(
ledgerID,
&ccEventListenerAdaptor{ccEventListener},
statedbIndexCreator := initializer.stateDB.GetChaincodeEventListener()
if statedbIndexCreator != nil {
logger.Debugf("Register state db for chaincode lifecycle events")
err := l.registerStateDBIndexCreatorForChaincodeLifecycleEvents(
statedbIndexCreator,
initializer.ccInfoProvider,
initializer.ccLifecycleEventProvider,
cceventmgmt.GetMgr(),
initializer.initializingFromSnapshot,
)
if err != nil {
return nil, err
}
}

//Recover both state DB and history DB if they are out of sync with block storage
Expand All @@ -174,6 +175,77 @@ func newKVLedger(initializer *lgrInitializer) (*kvLedger, error) {
return l, nil
}

func (l *kvLedger) registerStateDBIndexCreatorForChaincodeLifecycleEvents(
stateDBIndexCreator cceventmgmt.ChaincodeLifecycleEventListener,
deployedChaincodesInfoExtractor ledger.DeployedChaincodeInfoProvider,
chaincodesLifecycleEventsProvider ledger.ChaincodeLifecycleEventProvider,
legacyChaincodesLifecycleEventsProvider *cceventmgmt.Mgr,
bootstrappingFromSnapshot bool,
) error {
if !bootstrappingFromSnapshot {
// regular opening of ledger
if err := chaincodesLifecycleEventsProvider.RegisterListener(
l.ledgerID, &ccEventListenerAdaptor{stateDBIndexCreator}, false); err != nil {
return err
}
legacyChaincodesLifecycleEventsProvider.Register(l.ledgerID, stateDBIndexCreator)
return nil
}

// opening of ledger after creating from a snapshot -
// it would have been better if we could explicitly retrieve the list of invocable chaincodes instead of
// passing the flag initializer.initializingFromSnapshot to the ccLifecycleEventProvider (which is essentially
// the _lifecycle cache) for directing ccLifecycleEventProvider to call us back. However, the lock that ensures
// the synchronization with the chaincode installer is maintained in the lifecycle cache and by design the lifecycle
// cache takes the responsibility of calling any listener under the lock
if err := chaincodesLifecycleEventsProvider.RegisterListener(
l.ledgerID, &ccEventListenerAdaptor{stateDBIndexCreator}, true); err != nil {
return errors.WithMessage(err, "error while creating statdb indexes after bootstrapping from snapshot")
}

legacyChaincodes, err := l.listLegacyChaincodesDefined(deployedChaincodesInfoExtractor)
if err != nil {
return errors.WithMessage(err, "error while creating statdb indexes after bootstrapping from snapshot")
}

if err := legacyChaincodesLifecycleEventsProvider.RegisterAndInvokeFor(
legacyChaincodes, l.ledgerID, stateDBIndexCreator); err != nil {
return errors.WithMessage(err, "error while creating statdb indexes after bootstrapping from snapshot")
}
return nil
}

func (l *kvLedger) listLegacyChaincodesDefined(
deployedChaincodesInfoExtractor ledger.DeployedChaincodeInfoProvider) (
[]*cceventmgmt.ChaincodeDefinition, error) {
qe, err := l.txmgr.NewQueryExecutor("")
if err != nil {
return nil, err
}
defer qe.Done()

definedChaincodes, err := deployedChaincodesInfoExtractor.AllChaincodesInfo(l.ledgerID, qe)
if err != nil {
return nil, err
}

legacyChaincodes := []*cceventmgmt.ChaincodeDefinition{}
for _, chaincodeInfo := range definedChaincodes {
if !chaincodeInfo.IsLegacy {
continue
}
legacyChaincodes = append(legacyChaincodes,
&cceventmgmt.ChaincodeDefinition{
Name: chaincodeInfo.Name,
Version: chaincodeInfo.Version,
Hash: chaincodeInfo.Hash,
CollectionConfigs: chaincodeInfo.ExplicitCollectionConfigPkg,
},
)
}
return legacyChaincodes, nil
}

func (l *kvLedger) initTxMgr(initializer *txmgr.Initializer) error {
var err error
txmgr, err := txmgr.NewLockBasedTxMgr(initializer)
Expand Down
167 changes: 137 additions & 30 deletions core/ledger/kvledger/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/confighistory/confighistorytest"
"github.com/hyperledger/fabric/core/ledger/internal/version"
kvledgermock "github.com/hyperledger/fabric/core/ledger/kvledger/mock"
"github.com/hyperledger/fabric/core/ledger/kvledger/msgs"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/mock"
Expand Down Expand Up @@ -287,39 +288,19 @@ func TestSnapshotCouchDBIndexCreation(t *testing.T) {
CouchDB: couchDBConfig,
}
destinationProvider := testutilNewProvider(destConf, t, &mock.DeployedChaincodeInfoProvider{})
ccLifecycleEventProvider := destinationProvider.initializer.ChaincodeLifecycleEventProvider.(*mock.ChaincodeLifecycleEventProvider)
ccLifecycleEventProvider.RegisterListenerStub = func(
channelID string,
listener ledger.ChaincodeLifecycleEventListener,
callback bool) error {
if callback {
err := listener.HandleChaincodeDeploy(
&ledger.ChaincodeDefinition{
Name: "ns",
},
testutil.CreateTarBytesForTest(
[]*testutil.TarFileEntry{
{
Name: "META-INF/statedb/couchdb/indexes/indexSizeSortName.json",
Body: `{"index":{"fields":[{"size":"desc"}]},"ddoc":"indexSizeSortName","name":"indexSizeSortName","type":"json"}`,
},
},
),
)
require.NoError(t, err)
}
return nil
}
return snapshotDir, couchDBConfig, destinationProvider
}

t.Run("create_indexes_on_couchdb", func(t *testing.T) {
snapshotDir, couchDBConfig, provider := setup()
defer func() {
require.NoError(t, statecouchdb.DropApplicationDBs(couchDBConfig))
}()
lgr, _, err := provider.CreateFromSnapshot(snapshotDir)
require.NoError(t, err)
dbArtifactsBytes := testutil.CreateTarBytesForTest(
[]*testutil.TarFileEntry{
{
Name: "META-INF/statedb/couchdb/indexes/indexSizeSortName.json",
Body: `{"index":{"fields":[{"size":"desc"}]},"ddoc":"indexSizeSortName","name":"indexSizeSortName","type":"json"}`,
},
},
)

verifyIndexCreatedOnMarbleSize := func(lgr ledger.PeerLedger) {
qe, err := lgr.NewQueryExecutor()
require.NoError(t, err)
defer qe.Done()
Expand All @@ -338,6 +319,132 @@ func TestSnapshotCouchDBIndexCreation(t *testing.T) {
require.Len(t, actualResults, 1)
_, err = qe.ExecuteQuery("ns", `{"selector":{"owner":"tom"}, "sort": [{"color": "desc"}]}`)
require.Contains(t, err.Error(), "No index exists for this sort")
}

t.Run("create_indexes_on_couchdb_for_new_lifecycle", func(t *testing.T) {
snapshotDir, couchDBConfig, provider := setup()
defer func() {
require.NoError(t, statecouchdb.DropApplicationDBs(couchDBConfig))
}()

// mimic new lifecycle chaincode "ns" installed and defiend and the package contains an index definition "sort index"
ccLifecycleEventProvider := provider.initializer.ChaincodeLifecycleEventProvider.(*mock.ChaincodeLifecycleEventProvider)
ccLifecycleEventProvider.RegisterListenerStub =
func(
channelID string,
listener ledger.ChaincodeLifecycleEventListener,
callback bool,
) error {
if callback {
err := listener.HandleChaincodeDeploy(
&ledger.ChaincodeDefinition{
Name: "ns",
},
dbArtifactsBytes,
)
require.NoError(t, err)
}
return nil
}

lgr, _, err := provider.CreateFromSnapshot(snapshotDir)
require.NoError(t, err)
verifyIndexCreatedOnMarbleSize(lgr)
})

t.Run("create_indexes_on_couchdb_for_legacy_lifecycle", func(t *testing.T) {
snapshotDir, couchDBConfig, provider := setup()
defer func() {
require.NoError(t, statecouchdb.DropApplicationDBs(couchDBConfig))
}()

// mimic legacy chaincode "ns" installed and defiend and the package contains an index definition "sort index"
deployedCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider)
deployedCCInfoProvider.AllChaincodesInfoReturns(
map[string]*ledger.DeployedChaincodeInfo{
"ns": {
Name: "ns",
Version: "version",
Hash: []byte("hash"),
IsLegacy: true,
},
"anotherNs": {
Name: "anotherNs",
Version: "version",
Hash: []byte("hash"),
IsLegacy: false,
},
},
nil,
)

installedChaincodeInfoProvider := &kvledgermock.ChaincodeInfoProvider{}
installedChaincodeInfoProvider.RetrieveChaincodeArtifactsReturns(
true, dbArtifactsBytes, nil,
)
cceventmgmt.Initialize(installedChaincodeInfoProvider)
lgr, _, err := provider.CreateFromSnapshot(snapshotDir)
require.NoError(t, err)

require.Equal(t, 1, installedChaincodeInfoProvider.RetrieveChaincodeArtifactsCallCount())
require.Equal(t,
&cceventmgmt.ChaincodeDefinition{
Name: "ns",
Version: "version",
Hash: []byte("hash"),
},
installedChaincodeInfoProvider.RetrieveChaincodeArtifactsArgsForCall(0),
)
verifyIndexCreatedOnMarbleSize(lgr)
})

t.Run("errors-propagation", func(t *testing.T) {
snapshotDir, couchDBConfig, provider := setup()
defer func() {
require.NoError(t, statecouchdb.DropApplicationDBs(couchDBConfig))
}()

t.Run("deployedChaincodeInfoProvider-returns-error", func(t *testing.T) {
deployedCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider)
deployedCCInfoProvider.AllChaincodesInfoReturns(nil, fmt.Errorf("error-retrieving-all-defined-chaincodes"))

installedChaincodeInfoProvider := &kvledgermock.ChaincodeInfoProvider{}
installedChaincodeInfoProvider.RetrieveChaincodeArtifactsReturns(
true, dbArtifactsBytes, nil,
)
cceventmgmt.Initialize(installedChaincodeInfoProvider)
_, _, err := provider.CreateFromSnapshot(snapshotDir)
require.EqualError(t, err, "error while opening ledger: error while creating statdb indexes after bootstrapping from snapshot: error-retrieving-all-defined-chaincodes")
})

t.Run("legacychaincodes-dbartifacts-retriever-returns-error", func(t *testing.T) {
deployedCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider)
deployedCCInfoProvider.AllChaincodesInfoReturns(
map[string]*ledger.DeployedChaincodeInfo{
"ns": {
Name: "ns",
Version: "version",
Hash: []byte("hash"),
IsLegacy: true,
},
},
nil,
)

installedChaincodeInfoProvider := &kvledgermock.ChaincodeInfoProvider{}
installedChaincodeInfoProvider.RetrieveChaincodeArtifactsReturns(false, nil, fmt.Errorf("error-retrieving-db-artifacts"))
cceventmgmt.Initialize(installedChaincodeInfoProvider)
_, _, err := provider.CreateFromSnapshot(snapshotDir)
require.EqualError(t, err, "error while opening ledger: error while creating statdb indexes after bootstrapping from snapshot: error-retrieving-db-artifacts")
})

t.Run("chaincodeLifecycleEventProvider-returns-error", func(t *testing.T) {
chaincodeLifecycleEventProvider := provider.initializer.ChaincodeLifecycleEventProvider.(*mock.ChaincodeLifecycleEventProvider)
chaincodeLifecycleEventProvider.RegisterListenerReturns(fmt.Errorf("error-calling-back"))
cceventmgmt.Initialize(nil)
_, _, err := provider.CreateFromSnapshot(snapshotDir)
require.EqualError(t, err, "error while opening ledger: error while creating statdb indexes after bootstrapping from snapshot: error-calling-back")
})
})
}

Expand Down

0 comments on commit 9ab2d38

Please sign in to comment.