Skip to content

Commit

Permalink
Create statedb indexes when creating ledger from snapshot (#1864)
Browse files Browse the repository at this point in the history
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored Sep 10, 2020
1 parent d5c0fe2 commit b534995
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 24 deletions.
22 changes: 20 additions & 2 deletions core/chaincode/lifecycle/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,26 @@ func (c *Cache) update(initializing bool, channelID string, dirtyChaincodes map[
}

// RegisterListener registers an event listener for receiving an event when a chaincode becomes invokable
func (c *Cache) RegisterListener(channelID string, listener ledger.ChaincodeLifecycleEventListener) {
c.eventBroker.RegisterListener(channelID, listener)
func (c *Cache) RegisterListener(
channelID string,
listener ledger.ChaincodeLifecycleEventListener,
needsExistingChaincodesDefinitions bool,
) error {
c.mutex.RLock()
defer c.mutex.RUnlock()

channelChaincodes, ok := c.definedChaincodes[channelID]
if !ok {
return errors.Errorf("unknown channel '%s'", channelID)
}

if needsExistingChaincodesDefinitions {
c.eventBroker.RegisterListener(channelID, listener, channelChaincodes.Chaincodes)
} else {
c.eventBroker.RegisterListener(channelID, listener, nil)
}

return nil
}

func (c *Cache) InitializeMetadata(channel string) {
Expand Down
185 changes: 184 additions & 1 deletion core/chaincode/lifecycle/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,189 @@ var _ = Describe("Cache", func() {
})
})

Describe("RegisterListener", func() {
var fakeListener *ledgermock.ChaincodeLifecycleEventListener
BeforeEach(func() {
fakeListener = &ledgermock.ChaincodeLifecycleEventListener{}
channelCache = &lifecycle.ChannelCache{
Chaincodes: map[string]*lifecycle.CachedChaincodeDefinition{
"definedInstalledAndApprovedCC": {
Definition: &lifecycle.ChaincodeDefinition{
Sequence: 3,
EndorsementInfo: &lb.ChaincodeEndorsementInfo{
Version: "chaincode-version",
},
ValidationInfo: &lb.ChaincodeValidationInfo{
ValidationParameter: []byte("validation-parameter"),
},
Collections: &pb.CollectionConfigPackage{},
},
Approved: true,
},
"anotherDefinedInstalledAndApprovedCC": {
Definition: &lifecycle.ChaincodeDefinition{
Sequence: 3,
EndorsementInfo: &lb.ChaincodeEndorsementInfo{
Version: "chaincode-version",
},
ValidationInfo: &lb.ChaincodeValidationInfo{
ValidationParameter: []byte("validation-parameter"),
},
Collections: &pb.CollectionConfigPackage{},
},
Approved: true,
},
"idontapprove": {
Definition: &lifecycle.ChaincodeDefinition{
Sequence: 3,
EndorsementInfo: &lb.ChaincodeEndorsementInfo{
Version: "chaincode-version",
},
ValidationInfo: &lb.ChaincodeValidationInfo{
ValidationParameter: []byte("validation-parameter"),
},
Collections: &pb.CollectionConfigPackage{},
},
Approved: false,
},
"ididntinstall": {
Definition: &lifecycle.ChaincodeDefinition{
Sequence: 3,
EndorsementInfo: &lb.ChaincodeEndorsementInfo{
Version: "chaincode-version",
},
ValidationInfo: &lb.ChaincodeValidationInfo{
ValidationParameter: []byte("validation-parameter"),
},
Collections: &pb.CollectionConfigPackage{},
},
Approved: true,
},
},
}

localChaincodes = map[string]*lifecycle.LocalChaincode{
string(util.ComputeSHA256(protoutil.MarshalOrPanic(&lb.StateData{
Type: &lb.StateData_String_{String_: "packageID"},
}))): {
References: map[string]map[string]*lifecycle.CachedChaincodeDefinition{
"channel-id": {
"definedInstalledAndApprovedCC": channelCache.Chaincodes["definedInstalledAndApprovedCC"],
"idontapprove": channelCache.Chaincodes["idontapprove"],
},
},
},

string(util.ComputeSHA256(protoutil.MarshalOrPanic(&lb.StateData{
Type: &lb.StateData_String_{String_: "anotherPackageID"},
}))): {
References: map[string]map[string]*lifecycle.CachedChaincodeDefinition{
"channel-id": {
"anotherDefinedInstalledAndApprovedCC": channelCache.Chaincodes["anotherDefinedInstalledAndApprovedCC"],
},
},
},
}

fakeCCStore.ListInstalledChaincodesReturns([]chaincode.InstalledChaincode{
{
Hash: []byte("hash"),
PackageID: "packageID",
},
{
Hash: []byte("hash"),
PackageID: "anotherPackageID",
},
}, nil)

lifecycle.SetChaincodeMap(c, "channel-id", channelCache)
lifecycle.SetLocalChaincodesMap(c, localChaincodes)
err := c.InitializeLocalChaincodes()
Expect(err).NotTo(HaveOccurred())
})

Context("when channel does not exist", func() {
It("returns error", func() {
err := c.RegisterListener("non-existing-channel", fakeListener, true)
Expect(err).To(MatchError("unknown channel 'non-existing-channel'"))
})
})

Context("when listener wants existing chaincode info", func() {
It("calls back the listener with only invocable chaincodes", func() {
err := c.RegisterListener("channel-id", fakeListener, true)
Expect(err).NotTo(HaveOccurred())
Expect(fakeListener.HandleChaincodeDeployCallCount()).To(Equal(2))
Expect(fakeListener.ChaincodeDeployDoneCallCount()).To(Equal(2))
ccdef0, dbArtifacts0 := fakeListener.HandleChaincodeDeployArgsForCall(0)
ccdef1, dbArtifacts1 := fakeListener.HandleChaincodeDeployArgsForCall(1)
Expect(
[]*ledger.ChaincodeDefinition{
ccdef0,
ccdef1,
},
).To(ConsistOf(
[]*ledger.ChaincodeDefinition{
{
Name: "definedInstalledAndApprovedCC",
Version: "chaincode-version",
Hash: []byte("packageID"),
CollectionConfigs: &pb.CollectionConfigPackage{},
},
{
Name: "anotherDefinedInstalledAndApprovedCC",
Version: "chaincode-version",
Hash: []byte("anotherPackageID"),
CollectionConfigs: &pb.CollectionConfigPackage{},
},
},
))
Expect([][]byte{dbArtifacts0, dbArtifacts1}).To(Equal([][]byte{[]byte("db-artifacts"), []byte("db-artifacts")}))
})

Context("when chaincode store returns error for one of the chaincodes", func() {
BeforeEach(func() {
fakeCCStore.LoadStub = func(packageID string) ([]byte, error) {
if packageID == "packageID" {
return nil, fmt.Errorf("loading-error")
}
return []byte("package-bytes"), nil
}
})
It("supresses the error", func() {
err := c.RegisterListener("channel-id", fakeListener, true)
Expect(err).NotTo(HaveOccurred())
Expect(fakeListener.HandleChaincodeDeployCallCount()).To(Equal(1))
Expect(fakeListener.ChaincodeDeployDoneCallCount()).To(Equal(1))
})
})

Context("when chaincode package parser returns error for both the chaincodes", func() {
BeforeEach(func() {
fakeParser.ParseReturns(nil, fmt.Errorf("parsing-error"))
})
It("supresses the error", func() {
err := c.RegisterListener("channel-id", fakeListener, true)
Expect(err).NotTo(HaveOccurred())
Expect(fakeListener.HandleChaincodeDeployCallCount()).To(Equal(0))
Expect(fakeListener.ChaincodeDeployDoneCallCount()).To(Equal(0))
})
})

Context("when listener returns error", func() {
BeforeEach(func() {
fakeListener.HandleChaincodeDeployReturns(fmt.Errorf("listener-error"))
})
It("supresses the error", func() {
err := c.RegisterListener("channel-id", fakeListener, true)
Expect(err).NotTo(HaveOccurred())
Expect(fakeListener.HandleChaincodeDeployCallCount()).To(Equal(2))
Expect(fakeListener.HandleChaincodeDeployCallCount()).To(Equal(2))
})
})
})
})

Describe("StateListener", func() {
Describe("InterestedInNamespaces", func() {
It("returns _lifecycle", func() {
Expand Down Expand Up @@ -906,7 +1089,7 @@ var _ = Describe("Cache", func() {

BeforeEach(func() {
fakeListener = &ledgermock.ChaincodeLifecycleEventListener{}
c.RegisterListener("channel-id", fakeListener)
c.RegisterListener("channel-id", fakeListener, false)

})

Expand Down
48 changes: 43 additions & 5 deletions core/chaincode/lifecycle/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,45 @@ func NewEventBroker(chaincodeStore ChaincodeStore, pkgParser PackageParser, ebMe
}
}

func (b *EventBroker) RegisterListener(channelID string, listener ledger.ChaincodeLifecycleEventListener) {
func (b *EventBroker) RegisterListener(
channelID string,
listener ledger.ChaincodeLifecycleEventListener,
existingCachedChaincodes map[string]*CachedChaincodeDefinition) {
// when invoking chaincode event listener with existing invocable chaincodes, we logs
// errors instead of returning the error from this function to keep the consustent behavior
// similar to the code path when we invoke the listener later on as a response to the chaincode
// lifecycle events. See other functions below for details on this behavior.
for chaincodeName, cachedChaincode := range existingCachedChaincodes {
if !isChaincodeInvocable(cachedChaincode) {
continue
}

dbArtifacts, err := b.loadDBArtifacts(cachedChaincode.InstallInfo.PackageID)
if err != nil {
logger.Errorw(
"error while loading db artifacts for chaincode package. Continuing...",
"packageID", cachedChaincode.InstallInfo.PackageID,
"error", err,
)
continue
}
legacyDefinition := &ledger.ChaincodeDefinition{
Name: chaincodeName,
Version: cachedChaincode.Definition.EndorsementInfo.Version,
Hash: []byte(cachedChaincode.InstallInfo.PackageID),
CollectionConfigs: cachedChaincode.Definition.Collections,
}

if err := listener.HandleChaincodeDeploy(legacyDefinition, dbArtifacts); err != nil {
logger.Errorw(
"error while invoking chaincode lifecycle events listener. Continuing...",
"packageID", cachedChaincode.InstallInfo.PackageID,
"error", err,
)
}
listener.ChaincodeDeployDone(true)
}

b.mutex.Lock()
defer b.mutex.Unlock()
b.listeners[channelID] = append(b.listeners[channelID], listener)
Expand All @@ -53,7 +91,7 @@ func (b *EventBroker) ProcessInstallEvent(localChaincode *LocalChaincode) {
for channelID, channelCache := range localChaincode.References {
listenersInvokedOnChannel := false
for chaincodeName, cachedChaincode := range channelCache {
if !isChaincodeInvokable(cachedChaincode) {
if !isChaincodeInvocable(cachedChaincode) {
continue
}
ccdef := &ledger.ChaincodeDefinition{
Expand Down Expand Up @@ -86,7 +124,7 @@ func (b *EventBroker) ProcessInstallEvent(localChaincode *LocalChaincode) {
// invokes this function when approve and define both become true.
func (b *EventBroker) ProcessApproveOrDefineEvent(channelID string, chaincodeName string, cachedChaincode *CachedChaincodeDefinition) {
logger.Debugw("processApproveOrDefineEvent()", "channelID", channelID, "chaincodeName", chaincodeName, "cachedChaincode", cachedChaincode)
if !isChaincodeInvokable(cachedChaincode) {
if !isChaincodeInvocable(cachedChaincode) {
return
}
dbArtifacts, err := b.loadDBArtifacts(cachedChaincode.InstallInfo.PackageID)
Expand Down Expand Up @@ -176,7 +214,7 @@ func (b *EventBroker) loadDBArtifacts(packageID string) ([]byte, error) {
return pkg.DBArtifacts, nil
}

// isChaincodeInvokable returns true iff a chaincode is approved and installed and defined
func isChaincodeInvokable(ccInfo *CachedChaincodeDefinition) bool {
// isChaincodeInvocable returns true iff a chaincode is approved and installed and defined
func isChaincodeInvocable(ccInfo *CachedChaincodeDefinition) bool {
return ccInfo.Approved && ccInfo.InstallInfo != nil && ccInfo.Definition != nil
}
2 changes: 1 addition & 1 deletion core/chaincode/lifecycle/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var _ = Describe("EventBroker", func() {
},
References: make(map[string]map[string]*lifecycle.CachedChaincodeDefinition),
}
eventBroker.RegisterListener("channel-1", fakeListener)
eventBroker.RegisterListener("channel-1", fakeListener, nil)
pkgParser.ParseReturns(&persistence.ChaincodePackage{
DBArtifacts: []byte("db-artifacts"),
}, nil)
Expand Down
7 changes: 6 additions & 1 deletion core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type kvLedger struct {

type lgrInitializer struct {
ledgerID string
initializingFromSnapshot bool
bootSnapshotMetadata *snapshotMetadata
blockStore *blkstorage.BlockStore
pvtdataStore *pvtdatastorage.Store
Expand Down Expand Up @@ -148,7 +149,11 @@ func newKVLedger(initializer *lgrInitializer) (*kvLedger, error) {
logger.Debugf("Register state db for chaincode lifecycle events: %t", ccEventListener != nil)
if ccEventListener != nil {
cceventmgmt.GetMgr().Register(ledgerID, ccEventListener)
initializer.ccLifecycleEventProvider.RegisterListener(ledgerID, &ccEventListenerAdaptor{ccEventListener})
initializer.ccLifecycleEventProvider.RegisterListener(
ledgerID,
&ccEventListenerAdaptor{ccEventListener},
initializer.initializingFromSnapshot,
)
}

//Recover both state DB and history DB if they are out of sync with block storage
Expand Down
7 changes: 4 additions & 3 deletions core/ledger/kvledger/kv_ledger_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (p *Provider) CreateFromGenesisBlock(genesisBlock *common.Block) (ledger.Pe
return nil, err
}

lgr, err := p.open(ledgerID, nil)
lgr, err := p.open(ledgerID, nil, false)
if err != nil {
return nil, p.deleteUnderConstructionLedger(lgr, ledgerID, err)
}
Expand Down Expand Up @@ -324,10 +324,10 @@ func (p *Provider) Open(ledgerID string) (ledger.PeerLedger, error) {
if err != nil {
return nil, err
}
return p.open(ledgerID, bootSnapshotMetadata)
return p.open(ledgerID, bootSnapshotMetadata, false)
}

func (p *Provider) open(ledgerID string, bootSnapshotMetadata *snapshotMetadata) (ledger.PeerLedger, error) {
func (p *Provider) open(ledgerID string, bootSnapshotMetadata *snapshotMetadata, initializingFromSnapshot bool) (ledger.PeerLedger, error) {
// Get the block store for a chain/ledger
blockStore, err := p.blkStoreProvider.Open(ledgerID)
if err != nil {
Expand Down Expand Up @@ -375,6 +375,7 @@ func (p *Provider) open(ledgerID string, bootSnapshotMetadata *snapshotMetadata)
hashProvider: p.initializer.HashProvider,
config: p.initializer.Config,
bootSnapshotMetadata: bootSnapshotMetadata,
initializingFromSnapshot: initializingFromSnapshot,
}

l, err := newKVLedger(initializer)
Expand Down
10 changes: 9 additions & 1 deletion core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ import (
"github.com/stretchr/testify/require"
)

var couchDBAddress string
var stopCouchDBFunc func()

func TestMain(m *testing.M) {
flogging.ActivateSpec("lockbasedtxmgr,statevalidator,valimpl,confighistory,pvtstatepurgemgmt=debug")
os.Exit(m.Run())
exitCode := m.Run()
if couchDBAddress != "" {
couchDBAddress = ""
stopCouchDBFunc()
}
os.Exit(exitCode)
}

func TestKVLedgerNilHistoryDBProvider(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/kvledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (p *Provider) CreateFromSnapshot(snapshotDir string) (ledger.PeerLedger, st
}
}

lgr, err := p.open(ledgerID, metadata)
lgr, err := p.open(ledgerID, metadata, true)
if err != nil {
return nil, "", p.deleteUnderConstructionLedger(
lgr,
Expand Down
Loading

0 comments on commit b534995

Please sign in to comment.