From 6cbebf4a3b7088f3b4647c4794b25dca79e20778 Mon Sep 17 00:00:00 2001 From: senthil <cendhu@gmail.com> Date: Tue, 12 Dec 2017 21:22:48 +0530 Subject: [PATCH] [FAB-7131] CouchDB: per channel-chaincode DB For every chaincode, use a separate database instead of using the same channel database. This CR makes the following changes: (i) creates a database per channel_chaincode and two DBs per channel_chaincode_collection (one for hashed data and another for actual private data). (ii) modifies GetState, RangeQuery, and ExecuteQuery implementation to use appropriate chaincode DB as per the given namespace. (iii) modifies commit process (ApplyUpdates) to apply changes to respective chaincode DBs. In the subsequent CR, for loadCommittedVersions and ApplyUpdates, goroutines would be used to process each chaincode batches parallely. Change-Id: I05b5f70e6c253cec683382c9e5b2b39b6a6bcce8 Signed-off-by: senthil <cendhu@gmail.com> --- .../privacyenabledstate/common_storage_db.go | 2 +- .../statedb/statecouchdb/statecouchdb.go | 374 +++++++++++------- .../statedb/statecouchdb/statecouchdb_test.go | 13 - .../txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go | 8 +- core/ledger/util/couchdb/couchdbutil.go | 15 +- core/ledger/util/couchdb/couchdbutil_test.go | 4 +- 6 files changed, 243 insertions(+), 173 deletions(-) diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go index fd72bf28387..ce9aa2f9742 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go @@ -18,7 +18,7 @@ import ( ) const ( - nsJoiner = "/" + nsJoiner = "$" pvtDataPrefix = "p" hashDataPrefix = "h" ) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 115337baefd..19900c1748e 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -25,9 +25,6 @@ import ( var logger = flogging.MustGetLogger("statecouchdb") -var compositeKeySep = []byte{0x00} -var lastKeyIndicator = byte(0x01) - var binaryWrapper = "valueBytes" // querySkip is implemented for future use by query paging @@ -72,6 +69,7 @@ func NewVersionedDBProvider() (*VersionedDBProvider, error) { // GetDBHandle gets the handle to a named database func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) { + provider.mux.Lock() defer provider.mux.Unlock() @@ -94,24 +92,59 @@ func (provider *VersionedDBProvider) Close() { // VersionedDB implements VersionedDB interface type VersionedDB struct { - db *couchdb.CouchDatabase - dbName string + couchInstance *couchdb.CouchInstance + metadataDB *couchdb.CouchDatabase // A database per channel to store metadata such as savepoint. + dbName string // The name of the channel database. + namespaceDBs map[string]*couchdb.CouchDatabase // One database per deployed chaincode. + //TODO: Decide whether to split committedDataCache into multiple cahces, i.e., one per namespace. committedDataCache *CommittedVersions // Used as a local cache during bulk processing of a block. + mux sync.RWMutex } // newVersionedDB constructs an instance of VersionedDB func newVersionedDB(couchInstance *couchdb.CouchInstance, dbName string) (*VersionedDB, error) { // CreateCouchDatabase creates a CouchDB database object, as well as the underlying database if it does not exist - db, err := couchdb.CreateCouchDatabase(*couchInstance, dbName) + + dbName = dbName + "_" + metadataDB, err := couchdb.CreateCouchDatabase(*couchInstance, dbName) if err != nil { return nil, err } versionMap := make(map[statedb.CompositeKey]*version.Height) revMap := make(map[statedb.CompositeKey]string) + namespaceDBMap := make(map[string]*couchdb.CouchDatabase) committedDataCache := &CommittedVersions{committedVersions: versionMap, revisionNumbers: revMap} - return &VersionedDB{db, dbName, committedDataCache}, nil + return &VersionedDB{couchInstance, metadataDB, dbName, namespaceDBMap, committedDataCache, sync.RWMutex{}}, nil +} + +// getNamespaceDBHandle gets the handle to a named chaincode database +func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchdb.CouchDatabase, error) { + + // TODO: lower casing the namespace will be handled more appropriately when + // we address the additional name mapping logic specified in FAB-7130. + namespaceDBName := vdb.dbName + strings.ToLower(namespace) + vdb.mux.RLock() + db := vdb.namespaceDBs[namespaceDBName] + vdb.mux.RUnlock() + + if db != nil { + return db, nil + } + + vdb.mux.Lock() + defer vdb.mux.Unlock() + db = vdb.namespaceDBs[namespaceDBName] + if db == nil { + var err error + db, err = couchdb.CreateCouchDatabase(*vdb.couchInstance, namespaceDBName) + if err != nil { + return nil, err + } + vdb.namespaceDBs[namespaceDBName] = db + } + return db, nil } // Open implements method in VersionedDB interface @@ -142,9 +175,11 @@ func (vdb *VersionedDB) BytesKeySuppoted() bool { func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) { logger.Debugf("GetState(). ns=%s, key=%s", namespace, key) - compositeKey := constructCompositeKey(namespace, key) - - couchDoc, _, err := vdb.db.ReadDoc(string(compositeKey)) + db, err := vdb.getNamespaceDBHandle(namespace) + if err != nil { + return nil, err + } + couchDoc, _, err := db.ReadDoc(key) if err != nil { return nil, err } @@ -184,8 +219,12 @@ func (vdb *VersionedDB) GetVersion(namespace string, key string) (*version.Heigh // If the version was not found in the committed data cache, retrieve it from statedb. if !keyFound { - couchDBCompositeKey := constructCompositeKey(namespace, key) - couchDoc, _, err := vdb.db.ReadDoc(string(couchDBCompositeKey)) + db, err := vdb.getNamespaceDBHandle(namespace) + if err != nil { + return nil, err + } + couchDoc, _, err := db.ReadDoc(key) + if err != nil { return nil, err } @@ -268,12 +307,12 @@ func (vdb *VersionedDB) GetStateRangeScanIterator(namespace string, startKey str // Get the querylimit from core.yaml queryLimit := ledgerconfig.GetQueryLimit() - compositeStartKey := constructCompositeKey(namespace, startKey) - compositeEndKey := constructCompositeKey(namespace, endKey) - if endKey == "" { - compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator + db, err := vdb.getNamespaceDBHandle(namespace) + if err != nil { + return nil, err } - queryResult, err := vdb.db.ReadDocRange(string(compositeStartKey), string(compositeEndKey), queryLimit, querySkip) + + queryResult, err := db.ReadDocRange(startKey, endKey, queryLimit, querySkip) if err != nil { logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error()) return nil, err @@ -289,19 +328,25 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt // Get the querylimit from core.yaml queryLimit := ledgerconfig.GetQueryLimit() + // TODO: Remove namespace from wrapper query queryString, err := ApplyQueryWrapper(namespace, query, queryLimit, 0) if err != nil { logger.Debugf("Error calling ApplyQueryWrapper(): %s\n", err.Error()) return nil, err } - queryResult, err := vdb.db.QueryDocuments(queryString) + db, err := vdb.getNamespaceDBHandle(namespace) + if err != nil { + return nil, err + } + queryResult, err := db.QueryDocuments(queryString) if err != nil { logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error()) return nil, err } + logger.Debugf("Exiting ExecuteQuery") - return newQueryScanner(*queryResult), nil + return newQueryScanner(namespace, *queryResult), nil } // ApplyUpdates implements method in VersionedDB interface @@ -326,6 +371,8 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version // using the max batch size namespaces := batch.GetUpdatedNamespaces() for _, ns := range namespaces { + // TODO: Need to run the following code block in a goroutine so that each chaincode batch + // can be processed and committed parallely. nsUpdates := batch.GetUpdates(ns) for k, vv := range nsUpdates { @@ -372,22 +419,27 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version // reset the missing key list missingKeys = []*statedb.CompositeKey{} - } + } + //STEP 3: PROCESS ANY REMAINING DOCUMENTS + err := vdb.processUpdateBatch(processBatch, missingKeys) + if err != nil { + return err } - } - //STEP 3: PROCESS ANY REMAINING DOCUMENTS - err := vdb.processUpdateBatch(processBatch, missingKeys) - if err != nil { - return err - } + //reset the batch size counter + batchSizeCounter = 0 - // STEP 4: IF THERE WAS SUCCESS UPDATING COUCHDB, THEN RECORD A SAVEPOINT FOR THIS BLOCK HEIGHT + //create a new process batch + processBatch = statedb.NewUpdateBatch() + + // reset the missing key list + missingKeys = []*statedb.CompositeKey{} + } // Record a savepoint at a given height - err = vdb.recordSavepoint(height) + err := vdb.recordSavepoint(height, namespaces) if err != nil { logger.Errorf("Error during recordSavepoint: %s\n", err.Error()) return err @@ -419,11 +471,13 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis // this will be used in case there are retries required batchUpdateMap := make(map[string]*BatchableDocument) + //TODO: processUpdateBatch is called with updateBatch of a single namespace/chaincode at a time. + // Hence, retrieving namespaces from updateBatch and looping over it is not required. Need to remove + // only the outer for loop. namespaces := updateBatch.GetUpdatedNamespaces() for _, ns := range namespaces { nsUpdates := updateBatch.GetUpdates(ns) - for k, vv := range nsUpdates { - compositeKey := constructCompositeKey(ns, k) + for key, vv := range nsUpdates { // Create a document structure couchDoc := &couchdb.CouchDoc{} @@ -431,25 +485,28 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis // retrieve the couchdb revision from the cache // Documents that do not exist in couchdb will not have revision numbers and will // exist in the cache with a revision value of nil - revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: k}] + revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: key}] var isDelete bool // initialized to false if vv.Value == nil { isDelete = true } - logger.Debugf("Channel [%s]: key(string)=[%s] key(bytes)=[%#v], prior revision=[%s], isDelete=[%t]", - vdb.dbName, string(compositeKey), compositeKey, revision, isDelete) + logger.Debugf("Channel [%s]: namespace=[%s] key=[%#v], prior revision=[%s], isDelete=[%t]", + vdb.dbName, ns, key, revision, isDelete) if isDelete { // this is a deleted record. Set the _deleted property to true - couchDoc.JSONValue = createCouchdbDocJSON(string(compositeKey), revision, nil, ns, vv.Version, true) + //couchDoc.JSONValue = createCouchdbDocJSON(string(compositeKey), revision, nil, ns, vv.Version, true) + //TODO: Remove ns/chaincodeID from json doc + couchDoc.JSONValue = createCouchdbDocJSON(key, revision, nil, ns, vv.Version, true) } else { if couchdb.IsJSON(string(vv.Value)) { // Handle as json - couchDoc.JSONValue = createCouchdbDocJSON(string(compositeKey), revision, vv.Value, ns, vv.Version, false) + //TODO: Remove ns from json doc + couchDoc.JSONValue = createCouchdbDocJSON(key, revision, vv.Value, ns, vv.Version, false) } else { // if value is not json, handle as a couchdb attachment @@ -460,86 +517,94 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis attachments := append([]*couchdb.AttachmentInfo{}, attachment) couchDoc.Attachments = attachments - couchDoc.JSONValue = createCouchdbDocJSON(string(compositeKey), revision, nil, ns, vv.Version, false) + //TODO: Remove ns from json doc + couchDoc.JSONValue = createCouchdbDocJSON(key, revision, nil, ns, vv.Version, false) } } // Add the current docment, revision and delete flag to the update map - batchUpdateMap[string(compositeKey)] = &BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete} + batchUpdateMap[key] = &BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete} } - } - if len(batchUpdateMap) > 0 { + //TODO: Run the following in a goroutine so that commit on each namespaceDB can happen parallely + if len(batchUpdateMap) > 0 { - //Add the documents to the batch update array - batchUpdateDocs := []*couchdb.CouchDoc{} - for _, updateDocument := range batchUpdateMap { - batchUpdateDocument := updateDocument - batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc) - } + //Add the documents to the batch update array + batchUpdateDocs := []*couchdb.CouchDoc{} + for _, updateDocument := range batchUpdateMap { + batchUpdateDocument := updateDocument + batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc) + } - // Do the bulk update into couchdb - // Note that this will do retries if the entire bulk update fails or times out - batchUpdateResp, err := vdb.db.BatchUpdateDocuments(batchUpdateDocs) - if err != nil { - return err - } + // Do the bulk update into couchdb + // Note that this will do retries if the entire bulk update fails or times out - // STEP 2: IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY + db, err := vdb.getNamespaceDBHandle(ns) + if err != nil { + return err + } + batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs) + if err != nil { + return err + } - // iterate through the response from CouchDB by document - for _, respDoc := range batchUpdateResp { + // STEP 2: IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY - // If the document returned an error, retry the individual document - if respDoc.Ok != true { + // iterate through the response from CouchDB by document + for _, respDoc := range batchUpdateResp { - batchUpdateDocument := batchUpdateMap[respDoc.ID] + // If the document returned an error, retry the individual document + if respDoc.Ok != true { - var err error + batchUpdateDocument := batchUpdateMap[respDoc.ID] - //Remove the "_rev" from the JSON before saving - //this will allow the CouchDB retry logic to retry revisions without encountering - //a mismatch between the "If-Match" and the "_rev" tag in the JSON - if batchUpdateDocument.CouchDoc.JSONValue != nil { - err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue) - if err != nil { - return err + var err error + + //Remove the "_rev" from the JSON before saving + //this will allow the CouchDB retry logic to retry revisions without encountering + //a mismatch between the "If-Match" and the "_rev" tag in the JSON + if batchUpdateDocument.CouchDoc.JSONValue != nil { + err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue) + if err != nil { + return err + } } - } - // Check to see if the document was added to the batch as a delete type document - if batchUpdateDocument.Deleted { + // Check to see if the document was added to the batch as a delete type document + if batchUpdateDocument.Deleted { - //Log the warning message that a retry is being attempted for batch delete issue - logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID) + //Log the warning message that a retry is being attempted for batch delete issue + logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID) - // If this is a deleted document, then retry the delete - // If the delete fails due to a document not being found (404 error), - // the document has already been deleted and the DeleteDoc will not return an error - err = vdb.db.DeleteDoc(respDoc.ID, "") - } else { + // If this is a deleted document, then retry the delete + // If the delete fails due to a document not being found (404 error), + // the document has already been deleted and the DeleteDoc will not return an error + err = db.DeleteDoc(respDoc.ID, "") + } else { - //Log the warning message that a retry is being attempted for batch update issue - logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID) + //Log the warning message that a retry is being attempted for batch update issue + logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID) - // Save the individual document to couchdb - // Note that this will do retries as needed - _, err = vdb.db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc) - } + // Save the individual document to couchdb + // Note that this will do retries as needed + _, err = db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc) + } - // If the single document update or delete returns an error, then throw the error - if err != nil { + // If the single document update or delete returns an error, then throw the error + if err != nil { - errorString := fmt.Sprintf("Error occurred while saving document ID = %v Error: %s Reason: %s\n", - respDoc.ID, respDoc.Error, respDoc.Reason) + errorString := fmt.Sprintf("Error occurred while saving document ID = %v Error: %s Reason: %s\n", + respDoc.ID, respDoc.Error, respDoc.Reason) - logger.Errorf(errorString) - return fmt.Errorf(errorString) + logger.Errorf(errorString) + return fmt.Errorf(errorString) + } } } + } } @@ -568,14 +633,12 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro versionMap := vdb.committedDataCache.committedVersions revMap := vdb.committedDataCache.revisionNumbers - missingKeys := []string{} + missingKeys := make(map[string][]string) // for each namespace/chaincode, store the missingKeys + for _, key := range keys { logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace) - // create composite key for couchdb - compositeDBKey := constructCompositeKey(key.Namespace, key.Key) - // create the compositeKey compositeKey := statedb.CompositeKey{Namespace: key.Namespace, Key: key.Key} @@ -594,8 +657,8 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro // if the compositeKey was not found in the revision or version part of the cache, then add the key to the // list of keys to be retrieved if !revFound || !versionFound { - // add the composite key to the list of required keys - missingKeys = append(missingKeys, string(compositeDBKey)) + // add the missing key to the list of required keys + missingKeys[key.Namespace] = append(missingKeys[key.Namespace], key.Key) } } @@ -603,34 +666,39 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro //get the max size of the batch from core.yaml maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize() - // Initialize the array of keys to be retrieved - keysToRetrieve := []string{} - // Call the batch retrieve if there is one or more keys to retrieve if len(missingKeys) > 0 { - // Iterate through the missingKeys and build a batch of keys for batch retrieval - for _, key := range missingKeys { + for namespace, _ := range missingKeys { + // TODO: For each namespace, we need to parallely load missing keys into the cache + // The following codes need be moved to goroutine and introduce RWlock for cache. - keysToRetrieve = append(keysToRetrieve, key) + // Initialize the array of keys to be retrieved + keysToRetrieve := []string{} - // check to see if the number of keys is greater than the max batch size - if len(keysToRetrieve) >= maxBatchSize { - err := vdb.batchRetrieveMetaData(keysToRetrieve) - if err != nil { - return err + // Iterate through the missingKeys and build a batch of keys for batch retrieval + for _, key := range missingKeys[namespace] { + + keysToRetrieve = append(keysToRetrieve, key) + + // check to see if the number of keys is greater than the max batch size + if len(keysToRetrieve) >= maxBatchSize { + err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve) + if err != nil { + return err + } + // reset the array + keysToRetrieve = []string{} } - // reset the array - keysToRetrieve = []string{} - } - } + } - // If there are any remaining, retrieve the final batch - if len(keysToRetrieve) > 0 { - err := vdb.batchRetrieveMetaData(keysToRetrieve) - if err != nil { - return err + // If there are any remaining, retrieve the final batch + if len(keysToRetrieve) > 0 { + err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve) + if err != nil { + return err + } } } @@ -640,12 +708,17 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro } // batchRetrieveMetaData retrieves a batch of keys and loads metadata into cache -func (vdb *VersionedDB) batchRetrieveMetaData(keys []string) error { +func (vdb *VersionedDB) batchRetrieveMetaData(namespace string, keys []string) error { versionMap := vdb.committedDataCache.committedVersions revMap := vdb.committedDataCache.revisionNumbers - documentMetadataArray, err := vdb.db.BatchRetrieveDocumentMetadata(keys) + db, err := vdb.getNamespaceDBHandle(namespace) + if err != nil { + return err + } + + documentMetadataArray, err := db.BatchRetrieveDocumentMetadata(keys) if err != nil { logger.Errorf("Batch retrieval of document metadata failed %s\n", err.Error()) @@ -655,8 +728,7 @@ func (vdb *VersionedDB) batchRetrieveMetaData(keys []string) error { for _, documentMetadata := range documentMetadataArray { if len(documentMetadata.Version) != 0 { - ns, key := splitCompositeKey([]byte(documentMetadata.ID)) - compositeKey := statedb.CompositeKey{Namespace: ns, Key: key} + compositeKey := statedb.CompositeKey{Namespace: namespace, Key: documentMetadata.ID} versionMap[compositeKey] = createVersionHeightFromVersionString(documentMetadata.Version) revMap[compositeKey] = documentMetadata.Rev @@ -719,6 +791,7 @@ func createCouchdbDocJSON(id, revision string, value []byte, chaincodeID string, } else { + // TODO: Remove chaincodeID from header // add the chaincodeID jsonMap["chaincodeid"] = chaincodeID @@ -769,36 +842,35 @@ const savepointDocID = "statedb_savepoint" // Savepoint data for couchdb type couchSavepointData struct { - BlockNum uint64 `json:"BlockNum"` - TxNum uint64 `json:"TxNum"` - UpdateSeq string `json:"UpdateSeq"` + BlockNum uint64 `json:"BlockNum"` + TxNum uint64 `json:"TxNum"` } // recordSavepoint Record a savepoint in statedb. // Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed. -// Hence we need to fence the savepoint with sync. So ensure_full_commit is called before -// savepoint to ensure all block writes are flushed. Savepoint itself does not need to be flushed, -// it will get flushed with next block if not yet committed. -func (vdb *VersionedDB) recordSavepoint(height *version.Height) error { +// Hence we need to fence the savepoint with sync. So ensure_full_commit on all updated +// namespace DBs is called before savepoint to ensure all block writes are flushed. Savepoint +// itself is flushed to the metadataDB. +func (vdb *VersionedDB) recordSavepoint(height *version.Height, namespaces []string) error { var err error var savepointDoc couchSavepointData - // ensure full commit to flush all changes until now to disk - dbResponse, err := vdb.db.EnsureFullCommit() - if err != nil || dbResponse.Ok != true { - logger.Errorf("Failed to perform full commit\n") - return errors.New("Failed to perform full commit") - } + // ensure full commit to flush all changes on updated namespaces until now to disk + for _, ns := range namespaces { + // TODO: Ensure full commit can be parallelized to improve performance + db, err := vdb.getNamespaceDBHandle(ns) + if err != nil { + return err + } + dbResponse, err := db.EnsureFullCommit() - // construct savepoint document - // UpdateSeq would be useful if we want to get all db changes since a logical savepoint - dbInfo, _, err := vdb.db.GetDatabaseInfo() - if err != nil { - logger.Errorf("Failed to get DB info %s\n", err.Error()) - return err + if err != nil || dbResponse.Ok != true { + logger.Errorf("Failed to perform full commit\n") + return errors.New("Failed to perform full commit") + } } + // construct savepoint document savepointDoc.BlockNum = height.BlockNum savepointDoc.TxNum = height.TxNum - savepointDoc.UpdateSeq = dbInfo.UpdateSeq savepointDocJSON, err := json.Marshal(savepointDoc) if err != nil { @@ -807,12 +879,19 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height) error { } // SaveDoc using couchdb client and use JSON format - _, err = vdb.db.SaveDoc(savepointDocID, "", &couchdb.CouchDoc{JSONValue: savepointDocJSON, Attachments: nil}) + _, err = vdb.metadataDB.SaveDoc(savepointDocID, "", &couchdb.CouchDoc{JSONValue: savepointDocJSON, Attachments: nil}) if err != nil { logger.Errorf("Failed to save the savepoint to DB %s\n", err.Error()) return err } + dbResponse, err := vdb.metadataDB.EnsureFullCommit() + + if err != nil || dbResponse.Ok != true { + logger.Errorf("Failed to perform full commit\n") + return errors.New("Failed to perform full commit") + } + return nil } @@ -820,7 +899,7 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height) error { func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) { var err error - couchDoc, _, err := vdb.db.ReadDoc(savepointDocID) + couchDoc, _, err := vdb.metadataDB.ReadDoc(savepointDocID) if err != nil { logger.Errorf("Failed to read savepoint data %s\n", err.Error()) return nil, err @@ -841,6 +920,7 @@ func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) { return &version.Height{BlockNum: savepointDoc.BlockNum, TxNum: savepointDoc.TxNum}, nil } +/* func constructCompositeKey(ns string, key string) []byte { compositeKey := []byte(ns) compositeKey = append(compositeKey, compositeKeySep...) @@ -852,6 +932,7 @@ func splitCompositeKey(compositeKey []byte) (string, string) { split := bytes.SplitN(compositeKey, compositeKeySep, 2) return string(split[0]), string(split[1]) } +*/ type kvScanner struct { cursor int @@ -873,7 +954,7 @@ func (scanner *kvScanner) Next() (statedb.QueryResult, error) { selectedKV := scanner.results[scanner.cursor] - _, key := splitCompositeKey([]byte(selectedKV.ID)) + key := selectedKV.ID // remove the data wrapper and return the value and version returnValue, returnVersion := removeDataWrapper(selectedKV.Value, selectedKV.Attachments) @@ -888,12 +969,13 @@ func (scanner *kvScanner) Close() { } type queryScanner struct { - cursor int - results []couchdb.QueryResult + cursor int + namespace string + results []couchdb.QueryResult } -func newQueryScanner(queryResults []couchdb.QueryResult) *queryScanner { - return &queryScanner{-1, queryResults} +func newQueryScanner(namespace string, queryResults []couchdb.QueryResult) *queryScanner { + return &queryScanner{-1, namespace, queryResults} } func (scanner *queryScanner) Next() (statedb.QueryResult, error) { @@ -906,13 +988,13 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) { selectedResultRecord := scanner.results[scanner.cursor] - namespace, key := splitCompositeKey([]byte(selectedResultRecord.ID)) + key := selectedResultRecord.ID // remove the data wrapper and return the value and version returnValue, returnVersion := removeDataWrapper(selectedResultRecord.Value, selectedResultRecord.Attachments) return &statedb.VersionedKV{ - CompositeKey: statedb.CompositeKey{Namespace: namespace, Key: key}, + CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key}, VersionedValue: statedb.VersionedValue{Value: returnValue, Version: returnVersion}}, nil } diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index bdda2500b0e..794a0f9627e 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -100,19 +100,6 @@ func testValueAndVersionEncoding(t *testing.T, value []byte, version *version.He testutil.AssertEquals(t, ver, version) } -func TestCompositeKey(t *testing.T) { - testCompositeKey(t, "ns", "key") - testCompositeKey(t, "ns", "") -} - -func testCompositeKey(t *testing.T, ns string, key string) { - compositeKey := constructCompositeKey(ns, key) - t.Logf("compositeKey=%#v", compositeKey) - ns1, key1 := splitCompositeKey(compositeKey) - testutil.AssertEquals(t, ns1, ns) - testutil.AssertEquals(t, key1, key) -} - // The following tests are unique to couchdb, they are not used in leveldb // query test func TestQuery(t *testing.T) { diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go index f80a84b850f..74149585246 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go @@ -308,7 +308,7 @@ func TestIterator(t *testing.T) { } func testIterator(t *testing.T, env testEnv, numKeys int, startKeyNum int, endKeyNum int) { - cID := "cID" + cID := "cid" txMgr := env.getTxMgr() txMgrHelper := newTxMgrTestHelper(t, txMgr) s, _ := txMgr.NewTxSimulator("test_tx1") @@ -376,7 +376,7 @@ func TestIteratorWithDeletes(t *testing.T) { } func testIteratorWithDeletes(t *testing.T, env testEnv) { - cID := "cID" + cID := "cid" txMgr := env.getTxMgr() txMgrHelper := newTxMgrTestHelper(t, txMgr) s, _ := txMgr.NewTxSimulator("test_tx1") @@ -418,7 +418,7 @@ func TestTxValidationWithItr(t *testing.T) { } func testTxValidationWithItr(t *testing.T, env testEnv) { - cID := "cID" + cID := "cid" txMgr := env.getTxMgr() txMgrHelper := newTxMgrTestHelper(t, txMgr) @@ -483,7 +483,7 @@ func TestGetSetMultipeKeys(t *testing.T) { } func testGetSetMultipeKeys(t *testing.T, env testEnv) { - cID := "cID" + cID := "cid" txMgr := env.getTxMgr() txMgrHelper := newTxMgrTestHelper(t, txMgr) // simulate tx1 diff --git a/core/ledger/util/couchdb/couchdbutil.go b/core/ledger/util/couchdb/couchdbutil.go index c4875a3c985..ab1d044743a 100644 --- a/core/ledger/util/couchdb/couchdbutil.go +++ b/core/ledger/util/couchdb/couchdbutil.go @@ -25,7 +25,7 @@ import ( "time" ) -var expectedChannelNamePattern = `[a-z][a-z0-9.-]*` +var expectedDatabaseNamePattern = `[a-z][a-z0-9.$_-]*` var maxLength = 249 //CreateCouchInstance creates a CouchDB instance @@ -143,10 +143,10 @@ func CreateSystemDatabasesIfNotExist(couchInstance CouchInstance) error { // //Restictions have already been applied to the database name from Orderer based on //restrictions required by Kafka and couchDB (except a '.' char). The databaseName -// passed in here is expected to follow `[a-z][a-z0-9.-]*` pattern. +// passed in here is expected to follow `[a-z][a-z0-9.$_-]*` pattern. // //This validation will simply check whether the database name matches the above pattern and will replace -// all occurence of '.' by '-'. This will not cause collisions in the trnasformed named +// all occurence of '.' by '$'. This will not cause collisions in the transformed named func mapAndValidateDatabaseName(databaseName string) (string, error) { // test Length if len(databaseName) <= 0 { @@ -155,15 +155,16 @@ func mapAndValidateDatabaseName(databaseName string) (string, error) { if len(databaseName) > maxLength { return "", fmt.Errorf("Database name is illegal, cannot be longer than %d", maxLength) } - re, err := regexp.Compile(expectedChannelNamePattern) + re, err := regexp.Compile(expectedDatabaseNamePattern) if err != nil { return "", err } matched := re.FindString(databaseName) if len(matched) != len(databaseName) { - return "", fmt.Errorf("databaseName '%s' does not matches pattern '%s'", databaseName, expectedChannelNamePattern) + return "", fmt.Errorf("databaseName '%s' does not matches pattern '%s'", databaseName, expectedDatabaseNamePattern) } - // replace all '.' to '_'. The databaseName passed in will never contain an '_'. So, this translation will not cause collisions - databaseName = strings.Replace(databaseName, ".", "_", -1) + // replace all '.' to '$'. The databaseName passed in will never contain an '$'. + // So, this translation will not cause collisions + databaseName = strings.Replace(databaseName, ".", "$", -1) return databaseName, nil } diff --git a/core/ledger/util/couchdb/couchdbutil_test.go b/core/ledger/util/couchdb/couchdbutil_test.go index 2dc92c64f82..c4b682161ac 100644 --- a/core/ledger/util/couchdb/couchdbutil_test.go +++ b/core/ledger/util/couchdb/couchdbutil_test.go @@ -89,7 +89,7 @@ func TestDatabaseMapping(t *testing.T) { testutil.AssertError(t, err, "Error expected because the name contains capital letters") //create a new instance and database object using a database name with special characters - _, err = mapAndValidateDatabaseName("test1234_1") + _, err = mapAndValidateDatabaseName("test1234/1") testutil.AssertError(t, err, "Error expected because the name contains illegal chars") //create a new instance and database object using a database name with special characters @@ -108,5 +108,5 @@ func TestDatabaseMapping(t *testing.T) { transformedName, err := mapAndValidateDatabaseName("test.my.db-1") testutil.AssertNoError(t, err, "") - testutil.AssertEquals(t, transformedName, "test_my_db-1") + testutil.AssertEquals(t, transformedName, "test$my$db-1") }