From d638b02bc01436d4596c536192a8ad196ba4f5f2 Mon Sep 17 00:00:00 2001 From: Wenjian Qiao Date: Tue, 19 May 2020 17:17:31 -0400 Subject: [PATCH] [FAB-17793] Generate db name to namespace mapping for state couchdb (#1268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR generates namespace to namespaceDBInfo mapping for state couchdb and stores the mapping data in channel’s metadata db. Due to couchdb's length restriction on db names, the mapping is needed to drop all the databases for a channel as well as snapshot support. Signed-off-by: Wenjian Qiao --- .../statedb/statecouchdb/couchdbutil.go | 6 + .../statedb/statecouchdb/couchdoc_conv.go | 31 +++++ .../statedb/statecouchdb/statecouchdb.go | 60 ++++++++- .../statedb/statecouchdb/statecouchdb_test.go | 124 ++++++++++++++++++ 4 files changed, 220 insertions(+), 1 deletion(-) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdbutil.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdbutil.go index 0c68fc67234..b43e778b207 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdbutil.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdbutil.go @@ -183,6 +183,12 @@ func constructCouchDBUrl(connectURL *url.URL, dbName string, pathElements ...str // constructMetadataDBName truncates the db name to couchdb allowed length to // construct the metadataDBName +// Note: +// Currently there is a non-deterministic collision between metadataDB and namespaceDB with namespace="". +// When channel name is not truncated, metadataDB and namespaceDB with namespace="" have the same db name. +// When channel name is truncated, these two DBs have different db names. +// We have to deal with this behavior for now. In the future, we may rename metadataDB and +// migrate the content to avoid the collision. func constructMetadataDBName(dbName string) string { if len(dbName) > maxLength { untruncatedDBName := dbName diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go index e428625b6ca..9dd9ca1739a 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go @@ -184,6 +184,17 @@ type couchSavepointData struct { TxNum uint64 `json:"TxNum"` } +type channelMetadata struct { + ChannelName string `json:"ChannelName"` + // namespace to namespaceDBInfo mapping + NamespaceDBsInfo map[string]*namespaceDBInfo `json:"NamespaceDBsInfo"` +} + +type namespaceDBInfo struct { + Namespace string `json:"Namespace"` + DBName string `json:"DBName"` +} + func encodeSavepoint(height *version.Height) (*couchDoc, error) { var err error var savepointDoc couchSavepointData @@ -209,6 +220,26 @@ func decodeSavepoint(couchDoc *couchDoc) (*version.Height, error) { return &version.Height{BlockNum: savepointDoc.BlockNum, TxNum: savepointDoc.TxNum}, nil } +func encodeChannelMetadata(metadataDoc *channelMetadata) (*couchDoc, error) { + metadataJSON, err := json.Marshal(metadataDoc) + if err != nil { + err = errors.Wrap(err, "failed to marshal channel metadata") + logger.Errorf("%+v", err) + return nil, err + } + return &couchDoc{jsonValue: metadataJSON, attachments: nil}, nil +} + +func decodeChannelMetadata(couchDoc *couchDoc) (*channelMetadata, error) { + metadataDoc := &channelMetadata{} + if err := json.Unmarshal(couchDoc.jsonValue, &metadataDoc); err != nil { + err = errors.Wrap(err, "failed to unmarshal channel metadata") + logger.Errorf("%+v", err) + return nil, err + } + return metadataDoc, nil +} + type dataformatInfo struct { Version string `json:"Version"` } diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index e149a61ae85..46a02828d46 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -27,6 +27,10 @@ var logger = flogging.MustGetLogger("statecouchdb") const ( // savepointDocID is used as a key for maintaining savepoint (maintained in metadatadb for a channel) savepointDocID = "statedb_savepoint" + // channelMetadataDocID is used as a key to store the channel metadata for a channel (maintained in the channel's metadatadb). + // Due to CouchDB's length restriction on db names, channel names and namepsaces may be truncated in db names. + // The metadata is used for dropping channel-specific databases and snapshot support. + channelMetadataDocID = "channel_metadata" // fabricInternalDBName is used to create a db in couch that would be used for internal data such as the version of the data format // a double underscore ensures that the dbname does not clash with the dbnames created for the chaincodes fabricInternalDBName = "fabric__internal" @@ -169,7 +173,8 @@ type VersionedDB struct { couchInstance *couchInstance metadataDB *couchDatabase // A database per channel to store metadata such as savepoint. chainName string // The name of the chain/channel. - namespaceDBs map[string]*couchDatabase // One database per deployed chaincode. + namespaceDBs map[string]*couchDatabase // One database per namespace. + channelMetadata *channelMetadata // Store channel name and namespaceDBInfo committedDataCache *versionsCache // Used as a local cache during bulk processing of a block. verCacheLock sync.RWMutex mux sync.RWMutex @@ -197,6 +202,21 @@ func newVersionedDB(couchInstance *couchInstance, redoLogger *redoLogger, dbName redoLogger: redoLogger, cache: cache, } + + vdb.channelMetadata, err = vdb.readChannelMetadata() + if err != nil { + return nil, err + } + if vdb.channelMetadata == nil { + vdb.channelMetadata = &channelMetadata{ + ChannelName: chainName, + NamespaceDBsInfo: make(map[string]*namespaceDBInfo), + } + if err = vdb.writeChannelMetadata(); err != nil { + return nil, err + } + } + logger.Debugf("chain [%s]: checking for redolog record", chainName) redologRecord, err := redoLogger.load() if err != nil { @@ -241,6 +261,16 @@ func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchDatabase, db = vdb.namespaceDBs[namespace] if db == nil { var err error + if _, ok := vdb.channelMetadata.NamespaceDBsInfo[namespace]; !ok { + logger.Debugf("[%s] add namespaceDBInfo for namespace %s", vdb.chainName, namespace) + vdb.channelMetadata.NamespaceDBsInfo[namespace] = &namespaceDBInfo{ + Namespace: namespace, + DBName: namespaceDBName, + } + if err = vdb.writeChannelMetadata(); err != nil { + return nil, err + } + } db, err = createCouchDatabase(vdb.couchInstance, namespaceDBName) if err != nil { return nil, err @@ -690,6 +720,19 @@ func (vdb *VersionedDB) Close() { // no need to close db since a shared couch instance is used } +// writeChannelMetadata saves channel metadata to metadataDB +func (vdb *VersionedDB) writeChannelMetadata() error { + couchDoc, err := encodeChannelMetadata(vdb.channelMetadata) + if err != nil { + return err + } + if _, err := vdb.metadataDB.saveDoc(channelMetadataDocID, "", couchDoc); err != nil { + return err + } + _, err = vdb.metadataDB.ensureFullCommit() + return err +} + // ensureFullCommitAndRecordSavepoint flushes all the dbs (corresponding to `namespaces`) to disk // and Record a savepoint in the metadata db. // Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed. @@ -769,6 +812,21 @@ func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) { return decodeSavepoint(couchDoc) } +// readChannelMetadata returns channel metadata stored in metadataDB +func (vdb *VersionedDB) readChannelMetadata() (*channelMetadata, error) { + var err error + couchDoc, _, err := vdb.metadataDB.readDoc(channelMetadataDocID) + if err != nil { + logger.Errorf("Failed to read db name mapping data %s", err.Error()) + return nil, err + } + // ReadDoc() not found (404) will result in nil response, in these cases return nil + if couchDoc == nil || couchDoc.jsonValue == nil { + return nil, nil + } + return decodeChannelMetadata(couchDoc) +} + func (vdb *VersionedDB) GetFullScanIterator(skipNamespace func(string) bool) (statedb.FullScanIterator, byte, error) { return nil, byte(0), errors.New("Not yet implemented") } diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index 721c6dec997..5fb67531fd3 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -1365,3 +1365,127 @@ func TestMissingRevisionRetrievalFromCache(t *testing.T) { require.Equal(t, "rev1", revisions["key1"]) require.Equal(t, "rev2", revisions["key2"]) } + +func TestChannelMetadata(t *testing.T) { + vdbEnv.init(t, sysNamespaces) + defer vdbEnv.cleanup() + channelName := "testchannelmetadata" + + db, err := vdbEnv.DBProvider.GetDBHandle(channelName) + require.NoError(t, err) + vdb := db.(*VersionedDB) + expectedChannelMetadata := &channelMetadata{ + ChannelName: channelName, + NamespaceDBsInfo: make(map[string]*namespaceDBInfo), + } + savedChannelMetadata, err := vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) + + // call getNamespaceDBHandle for new dbs, verify that new db names are added to dbMetadataMapping + namepsaces := make([]string, 10) + for i := 0; i < 10; i++ { + ns := fmt.Sprintf("nsname_%d", i) + _, err := vdb.getNamespaceDBHandle(ns) + require.NoError(t, err) + namepsaces[i] = ns + expectedChannelMetadata.NamespaceDBsInfo[ns] = &namespaceDBInfo{ + Namespace: ns, + DBName: constructNamespaceDBName(channelName, ns), + } + } + + savedChannelMetadata, err = vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) + + // call getNamespaceDBHandle for existing dbs, verify that no new db names are added to dbMetadataMapping + for _, ns := range namepsaces { + _, err := vdb.getNamespaceDBHandle(ns) + require.NoError(t, err) + } + + savedChannelMetadata, err = vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) +} + +func TestChannelMetadata_NegativeTests(t *testing.T) { + vdbEnv.init(t, sysNamespaces) + defer vdbEnv.cleanup() + + channelName := "testchannelmetadata-errorpropagation" + origCouchAddress := vdbEnv.config.Address + vdbEnv.config.MaxRetries = 1 + vdbEnv.config.MaxRetriesOnStartup = 1 + vdbEnv.config.RequestTimeout = 1 * time.Second + + // simulate db connection error by setting an invalid address before GetDBHandle, verify error is propagated + vdbEnv.config.Address = "127.0.0.1:1" + expectedErrMsg := fmt.Sprintf("http error calling couchdb: Get \"http://%s/testchannelmetadata-errorpropagation_\": dial tcp %s: connect: connection refused", + vdbEnv.config.Address, vdbEnv.config.Address) + _, err := vdbEnv.DBProvider.GetDBHandle(channelName) + require.EqualError(t, err, expectedErrMsg) + vdbEnv.config.Address = origCouchAddress + + // simulate db connection error by setting an invalid address before getNamespaceDBHandle, verify error is propagated + db, err := vdbEnv.DBProvider.GetDBHandle(channelName) + require.NoError(t, err) + vdb := db.(*VersionedDB) + vdbEnv.config.Address = "127.0.0.1:1" + expectedErrMsg = fmt.Sprintf("http error calling couchdb: Put \"http://%s/testchannelmetadata-errorpropagation_/channel_metadata\": dial tcp %s: connect: connection refused", + vdbEnv.config.Address, vdbEnv.config.Address) + _, err = vdb.getNamespaceDBHandle("testnamepsace1") + require.EqualError(t, err, expectedErrMsg) + vdb.couchInstance.conf.Address = origCouchAddress + + // call createCouchDatabase to simulate peer crashes after metadataDB is created but before channelMetadata is updated + // then call DBProvider.GetDBHandle and verify channelMetadata is correctly generated + channelName = "testchannelmetadata-simulatefailure-inbetween" + couchInstance, err := createCouchInstance(vdbEnv.config, &disabled.Provider{}) + metadatadbName := constructMetadataDBName(channelName) + metadataDB, err := createCouchDatabase(couchInstance, metadatadbName) + vdb = &VersionedDB{ + metadataDB: metadataDB, + } + savedChannelMetadata, err := vdb.readChannelMetadata() + require.NoError(t, err) + require.Nil(t, savedChannelMetadata) + + db, err = vdbEnv.DBProvider.GetDBHandle(channelName) + require.NoError(t, err) + vdb = db.(*VersionedDB) + expectedChannelMetadata := &channelMetadata{ + ChannelName: channelName, + NamespaceDBsInfo: make(map[string]*namespaceDBInfo), + } + savedChannelMetadata, err = vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) + + // call writeChannelMetadata to simulate peer crashes after channelMetada is saved but before namespace DB is created + // then call vdb.getNamespaceDBHandle and verify namespaceDB is created and channelMetadata is correct + namespace := "testnamepsace2" + namespaceDBName := constructNamespaceDBName(channelName, namespace) + vdb.channelMetadata.NamespaceDBsInfo[namespace] = &namespaceDBInfo{Namespace: namespace, DBName: namespaceDBName} + err = vdb.writeChannelMetadata() + require.NoError(t, err) + expectedChannelMetadata.NamespaceDBsInfo = map[string]*namespaceDBInfo{ + namespace: {Namespace: namespace, DBName: namespaceDBName}, + } + savedChannelMetadata, err = vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) + + _, err = vdb.getNamespaceDBHandle(namespace) + require.NoError(t, err) + savedChannelMetadata, err = vdb.readChannelMetadata() + require.NoError(t, err) + require.Equal(t, expectedChannelMetadata, savedChannelMetadata) + require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) +}