Skip to content

Commit

Permalink
Merge "Fix: Filter couchdb internal docs from query results"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Dec 5, 2018
2 parents 385f437 + 16aecd0 commit f63c95d
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 36 deletions.
78 changes: 43 additions & 35 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,10 @@ func (vdb *VersionedDB) getNamespaceDBHandle(namespace string) (*couchdb.CouchDa

// ProcessIndexesForChaincodeDeploy creates indexes for a specified namespace
func (vdb *VersionedDB) ProcessIndexesForChaincodeDeploy(namespace string, fileEntries []*ccprovider.TarFileEntry) error {

db, err := vdb.getNamespaceDBHandle(namespace)
if err != nil {
return err
}

for _, fileEntry := range fileEntries {
indexData := fileEntry.FileContent
filename := fileEntry.FileHeader.Name
Expand All @@ -134,9 +132,7 @@ func (vdb *VersionedDB) ProcessIndexesForChaincodeDeploy(namespace string, fileE
"error creating index from file [%s] for channel [%s]", filename, namespace))
}
}

return nil

}

func (vdb *VersionedDB) GetDBType() string {
Expand Down Expand Up @@ -273,9 +269,7 @@ const returnCount = "count"
// endKey is exclusive
// metadata contains a map of additional query options
func (vdb *VersionedDB) GetStateRangeScanIteratorWithMetadata(namespace string, startKey string, endKey string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) {

logger.Debugf("Entering GetStateRangeScanIteratorWithMetadata namespace: %s startKey: %s endKey: %s metadata: %v", namespace, startKey, endKey, metadata)

// Get the internalQueryLimit from core.yaml
internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit())
requestedLimit := int32(0)
Expand All @@ -298,42 +292,76 @@ func (vdb *VersionedDB) GetStateRangeScanIteratorWithMetadata(namespace string,
}

func (scanner *queryScanner) getNextStateRangeScanResults() error {

queryLimit := scanner.queryDefinition.internalQueryLimit
if scanner.paginationInfo.requestedLimit > 0 {
moreResultsNeeded := scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned
if moreResultsNeeded < scanner.queryDefinition.internalQueryLimit {
queryLimit = moreResultsNeeded
}
}
queryResult, nextStartKey, err := scanner.db.ReadDocRange(scanner.queryDefinition.startKey, scanner.queryDefinition.endKey,
queryLimit)
queryResult, nextStartKey, err := rangeScanFilterCouchInternalDocs(scanner.db,
scanner.queryDefinition.startKey, scanner.queryDefinition.endKey, queryLimit)
if err != nil {
logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error())
return err
}

scanner.resultsInfo.results = queryResult
scanner.queryDefinition.startKey = nextStartKey
scanner.paginationInfo.cursor = 0

return nil
}

func rangeScanFilterCouchInternalDocs(db *couchdb.CouchDatabase,
startKey, endKey string, queryLimit int32,
) ([]*couchdb.QueryResult, string, error) {
var finalResults []*couchdb.QueryResult
var finalNextStartKey string
for {
results, nextStartKey, err := db.ReadDocRange(startKey, endKey, queryLimit)
if err != nil {
logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error())
return nil, "", err
}
var filteredResults []*couchdb.QueryResult
for _, doc := range results {
if !isCouchInternalKey(doc.ID) {
filteredResults = append(filteredResults, doc)
}
}

finalResults = append(finalResults, filteredResults...)
finalNextStartKey = nextStartKey
queryLimit = int32(len(results) - len(filteredResults))
if queryLimit == 0 || finalNextStartKey == "" {
break
}
startKey = finalNextStartKey
}
var err error
for i := 0; isCouchInternalKey(finalNextStartKey); i++ {
_, finalNextStartKey, err = db.ReadDocRange(finalNextStartKey, endKey, 1)
logger.Debugf("i=%d, finalNextStartKey=%s", i, finalNextStartKey)
if err != nil {
return nil, "", err
}
}
return finalResults, finalNextStartKey, nil
}

func isCouchInternalKey(key string) bool {
return len(key) != 0 && key[0] == '_'
}

// ExecuteQuery implements method in VersionedDB interface
func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {

queryResult, err := vdb.ExecuteQueryWithMetadata(namespace, query, nil)
if err != nil {
return nil, err
}

return queryResult, nil
}

// ExecuteQueryWithMetadata implements method in VersionedDB interface
func (vdb *VersionedDB) ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) {

logger.Debugf("Entering ExecuteQueryWithMetadata namespace: %s, query: %s, metadata: %v", namespace, query, metadata)
// Get the querylimit from core.yaml
internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit())
Expand Down Expand Up @@ -367,38 +395,32 @@ func (vdb *VersionedDB) ExecuteQueryWithMetadata(namespace, query string, metada
// executeQueryWithBookmark executes a "paging" query with a bookmark, this method allows a
// paged query without returning a new query iterator
func (scanner *queryScanner) executeQueryWithBookmark() error {

queryLimit := scanner.queryDefinition.internalQueryLimit
if scanner.paginationInfo.requestedLimit > 0 {
if scanner.paginationInfo.requestedLimit-scanner.resultsInfo.totalRecordsReturned < scanner.queryDefinition.internalQueryLimit {
queryLimit = scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned
}
}

queryString, err := applyAdditionalQueryOptions(scanner.queryDefinition.query,
queryLimit, scanner.paginationInfo.bookmark)
if err != nil {
logger.Debugf("Error calling applyAdditionalQueryOptions(): %s\n", err.Error())
return err
}

queryResult, bookmark, err := scanner.db.QueryDocuments(queryString)
if err != nil {
logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error())
return err
}

scanner.resultsInfo.results = queryResult
scanner.paginationInfo.bookmark = bookmark
scanner.paginationInfo.cursor = 0

return nil
}

func validateQueryMetadata(metadata map[string]interface{}) error {
for key, keyVal := range metadata {
switch key {

case optionBookmark:
//Verify the bookmark is a string
if _, ok := keyVal.(string); ok {
Expand Down Expand Up @@ -599,10 +621,8 @@ type resultsInfo struct {

func newQueryScanner(namespace string, db *couchdb.CouchDatabase, query string, internalQueryLimit,
limit int32, bookmark, startKey, endKey string) (*queryScanner, error) {

scanner := &queryScanner{namespace, db, &queryDefinition{startKey, endKey, query, internalQueryLimit}, &paginationInfo{-1, limit, bookmark}, &resultsInfo{0, nil}}
var err error

// query is defined, then execute the query and return the records and bookmark
if scanner.queryDefinition.query != "" {
err = scanner.executeQueryWithBookmark()
Expand All @@ -613,24 +633,19 @@ func newQueryScanner(namespace string, db *couchdb.CouchDatabase, query string,
return nil, err
}
scanner.paginationInfo.cursor = -1

return scanner, nil
}

func (scanner *queryScanner) Next() (statedb.QueryResult, error) {

//test for no results case
if len(scanner.resultsInfo.results) == 0 {
return nil, nil
}

// increment the cursor
scanner.paginationInfo.cursor++

// check to see if additional records are needed
// requery if the cursor exceeds the internalQueryLimit
if scanner.paginationInfo.cursor >= scanner.queryDefinition.internalQueryLimit {

var err error
// query is defined, then execute the query and return the records and bookmark
if scanner.queryDefinition.query != "" {
Expand All @@ -641,30 +656,23 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) {
if err != nil {
return nil, err
}

//if no more results, then return
if len(scanner.resultsInfo.results) == 0 {
return nil, nil
}

}

//If the cursor is greater than or equal to the number of result records, return
if scanner.paginationInfo.cursor >= int32(len(scanner.resultsInfo.results)) {
return nil, nil
}

selectedResultRecord := scanner.resultsInfo.results[scanner.paginationInfo.cursor]
key := selectedResultRecord.ID

// remove the reserved fields from CouchDB JSON and return the value and version
kv, err := couchDocToKeyValue(&couchdb.CouchDoc{JSONValue: selectedResultRecord.Value, Attachments: selectedResultRecord.Attachments})
if err != nil {
return nil, err
}

scanner.resultsInfo.totalRecordsReturned++

return &statedb.VersionedKV{
CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
VersionedValue: *kv.VersionedValue}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/commontests"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
ledgertestutil "github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/integration/runner"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -688,11 +689,81 @@ func TestPaginatedQueryValidation(t *testing.T) {

err = validateQueryMetadata(queryOptions)
assert.Error(t, err, "An should have been thrown for an invalid options")

}

func TestApplyUpdatesWithNilHeight(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider)
}

func TestRangeScanWithCouchInternalDocsPresent(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
db, err := env.DBProvider.GetDBHandle("testrangescanfiltercouchinternaldocs")
assert.NoError(t, err)
couchDatabse, err := db.(*VersionedDB).getNamespaceDBHandle("ns")
assert.NoError(t, err)
db.Open()
defer db.Close()
_, err = couchDatabse.CreateIndex(`{
"index" : {"fields" : ["asset_name"]},
"ddoc" : "indexAssetName",
"name" : "indexAssetName",
"type" : "json"
}`)
assert.NoError(t, err)

_, err = couchDatabse.CreateIndex(`{
"index" : {"fields" : ["assetValue"]},
"ddoc" : "indexAssetValue",
"name" : "indexAssetValue",
"type" : "json"
}`)
assert.NoError(t, err)

batch := statedb.NewUpdateBatch()
for i := 1; i <= 3; i++ {
keySmallerThanDesignDoc := fmt.Sprintf("Key-%d", i)
keyGreaterThanDesignDoc := fmt.Sprintf("key-%d", i)
jsonValue := fmt.Sprintf(`{"asset_name": "marble-%d"}`, i)
batch.Put("ns", keySmallerThanDesignDoc, []byte(jsonValue), version.NewHeight(1, uint64(i)))
batch.Put("ns", keyGreaterThanDesignDoc, []byte(jsonValue), version.NewHeight(1, uint64(i)))
}
db.ApplyUpdates(batch, version.NewHeight(2, 2))
assert.NoError(t, err)

// The Keys in db are in this order
// Key-1, Key-2, Key-3,_design/indexAssetNam, _design/indexAssetValue, key-1, key-2, key-3
// query different ranges and verify results
s, err := newQueryScanner("ns", couchDatabse, "", 3, 3, "", "", "")
assert.NoError(t, err)
assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2", "Key-3"})
assert.Equal(t, "key-1", s.queryDefinition.startKey)

s, err = newQueryScanner("ns", couchDatabse, "", 4, 4, "", "", "")
assert.NoError(t, err)
assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2", "Key-3", "key-1"})
assert.Equal(t, "key-2", s.queryDefinition.startKey)

s, err = newQueryScanner("ns", couchDatabse, "", 2, 2, "", "", "")
assert.NoError(t, err)
assertQueryResults(t, s.resultsInfo.results, []string{"Key-1", "Key-2"})
assert.Equal(t, "Key-3", s.queryDefinition.startKey)
s.getNextStateRangeScanResults()
assertQueryResults(t, s.resultsInfo.results, []string{"Key-3", "key-1"})
assert.Equal(t, "key-2", s.queryDefinition.startKey)

s, err = newQueryScanner("ns", couchDatabse, "", 2, 2, "", "_", "")
assert.NoError(t, err)
assertQueryResults(t, s.resultsInfo.results, []string{"key-1", "key-2"})
assert.Equal(t, "key-3", s.queryDefinition.startKey)
}

func assertQueryResults(t *testing.T, results []*couchdb.QueryResult, expectedIds []string) {
var actualIds []string
for _, res := range results {
actualIds = append(actualIds, res.ID)
}
assert.Equal(t, expectedIds, actualIds)
}

0 comments on commit f63c95d

Please sign in to comment.