From 16aecd051355b0c9537cb023c7d7b39e0c0f224b Mon Sep 17 00:00:00 2001 From: manish Date: Tue, 4 Dec 2018 01:18:04 -0500 Subject: [PATCH] Fix: Filter couchdb internal docs from query results FAB-13072 #done Change-Id: Ib91827ec08da752ea862dd12a6ab4b167acf4364 Signed-off-by: manish --- .../statedb/statecouchdb/statecouchdb.go | 78 ++++++++++--------- .../statedb/statecouchdb/statecouchdb_test.go | 73 ++++++++++++++++- 2 files changed, 115 insertions(+), 36 deletions(-) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 1a409d1887b..5e436e472fb 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -119,12 +119,10 @@ func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchdb.CouchDa // ProcessIndexesForChaincodeDeploy creates indexes for a specified namespace func (vdb *VersionedDB) ProcessIndexesForChaincodeDeploy(namespace string, fileEntries []*ccprovider.TarFileEntry) error { - db, err := vdb.getNamespaceDBHandle(namespace) if err != nil { return err } - for _, fileEntry := range fileEntries { indexData := fileEntry.FileContent filename := fileEntry.FileHeader.Name @@ -134,9 +132,7 @@ func (vdb *VersionedDB) ProcessIndexesForChaincodeDeploy(namespace string, fileE "error creating index from file [%s] for channel [%s]", filename, namespace)) } } - return nil - } func (vdb *VersionedDB) GetDBType() string { @@ -273,9 +269,7 @@ const returnCount = "count" // endKey is exclusive // metadata contains a map of additional query options func (vdb *VersionedDB) GetStateRangeScanIteratorWithMetadata(namespace string, startKey string, endKey string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) { - logger.Debugf("Entering GetStateRangeScanIteratorWithMetadata namespace: %s startKey: %s endKey: %s metadata: %v", namespace, startKey, endKey, metadata) - // Get the internalQueryLimit from core.yaml internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit()) requestedLimit := int32(0) @@ -298,7 +292,6 @@ func (vdb *VersionedDB) GetStateRangeScanIteratorWithMetadata(namespace string, } func (scanner *queryScanner) getNextStateRangeScanResults() error { - queryLimit := scanner.queryDefinition.internalQueryLimit if scanner.paginationInfo.requestedLimit > 0 { moreResultsNeeded := scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned @@ -306,34 +299,69 @@ func (scanner *queryScanner) getNextStateRangeScanResults() error { queryLimit = moreResultsNeeded } } - queryResult, nextStartKey, err := scanner.db.ReadDocRange(scanner.queryDefinition.startKey, scanner.queryDefinition.endKey, - queryLimit) + queryResult, nextStartKey, err := rangeScanFilterCouchInternalDocs(scanner.db, + scanner.queryDefinition.startKey, scanner.queryDefinition.endKey, queryLimit) if err != nil { - logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error()) return err } - scanner.resultsInfo.results = queryResult scanner.queryDefinition.startKey = nextStartKey scanner.paginationInfo.cursor = 0 - return nil } +func rangeScanFilterCouchInternalDocs(db *couchdb.CouchDatabase, + startKey, endKey string, queryLimit int32, +) ([]*couchdb.QueryResult, string, error) { + var finalResults []*couchdb.QueryResult + var finalNextStartKey string + for { + results, nextStartKey, err := db.ReadDocRange(startKey, endKey, queryLimit) + if err != nil { + logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error()) + return nil, "", err + } + var filteredResults []*couchdb.QueryResult + for _, doc := range results { + if !isCouchInternalKey(doc.ID) { + filteredResults = append(filteredResults, doc) + } + } + + finalResults = append(finalResults, filteredResults...) + finalNextStartKey = nextStartKey + queryLimit = int32(len(results) - len(filteredResults)) + if queryLimit == 0 || finalNextStartKey == "" { + break + } + startKey = finalNextStartKey + } + var err error + for i := 0; isCouchInternalKey(finalNextStartKey); i++ { + _, finalNextStartKey, err = db.ReadDocRange(finalNextStartKey, endKey, 1) + logger.Debugf("i=%d, finalNextStartKey=%s", i, finalNextStartKey) + if err != nil { + return nil, "", err + } + } + return finalResults, finalNextStartKey, nil +} + +func isCouchInternalKey(key string) bool { + return len(key) != 0 && key[0] == '_' +} + // ExecuteQuery implements method in VersionedDB interface func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) { - queryResult, err := vdb.ExecuteQueryWithMetadata(namespace, query, nil) if err != nil { return nil, err } - return queryResult, nil } // ExecuteQueryWithMetadata implements method in VersionedDB interface func (vdb *VersionedDB) ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) { - logger.Debugf("Entering ExecuteQueryWithMetadata namespace: %s, query: %s, metadata: %v", namespace, query, metadata) // Get the querylimit from core.yaml internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit()) @@ -367,38 +395,32 @@ func (vdb *VersionedDB) ExecuteQueryWithMetadata(namespace, query string, metada // executeQueryWithBookmark executes a "paging" query with a bookmark, this method allows a // paged query without returning a new query iterator func (scanner *queryScanner) executeQueryWithBookmark() error { - queryLimit := scanner.queryDefinition.internalQueryLimit if scanner.paginationInfo.requestedLimit > 0 { if scanner.paginationInfo.requestedLimit-scanner.resultsInfo.totalRecordsReturned < scanner.queryDefinition.internalQueryLimit { queryLimit = scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned } } - queryString, err := applyAdditionalQueryOptions(scanner.queryDefinition.query, queryLimit, scanner.paginationInfo.bookmark) if err != nil { logger.Debugf("Error calling applyAdditionalQueryOptions(): %s\n", err.Error()) return err } - queryResult, bookmark, err := scanner.db.QueryDocuments(queryString) if err != nil { logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error()) return err } - scanner.resultsInfo.results = queryResult scanner.paginationInfo.bookmark = bookmark scanner.paginationInfo.cursor = 0 - return nil } func validateQueryMetadata(metadata map[string]interface{}) error { for key, keyVal := range metadata { switch key { - case optionBookmark: //Verify the bookmark is a string if _, ok := keyVal.(string); ok { @@ -599,10 +621,8 @@ type resultsInfo struct { func newQueryScanner(namespace string, db *couchdb.CouchDatabase, query string, internalQueryLimit, limit int32, bookmark, startKey, endKey string) (*queryScanner, error) { - scanner := &queryScanner{namespace, db, &queryDefinition{startKey, endKey, query, internalQueryLimit}, &paginationInfo{-1, limit, bookmark}, &resultsInfo{0, nil}} var err error - // query is defined, then execute the query and return the records and bookmark if scanner.queryDefinition.query != "" { err = scanner.executeQueryWithBookmark() @@ -613,24 +633,19 @@ func newQueryScanner(namespace string, db *couchdb.CouchDatabase, query string, return nil, err } scanner.paginationInfo.cursor = -1 - return scanner, nil } func (scanner *queryScanner) Next() (statedb.QueryResult, error) { - //test for no results case if len(scanner.resultsInfo.results) == 0 { return nil, nil } - // increment the cursor scanner.paginationInfo.cursor++ - // check to see if additional records are needed // requery if the cursor exceeds the internalQueryLimit if scanner.paginationInfo.cursor >= scanner.queryDefinition.internalQueryLimit { - var err error // query is defined, then execute the query and return the records and bookmark if scanner.queryDefinition.query != "" { @@ -641,30 +656,23 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) { if err != nil { return nil, err } - //if no more results, then return if len(scanner.resultsInfo.results) == 0 { return nil, nil } - } - //If the cursor is greater than or equal to the number of result records, return if scanner.paginationInfo.cursor >= int32(len(scanner.resultsInfo.results)) { return nil, nil } - selectedResultRecord := scanner.resultsInfo.results[scanner.paginationInfo.cursor] key := selectedResultRecord.ID - // remove the reserved fields from CouchDB JSON and return the value and version kv, err := couchDocToKeyValue(&couchdb.CouchDoc{JSONValue: selectedResultRecord.Value, Attachments: selectedResultRecord.Attachments}) if err != nil { return nil, err } - scanner.resultsInfo.totalRecordsReturned++ - return &statedb.VersionedKV{ CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key}, VersionedValue: *kv.VersionedValue}, nil diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index 51eb2a70010..8e93daa45c4 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -20,6 +20,7 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/commontests" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" ledgertestutil "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util/couchdb" "github.com/hyperledger/fabric/integration/runner" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -684,7 +685,6 @@ func TestPaginatedQueryValidation(t *testing.T) { err = validateQueryMetadata(queryOptions) assert.Error(t, err, "An should have been thrown for an invalid options") - } func TestApplyUpdatesWithNilHeight(t *testing.T) { @@ -692,3 +692,74 @@ func TestApplyUpdatesWithNilHeight(t *testing.T) { defer env.Cleanup() commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider) } + +func TestRangeScanWithCouchInternalDocsPresent(t *testing.T) { + env := NewTestVDBEnv(t) + defer env.Cleanup() + db, err := env.DBProvider.GetDBHandle("testrangescanfiltercouchinternaldocs") + assert.NoError(t, err) + couchDatabse, err := db.(*VersionedDB).getNamespaceDBHandle("ns") + assert.NoError(t, err) + db.Open() + defer db.Close() + _, err = couchDatabse.CreateIndex(`{ + "index" : {"fields" : ["asset_name"]}, + "ddoc" : "indexAssetName", + "name" : "indexAssetName", + "type" : "json" + }`) + assert.NoError(t, err) + + _, err = couchDatabse.CreateIndex(`{ + "index" : {"fields" : ["assetValue"]}, + "ddoc" : "indexAssetValue", + "name" : "indexAssetValue", + "type" : "json" + }`) + assert.NoError(t, err) + + batch := statedb.NewUpdateBatch() + for i := 1; i <= 3; i++ { + keySmallerThanDesignDoc := fmt.Sprintf("Key-%d", i) + keyGreaterThanDesignDoc := fmt.Sprintf("key-%d", i) + jsonValue := fmt.Sprintf(`{"asset_name": "marble-%d"}`, i) + batch.Put("ns", keySmallerThanDesignDoc, []byte(jsonValue), version.NewHeight(1, uint64(i))) + batch.Put("ns", keyGreaterThanDesignDoc, []byte(jsonValue), version.NewHeight(1, uint64(i))) + } + db.ApplyUpdates(batch, version.NewHeight(2, 2)) + assert.NoError(t, err) + + // The Keys in db are in this order + // Key-1, Key-2, Key-3,_design/indexAssetNam, _design/indexAssetValue, key-1, key-2, key-3 + // query different ranges and verify results + s, err := newQueryScanner("ns", couchDatabse, "", 3, 3, "", "", "") + assert.NoError(t, err) + assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2", "Key-3"}) + assert.Equal(t, "key-1", s.queryDefinition.startKey) + + s, err = newQueryScanner("ns", couchDatabse, "", 4, 4, "", "", "") + assert.NoError(t, err) + assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2", "Key-3", "key-1"}) + assert.Equal(t, "key-2", s.queryDefinition.startKey) + + s, err = newQueryScanner("ns", couchDatabse, "", 2, 2, "", "", "") + assert.NoError(t, err) + assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2"}) + assert.Equal(t, "Key-3", s.queryDefinition.startKey) + s.getNextStateRangeScanResults() + assertQueryResults(t, s.resultsInfo.results, []string{"Key-3", "key-1"}) + assert.Equal(t, "key-2", s.queryDefinition.startKey) + + s, err = newQueryScanner("ns", couchDatabse, "", 2, 2, "", "_", "") + assert.NoError(t, err) + assertQueryResults(t, s.resultsInfo.results, []string{"key-1", "key-2"}) + assert.Equal(t, "key-3", s.queryDefinition.startKey) +} + +func assertQueryResults(t *testing.T, results []*couchdb.QueryResult, expectedIds []string) { + var actualIds []string + for _, res := range results { + actualIds = append(actualIds, res.ID) + } + assert.Equal(t, expectedIds, actualIds) +}