Skip to content

Commit

Permalink
[FAB-17795] Build channel metadata retroactively if not present
Browse files Browse the repository at this point in the history
Channel metadata has been added to statecouchdb to store the mapping of db namespaces
to db names. However, the channel metadata is not available in v2.0/v2.1 peers. This PR
adds support to retroactively build channel metadata at peer start if such metadata
is not present in statecouchdb.

When a state CouchDB is opened (GetDBHandle), it retoactively builds channel metadata if
the metadata is not present. A NamespaceProvider is implemented to build possible namespaces
for the channel. The possible namespaces will be verified by the existing databases and
only namespaces matching existing databases will be added to channel metadata.

Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
  • Loading branch information
wenjianqiao committed Jun 9, 2020
1 parent 6598f88 commit 57a5baa
Show file tree
Hide file tree
Showing 22 changed files with 888 additions and 99 deletions.
73 changes: 73 additions & 0 deletions core/chaincode/lifecycle/mock/legacy_ccinfo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 113 additions & 2 deletions core/ledger/kvledger/channelinfo_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,76 @@ package kvledger

import (
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/ledger/queryresult"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/channelconfig"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

type channelInfoProvider struct {
channelName string
blockStore *blkstorage.BlockStore
ledger.DeployedChaincodeInfoProvider
}

// GetAllMSPIDs retrieves the MSPIDs of application organizations in all the channel configurations,
// NamespacesAndCollections returns namespaces and their collections for the channel.
func (p *channelInfoProvider) NamespacesAndCollections(vdb statedb.VersionedDB) (map[string][]string, error) {
mspIDs, err := p.getAllMSPIDs()
if err != nil {
return nil, err
}
implicitCollNames := make([]string, len(mspIDs))
for i, mspID := range mspIDs {
implicitCollNames[i] = p.GenerateImplicitCollectionForOrg(mspID).Name
}
chaincodesInfo, err := p.AllChaincodesInfo(p.channelName, &simpleQueryExecutor{vdb})
if err != nil {
return nil, err
}

retNamespaces := map[string][]string{}
// iterate each chaincode, add implicit collections and explicit collections
for _, ccInfo := range chaincodesInfo {
ccName := ccInfo.Name
retNamespaces[ccName] = []string{}
for _, implicitCollName := range implicitCollNames {
retNamespaces[ccName] = append(retNamespaces[ccName], implicitCollName)
}
if ccInfo.ExplicitCollectionConfigPkg == nil {
continue
}
for _, config := range ccInfo.ExplicitCollectionConfigPkg.Config {
collConfig := config.GetStaticCollectionConfig()
if collConfig != nil {
retNamespaces[ccName] = append(retNamespaces[ccName], collConfig.Name)
}
}
}

// add lifecycle management namespaces with implicit collections (not applicable to legacy lifecycle)
for _, ns := range p.Namespaces() {
retNamespaces[ns] = []string{}
if ns == "lscc" {
continue
}
for _, implicitCollName := range implicitCollNames {
retNamespaces[ns] = append(retNamespaces[ns], implicitCollName)
}
}

// add namespace ""
retNamespaces[""] = []string{}
return retNamespaces, nil
}

// getAllMSPIDs retrieves the MSPIDs of application organizations in all the channel configurations,
// including current and previous channel configurations.
func (p *channelInfoProvider) GetAllMSPIDs() ([]string, error) {
func (p *channelInfoProvider) getAllMSPIDs() ([]string, error) {
blockchainInfo, err := p.blockStore.GetBlockchainInfo()
if err != nil {
return nil, err
Expand Down Expand Up @@ -74,3 +130,58 @@ func (p *channelInfoProvider) mostRecentConfigBlockAsOf(blockNum uint64) (*cb.Bl
}
return p.blockStore.RetrieveBlockByNumber(configBlockNum)
}

// simpleQueryExecutor implements ledger.SimpleQueryExecutor interface
type simpleQueryExecutor struct {
statedb.VersionedDB
}

func (sqe *simpleQueryExecutor) GetState(ns string, key string) ([]byte, error) {
versionedValue, err := sqe.VersionedDB.GetState(ns, key)
if err != nil {
return nil, err
}
var value []byte
if versionedValue != nil {
value = versionedValue.Value
}
return value, nil
}

func (sqe *simpleQueryExecutor) GetStateRangeScanIterator(ns string, startKey string, endKey string) (commonledger.ResultsIterator, error) {
dbItr, err := sqe.VersionedDB.GetStateRangeScanIterator(ns, startKey, endKey)
if err != nil {
return nil, err
}
itr := &resultsItr{ns: ns, dbItr: dbItr}
return itr, nil
}

// GetPrivateDataHash is not implemented and should not be called
func (sqe *simpleQueryExecutor) GetPrivateDataHash(namespace, collection, key string) ([]byte, error) {
return nil, errors.New("not implemented yet")
}

type resultsItr struct {
ns string
dbItr statedb.ResultsIterator
}

// Next implements method in interface ledger.ResultsIterator
func (itr *resultsItr) Next() (commonledger.QueryResult, error) {
queryResult, err := itr.dbItr.Next()
if err != nil {
return nil, err
}
// itr.updateRangeQueryInfo(queryResult)
if queryResult == nil {
return nil, nil
}
versionedKV := queryResult.(*statedb.VersionedKV)
return &queryresult.KV{Namespace: versionedKV.Namespace, Key: versionedKV.Key, Value: versionedKV.Value}, nil
}

// Close implements method in interface ledger.ResultsIterator
func (itr *resultsItr) Close() {
itr.dbItr.Close()
}
95 changes: 87 additions & 8 deletions core/ledger/kvledger/channelinfo_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kvledger

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"testing"
Expand All @@ -16,16 +17,80 @@ import (
"github.com/hyperledger/fabric-config/protolator"
"github.com/hyperledger/fabric-protos-go/common"
cb "github.com/hyperledger/fabric-protos-go/common"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

// TestGetAllMSPIDs verifies GetAllMSPIDs by adding and removing organizations to the channel config.
func TestNamespacesAndCollections(t *testing.T) {
channelName := "testnamespacesandcollections"
basePath, err := ioutil.TempDir("", "testchannelinfoprovider")
require.NoError(t, err)
defer os.RemoveAll(basePath)
blkStoreProvider, blkStore := openBlockStorage(t, channelName, basePath)
defer blkStoreProvider.Close()

// add genesis block and another config block so that we can retrieve MSPIDs
// 3 orgs/MSPIDs are added: SampleOrg/SampleOrg, org1/Org1MSP, org2/Org2MSP
genesisBlock, err := test.MakeGenesisBlock(channelName)
require.NoError(t, err)
require.NoError(t, blkStore.AddBlock(genesisBlock))
orgGroups := createTestOrgGroups(t)
config := getConfigFromBlock(genesisBlock)
config.ChannelGroup.Groups[channelconfig.ApplicationGroupKey].Groups["org1"] = orgGroups["org1"]
config.ChannelGroup.Groups[channelconfig.ApplicationGroupKey].Groups["org2"] = orgGroups["org2"]
configEnv := getEnvelopeFromConfig(channelName, config)
configBlock := newBlock([]*cb.Envelope{configEnv}, 1, 1, protoutil.BlockHeaderHash(genesisBlock.Header))
require.NoError(t, blkStore.AddBlock(configBlock))

// prepare fakeDeployedCCInfoProvider to create mocked test data
deployedccInfo := map[string]*ledger.DeployedChaincodeInfo{
"cc1": {
Name: "cc1",
Version: "version",
Hash: []byte("cc1-hash"),
ExplicitCollectionConfigPkg: prepareCollectionConfigPackage([]string{"collectionA", "collectionB"}),
},
"cc2": {
Name: "cc2",
Version: "version",
Hash: []byte("cc2-hash"),
},
}
fakeDeployedCCInfoProvider := &mock.DeployedChaincodeInfoProvider{}
fakeDeployedCCInfoProvider.NamespacesReturns([]string{"_lifecycle", "lscc"})
fakeDeployedCCInfoProvider.AllChaincodesInfoReturns(deployedccInfo, nil)
fakeDeployedCCInfoProvider.GenerateImplicitCollectionForOrgStub = func(mspID string) *pb.StaticCollectionConfig {
return &pb.StaticCollectionConfig{
Name: fmt.Sprintf("_implicit_org_%s", mspID),
}
}

// verify NamespacesAndCollections
channelInfoProvider := &channelInfoProvider{channelName, blkStore, fakeDeployedCCInfoProvider}
expectedNamespacesAndColls := map[string][]string{
"cc1": {"_implicit_org_Org1MSP", "_implicit_org_Org2MSP", "_implicit_org_SampleOrg", "collectionA", "collectionB"},
"cc2": {"_implicit_org_Org1MSP", "_implicit_org_Org2MSP", "_implicit_org_SampleOrg"},
"_lifecycle": {"_implicit_org_Org1MSP", "_implicit_org_Org2MSP", "_implicit_org_SampleOrg"},
"lscc": {},
"": {},
}
namespacesAndColls, err := channelInfoProvider.NamespacesAndCollections(nil)
require.NoError(t, err)
require.Equal(t, len(expectedNamespacesAndColls), len(namespacesAndColls))
for ns, colls := range expectedNamespacesAndColls {
require.ElementsMatch(t, colls, namespacesAndColls[ns])
}
}

// TestGetAllMSPIDs verifies getAllMSPIDs by adding and removing organizations to the channel config.
func TestGetAllMSPIDs(t *testing.T) {
channelName := "testgetallmspids"
basePath, err := ioutil.TempDir("", "testchannelinfoprovider")
Expand All @@ -34,7 +99,7 @@ func TestGetAllMSPIDs(t *testing.T) {

blkStoreProvider, blkStore := openBlockStorage(t, channelName, basePath)
defer blkStoreProvider.Close()
channelInfoProvider := &channelInfoProvider{channelName, blkStore}
channelInfoProvider := &channelInfoProvider{channelName, blkStore, nil}

var block *cb.Block
var configBlock *cb.Block
Expand Down Expand Up @@ -112,7 +177,7 @@ func TestGetAllMSPIDs_NegativeTests(t *testing.T) {

blkStoreProvider, blkStore := openBlockStorage(t, channelName, basePath)
defer blkStoreProvider.Close()
channelInfoProvider := &channelInfoProvider{channelName, blkStore}
channelInfoProvider := &channelInfoProvider{channelName, blkStore, nil}

var configBlock *cb.Block
var lastBlockNum = uint64(0)
Expand All @@ -128,15 +193,15 @@ func TestGetAllMSPIDs_NegativeTests(t *testing.T) {
lastConfigBlockNum++
configBlock = newBlock([]*cb.Envelope{}, lastBlockNum, lastConfigBlockNum, protoutil.BlockHeaderHash(configBlock.Header))
require.NoError(t, blkStore.AddBlock(configBlock))
_, err = channelInfoProvider.GetAllMSPIDs()
_, err = channelInfoProvider.getAllMSPIDs()
require.EqualError(t, err, "malformed configuration block: envelope index out of bounds")

// test RetrieveBlockByNumber error by using a non-existent block num for config block index
lastBlockNum++
lastConfigBlockNum++
configBlock = newBlock(nil, lastBlockNum, lastBlockNum+1, protoutil.BlockHeaderHash(configBlock.Header))
require.NoError(t, blkStore.AddBlock(configBlock))
_, err = channelInfoProvider.GetAllMSPIDs()
_, err = channelInfoProvider.getAllMSPIDs()
require.EqualError(t, err, "Entry not found in index")

// test GetLastConfigIndexFromBlock error by using invalid bytes for LastConfig metadata value
Expand All @@ -145,12 +210,12 @@ func TestGetAllMSPIDs_NegativeTests(t *testing.T) {
configBlock = newBlock(nil, lastBlockNum, lastConfigBlockNum, protoutil.BlockHeaderHash(configBlock.Header))
configBlock.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = []byte("invalid_bytes")
require.NoError(t, blkStore.AddBlock(configBlock))
_, err = channelInfoProvider.GetAllMSPIDs()
_, err = channelInfoProvider.getAllMSPIDs()
require.EqualError(t, err, "failed to retrieve metadata: error unmarshaling metadata at index [SIGNATURES]: unexpected EOF")

// test RetrieveBlockByNumber error (before calling GetLastConfigIndexFromBlock) by closing block store provider
blkStoreProvider.Close()
_, err = channelInfoProvider.GetAllMSPIDs()
_, err = channelInfoProvider.getAllMSPIDs()
require.Contains(t, err.Error(), "leveldb: closed")
}

Expand All @@ -167,7 +232,7 @@ func openBlockStorage(t *testing.T, channelName string, basePath string) (*blkst
}

func verifyGetAllMSPIDs(t *testing.T, channelInfoProvider *channelInfoProvider, expectedMSPIDs []string) {
mspids, err := channelInfoProvider.GetAllMSPIDs()
mspids, err := channelInfoProvider.getAllMSPIDs()
require.NoError(t, err)
require.ElementsMatch(t, expectedMSPIDs, mspids)
}
Expand Down Expand Up @@ -230,3 +295,17 @@ func createTestOrgGroups(t *testing.T) map[string]*cb.ConfigGroup {
config := getConfigFromBlock(block)
return config.ChannelGroup.Groups[channelconfig.ApplicationGroupKey].Groups
}

func prepareCollectionConfigPackage(collNames []string) *pb.CollectionConfigPackage {
collConfigs := make([]*pb.CollectionConfig, len(collNames))
for i, name := range collNames {
collConfigs[i] = &pb.CollectionConfig{
Payload: &pb.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &pb.StaticCollectionConfig{
Name: name,
},
},
}
}
return &pb.CollectionConfigPackage{Config: collConfigs}
}
Loading

0 comments on commit 57a5baa

Please sign in to comment.