Skip to content

Commit

Permalink
[FAB-17793] Generate db name to namespace mapping for state couchdb (#…
Browse files Browse the repository at this point in the history
…1268)

This PR generates namespace to namespaceDBInfo mapping for state couchdb and
stores the mapping data in channel’s metadata db. Due to couchdb's
length restriction on db names, the mapping is needed to drop all
the databases for a channel as well as snapshot support.

Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
  • Loading branch information
wenjianqiao authored May 19, 2020
1 parent 80e1fa1 commit d638b02
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ func constructCouchDBUrl(connectURL *url.URL, dbName string, pathElements ...str

// constructMetadataDBName truncates the db name to couchdb allowed length to
// construct the metadataDBName
// Note:
// Currently there is a non-deterministic collision between metadataDB and namespaceDB with namespace="".
// When channel name is not truncated, metadataDB and namespaceDB with namespace="" have the same db name.
// When channel name is truncated, these two DBs have different db names.
// We have to deal with this behavior for now. In the future, we may rename metadataDB and
// migrate the content to avoid the collision.
func constructMetadataDBName(dbName string) string {
if len(dbName) > maxLength {
untruncatedDBName := dbName
Expand Down
31 changes: 31 additions & 0 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,17 @@ type couchSavepointData struct {
TxNum uint64 `json:"TxNum"`
}

type channelMetadata struct {
ChannelName string `json:"ChannelName"`
// namespace to namespaceDBInfo mapping
NamespaceDBsInfo map[string]*namespaceDBInfo `json:"NamespaceDBsInfo"`
}

type namespaceDBInfo struct {
Namespace string `json:"Namespace"`
DBName string `json:"DBName"`
}

func encodeSavepoint(height *version.Height) (*couchDoc, error) {
var err error
var savepointDoc couchSavepointData
Expand All @@ -209,6 +220,26 @@ func decodeSavepoint(couchDoc *couchDoc) (*version.Height, error) {
return &version.Height{BlockNum: savepointDoc.BlockNum, TxNum: savepointDoc.TxNum}, nil
}

func encodeChannelMetadata(metadataDoc *channelMetadata) (*couchDoc, error) {
metadataJSON, err := json.Marshal(metadataDoc)
if err != nil {
err = errors.Wrap(err, "failed to marshal channel metadata")
logger.Errorf("%+v", err)
return nil, err
}
return &couchDoc{jsonValue: metadataJSON, attachments: nil}, nil
}

func decodeChannelMetadata(couchDoc *couchDoc) (*channelMetadata, error) {
metadataDoc := &channelMetadata{}
if err := json.Unmarshal(couchDoc.jsonValue, &metadataDoc); err != nil {
err = errors.Wrap(err, "failed to unmarshal channel metadata")
logger.Errorf("%+v", err)
return nil, err
}
return metadataDoc, nil
}

type dataformatInfo struct {
Version string `json:"Version"`
}
Expand Down
60 changes: 59 additions & 1 deletion core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var logger = flogging.MustGetLogger("statecouchdb")
const (
// savepointDocID is used as a key for maintaining savepoint (maintained in metadatadb for a channel)
savepointDocID = "statedb_savepoint"
// channelMetadataDocID is used as a key to store the channel metadata for a channel (maintained in the channel's metadatadb).
// Due to CouchDB's length restriction on db names, channel names and namepsaces may be truncated in db names.
// The metadata is used for dropping channel-specific databases and snapshot support.
channelMetadataDocID = "channel_metadata"
// fabricInternalDBName is used to create a db in couch that would be used for internal data such as the version of the data format
// a double underscore ensures that the dbname does not clash with the dbnames created for the chaincodes
fabricInternalDBName = "fabric__internal"
Expand Down Expand Up @@ -169,7 +173,8 @@ type VersionedDB struct {
couchInstance *couchInstance
metadataDB *couchDatabase // A database per channel to store metadata such as savepoint.
chainName string // The name of the chain/channel.
namespaceDBs map[string]*couchDatabase // One database per deployed chaincode.
namespaceDBs map[string]*couchDatabase // One database per namespace.
channelMetadata *channelMetadata // Store channel name and namespaceDBInfo
committedDataCache *versionsCache // Used as a local cache during bulk processing of a block.
verCacheLock sync.RWMutex
mux sync.RWMutex
Expand Down Expand Up @@ -197,6 +202,21 @@ func newVersionedDB(couchInstance *couchInstance, redoLogger *redoLogger, dbName
redoLogger: redoLogger,
cache: cache,
}

vdb.channelMetadata, err = vdb.readChannelMetadata()
if err != nil {
return nil, err
}
if vdb.channelMetadata == nil {
vdb.channelMetadata = &channelMetadata{
ChannelName: chainName,
NamespaceDBsInfo: make(map[string]*namespaceDBInfo),
}
if err = vdb.writeChannelMetadata(); err != nil {
return nil, err
}
}

logger.Debugf("chain [%s]: checking for redolog record", chainName)
redologRecord, err := redoLogger.load()
if err != nil {
Expand Down Expand Up @@ -241,6 +261,16 @@ func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchDatabase,
db = vdb.namespaceDBs[namespace]
if db == nil {
var err error
if _, ok := vdb.channelMetadata.NamespaceDBsInfo[namespace]; !ok {
logger.Debugf("[%s] add namespaceDBInfo for namespace %s", vdb.chainName, namespace)
vdb.channelMetadata.NamespaceDBsInfo[namespace] = &namespaceDBInfo{
Namespace: namespace,
DBName: namespaceDBName,
}
if err = vdb.writeChannelMetadata(); err != nil {
return nil, err
}
}
db, err = createCouchDatabase(vdb.couchInstance, namespaceDBName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -690,6 +720,19 @@ func (vdb *VersionedDB) Close() {
// no need to close db since a shared couch instance is used
}

// writeChannelMetadata saves channel metadata to metadataDB
func (vdb *VersionedDB) writeChannelMetadata() error {
couchDoc, err := encodeChannelMetadata(vdb.channelMetadata)
if err != nil {
return err
}
if _, err := vdb.metadataDB.saveDoc(channelMetadataDocID, "", couchDoc); err != nil {
return err
}
_, err = vdb.metadataDB.ensureFullCommit()
return err
}

// ensureFullCommitAndRecordSavepoint flushes all the dbs (corresponding to `namespaces`) to disk
// and Record a savepoint in the metadata db.
// Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed.
Expand Down Expand Up @@ -769,6 +812,21 @@ func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) {
return decodeSavepoint(couchDoc)
}

// readChannelMetadata returns channel metadata stored in metadataDB
func (vdb *VersionedDB) readChannelMetadata() (*channelMetadata, error) {
var err error
couchDoc, _, err := vdb.metadataDB.readDoc(channelMetadataDocID)
if err != nil {
logger.Errorf("Failed to read db name mapping data %s", err.Error())
return nil, err
}
// ReadDoc() not found (404) will result in nil response, in these cases return nil
if couchDoc == nil || couchDoc.jsonValue == nil {
return nil, nil
}
return decodeChannelMetadata(couchDoc)
}

func (vdb *VersionedDB) GetFullScanIterator(skipNamespace func(string) bool) (statedb.FullScanIterator, byte, error) {
return nil, byte(0), errors.New("Not yet implemented")
}
Expand Down
124 changes: 124 additions & 0 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,3 +1365,127 @@ func TestMissingRevisionRetrievalFromCache(t *testing.T) {
require.Equal(t, "rev1", revisions["key1"])
require.Equal(t, "rev2", revisions["key2"])
}

func TestChannelMetadata(t *testing.T) {
vdbEnv.init(t, sysNamespaces)
defer vdbEnv.cleanup()
channelName := "testchannelmetadata"

db, err := vdbEnv.DBProvider.GetDBHandle(channelName)
require.NoError(t, err)
vdb := db.(*VersionedDB)
expectedChannelMetadata := &channelMetadata{
ChannelName: channelName,
NamespaceDBsInfo: make(map[string]*namespaceDBInfo),
}
savedChannelMetadata, err := vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)

// call getNamespaceDBHandle for new dbs, verify that new db names are added to dbMetadataMapping
namepsaces := make([]string, 10)
for i := 0; i < 10; i++ {
ns := fmt.Sprintf("nsname_%d", i)
_, err := vdb.getNamespaceDBHandle(ns)
require.NoError(t, err)
namepsaces[i] = ns
expectedChannelMetadata.NamespaceDBsInfo[ns] = &namespaceDBInfo{
Namespace: ns,
DBName: constructNamespaceDBName(channelName, ns),
}
}

savedChannelMetadata, err = vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)

// call getNamespaceDBHandle for existing dbs, verify that no new db names are added to dbMetadataMapping
for _, ns := range namepsaces {
_, err := vdb.getNamespaceDBHandle(ns)
require.NoError(t, err)
}

savedChannelMetadata, err = vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)
}

func TestChannelMetadata_NegativeTests(t *testing.T) {
vdbEnv.init(t, sysNamespaces)
defer vdbEnv.cleanup()

channelName := "testchannelmetadata-errorpropagation"
origCouchAddress := vdbEnv.config.Address
vdbEnv.config.MaxRetries = 1
vdbEnv.config.MaxRetriesOnStartup = 1
vdbEnv.config.RequestTimeout = 1 * time.Second

// simulate db connection error by setting an invalid address before GetDBHandle, verify error is propagated
vdbEnv.config.Address = "127.0.0.1:1"
expectedErrMsg := fmt.Sprintf("http error calling couchdb: Get \"http://%s/testchannelmetadata-errorpropagation_\": dial tcp %s: connect: connection refused",
vdbEnv.config.Address, vdbEnv.config.Address)
_, err := vdbEnv.DBProvider.GetDBHandle(channelName)
require.EqualError(t, err, expectedErrMsg)
vdbEnv.config.Address = origCouchAddress

// simulate db connection error by setting an invalid address before getNamespaceDBHandle, verify error is propagated
db, err := vdbEnv.DBProvider.GetDBHandle(channelName)
require.NoError(t, err)
vdb := db.(*VersionedDB)
vdbEnv.config.Address = "127.0.0.1:1"
expectedErrMsg = fmt.Sprintf("http error calling couchdb: Put \"http://%s/testchannelmetadata-errorpropagation_/channel_metadata\": dial tcp %s: connect: connection refused",
vdbEnv.config.Address, vdbEnv.config.Address)
_, err = vdb.getNamespaceDBHandle("testnamepsace1")
require.EqualError(t, err, expectedErrMsg)
vdb.couchInstance.conf.Address = origCouchAddress

// call createCouchDatabase to simulate peer crashes after metadataDB is created but before channelMetadata is updated
// then call DBProvider.GetDBHandle and verify channelMetadata is correctly generated
channelName = "testchannelmetadata-simulatefailure-inbetween"
couchInstance, err := createCouchInstance(vdbEnv.config, &disabled.Provider{})
metadatadbName := constructMetadataDBName(channelName)
metadataDB, err := createCouchDatabase(couchInstance, metadatadbName)
vdb = &VersionedDB{
metadataDB: metadataDB,
}
savedChannelMetadata, err := vdb.readChannelMetadata()
require.NoError(t, err)
require.Nil(t, savedChannelMetadata)

db, err = vdbEnv.DBProvider.GetDBHandle(channelName)
require.NoError(t, err)
vdb = db.(*VersionedDB)
expectedChannelMetadata := &channelMetadata{
ChannelName: channelName,
NamespaceDBsInfo: make(map[string]*namespaceDBInfo),
}
savedChannelMetadata, err = vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)

// call writeChannelMetadata to simulate peer crashes after channelMetada is saved but before namespace DB is created
// then call vdb.getNamespaceDBHandle and verify namespaceDB is created and channelMetadata is correct
namespace := "testnamepsace2"
namespaceDBName := constructNamespaceDBName(channelName, namespace)
vdb.channelMetadata.NamespaceDBsInfo[namespace] = &namespaceDBInfo{Namespace: namespace, DBName: namespaceDBName}
err = vdb.writeChannelMetadata()
require.NoError(t, err)
expectedChannelMetadata.NamespaceDBsInfo = map[string]*namespaceDBInfo{
namespace: {Namespace: namespace, DBName: namespaceDBName},
}
savedChannelMetadata, err = vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)

_, err = vdb.getNamespaceDBHandle(namespace)
require.NoError(t, err)
savedChannelMetadata, err = vdb.readChannelMetadata()
require.NoError(t, err)
require.Equal(t, expectedChannelMetadata, savedChannelMetadata)
require.Equal(t, expectedChannelMetadata, vdb.channelMetadata)
}

0 comments on commit d638b02

Please sign in to comment.