From 9edfbf363c8425a9b4dfeab09f9419c2cc33311a Mon Sep 17 00:00:00 2001 From: senthil Date: Thu, 28 May 2020 17:12:56 +0530 Subject: [PATCH] fix infinite loop during full range query We use a single scanner for achieving both paginated and non-paginated range query. We have internalQueryLimit and pageSize. For each _all_docs?startKey="XXX"&endKey="YYY" REST API call to CouchDB, we fetch atmost internalQueryLimit number of records only by appending limit=internalQueryLimit. When the requested pageSize is higher than the internalQueryLimit or the total number of records present in the given range is higher than the internalQueryLimit, iterator would execute the query again once the records fetched in the first cycle is consumed and so on. In order to do that, after an execution of the REST API in each cycle, it updates the initially passed startKey to the nextStartKey. If there is no nextStartKey, it is set to the endKey. Currently, when the nextStartKey and the endKey are the same, we still run one REST API call which is actually not needed as we always set inclusive_end=false. However, this causes infinite loop in a particular case. When we want to retrieve all the records, we would pass an empty string as the startKey and the endKey. When the startKey is an empty string, the REST API call would become _all_docs?endKey="YYY". When both are empty, it would become _all_docs Given that we set startKey to endKey when there is no nextStartKey and still execute one REST API call, it gets into infinite loop by fetching all records again and again. We avovid this infinite loop by setting scanner.exhausted = true whe the startKey and endKey become the same when there is no nextStartKey Signed-off-by: senthil --- .../statedb/statecouchdb/statecouchdb.go | 14 +- .../statedb/statecouchdb/statecouchdb_test.go | 142 ++++++++++++++++++ 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index a41ea6214f9..740a51b0de6 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -520,8 +520,14 @@ func (scanner *queryScanner) getNextStateRangeScanResults() error { return err } scanner.resultsInfo.results = queryResult - scanner.queryDefinition.startKey = nextStartKey scanner.paginationInfo.cursor = 0 + if scanner.queryDefinition.endKey == nextStartKey { + // as we always set inclusive_end=false to match the behavior of + // goleveldb iterator, it is safe to mark the scanner as exhausted + scanner.exhausted = true + // we still need to update the startKey as it is returned as bookmark + } + scanner.queryDefinition.startKey = nextStartKey return nil } @@ -877,6 +883,7 @@ type queryScanner struct { queryDefinition *queryDefinition paginationInfo *paginationInfo resultsInfo *resultsInfo + exhausted bool } type queryDefinition struct { @@ -899,7 +906,7 @@ type resultsInfo struct { func newQueryScanner(namespace string, db *couchDatabase, query string, internalQueryLimit, limit int32, bookmark, startKey, endKey string) (*queryScanner, error) { - scanner := &queryScanner{namespace, db, &queryDefinition{startKey, endKey, query, internalQueryLimit}, &paginationInfo{-1, limit, bookmark}, &resultsInfo{0, nil}} + scanner := &queryScanner{namespace, db, &queryDefinition{startKey, endKey, query, internalQueryLimit}, &paginationInfo{-1, limit, bookmark}, &resultsInfo{0, nil}, false} var err error // query is defined, then execute the query and return the records and bookmark if scanner.queryDefinition.query != "" { @@ -924,6 +931,9 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) { // check to see if additional records are needed // requery if the cursor exceeds the internalQueryLimit if scanner.paginationInfo.cursor >= scanner.queryDefinition.internalQueryLimit { + if scanner.exhausted { + return nil, nil + } var err error // query is defined, then execute the query and return the records and bookmark if scanner.queryDefinition.query != "" { diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index 0ba9d6cb23d..57ebb9f7ad2 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" "time" + "unicode/utf8" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/dataformat" @@ -1477,3 +1478,144 @@ func TestChannelMetadata_NegativeTests(t *testing.T) { require.Equal(t, expectedChannelMetadata, savedChannelMetadata) require.Equal(t, expectedChannelMetadata, vdb.channelMetadata) } + +func TestRangeQueryWithInternalLimitAndPageSize(t *testing.T) { + // generateSampleData returns a slice of KVs. The returned value contains 12 KVs for a namespace ns1 + generateSampleData := func() []*statedb.VersionedKV { + sampleData := []*statedb.VersionedKV{} + ver := version.NewHeight(1, 1) + sampleKV := &statedb.VersionedKV{ + CompositeKey: statedb.CompositeKey{Namespace: "ns1", Key: string('\u0000')}, + VersionedValue: statedb.VersionedValue{Value: []byte("v0"), Version: ver, Metadata: []byte("m0")}, + } + sampleData = append(sampleData, sampleKV) + for i := 0; i < 10; i++ { + sampleKV = &statedb.VersionedKV{ + CompositeKey: statedb.CompositeKey{ + Namespace: "ns1", + Key: fmt.Sprintf("key-%d", i), + }, + VersionedValue: statedb.VersionedValue{ + Value: []byte(fmt.Sprintf("value-for-key-%d-for-ns1", i)), + Version: ver, + Metadata: []byte(fmt.Sprintf("metadata-for-key-%d-for-ns1", i)), + }, + } + sampleData = append(sampleData, sampleKV) + } + sampleKV = &statedb.VersionedKV{ + CompositeKey: statedb.CompositeKey{Namespace: "ns1", Key: string(utf8.MaxRune)}, + VersionedValue: statedb.VersionedValue{Value: []byte("v1"), Version: ver, Metadata: []byte("m1")}, + } + sampleData = append(sampleData, sampleKV) + return sampleData + } + + vdbEnv.init(t, nil) + defer vdbEnv.cleanup() + channelName := "ch1" + vdb, err := vdbEnv.DBProvider.GetDBHandle(channelName) + require.NoError(t, err) + db := vdb.(*VersionedDB) + + sampleData := generateSampleData() + batch := statedb.NewUpdateBatch() + for _, d := range sampleData { + batch.PutValAndMetadata(d.Namespace, d.Key, d.Value, d.Metadata, d.Version) + } + db.ApplyUpdates(batch, version.NewHeight(1, 1)) + + defaultLimit := vdbEnv.config.InternalQueryLimit + + // Scenario 1: We try to fetch either 11 records or all 12 records. We pass various internalQueryLimits. + // key utf8.MaxRune would not be included as inclusive_end is always set to false + testRangeQueryWithInternalLimit(t, "ns1", db, 2, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithInternalLimit(t, "ns1", db, 5, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithInternalLimit(t, "ns1", db, 2, string('\u0000'), "", sampleData) + testRangeQueryWithInternalLimit(t, "ns1", db, 5, string('\u0000'), "", sampleData) + testRangeQueryWithInternalLimit(t, "ns1", db, 2, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithInternalLimit(t, "ns1", db, 5, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithInternalLimit(t, "ns1", db, 2, "", "", sampleData) + testRangeQueryWithInternalLimit(t, "ns1", db, 5, "", "", sampleData) + + // Scenario 2: We try to fetch either 11 records or all 12 records using pagination. We pass various page sizes while + // keeping the internalQueryLimit as the default one, i.e., 1000. + vdbEnv.config.InternalQueryLimit = defaultLimit + testRangeQueryWithPageSize(t, "ns1", db, 2, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 15, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 2, string('\u0000'), "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 15, string('\u0000'), "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 2, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 15, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 2, "", "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 15, "", "", sampleData) + + // Scenario 3: We try to fetch either 11 records or all 12 records using pagination. We pass various page sizes while + // keeping the internalQueryLimit to 1. + vdbEnv.config.InternalQueryLimit = 1 + testRangeQueryWithPageSize(t, "ns1", db, 2, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 15, string('\u0000'), string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 2, string('\u0000'), "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 15, string('\u0000'), "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 2, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 15, "", string(utf8.MaxRune), sampleData[:len(sampleData)-1]) + testRangeQueryWithPageSize(t, "ns1", db, 2, "", "", sampleData) + testRangeQueryWithPageSize(t, "ns1", db, 15, "", "", sampleData) +} + +func testRangeQueryWithInternalLimit( + t *testing.T, + ns string, + db *VersionedDB, + limit int, + startKey, endKey string, + expectedResults []*statedb.VersionedKV, +) { + vdbEnv.config.InternalQueryLimit = limit + require.Equal(t, int32(limit), db.couchInstance.internalQueryLimit()) + itr, err := db.GetStateRangeScanIterator(ns, startKey, endKey) + require.NoError(t, err) + require.Equal(t, int32(limit), itr.(*queryScanner).queryDefinition.internalQueryLimit) + results := []*statedb.VersionedKV{} + for { + result, err := itr.Next() + require.NoError(t, err) + if result == nil { + itr.Close() + break + } + kv := result.(*statedb.VersionedKV) + results = append(results, kv) + } + require.Equal(t, expectedResults, results) +} + +func testRangeQueryWithPageSize( + t *testing.T, + ns string, + db *VersionedDB, + pageSize int, + startKey, endKey string, + expectedResults []*statedb.VersionedKV, +) { + itr, err := db.GetStateRangeScanIteratorWithPagination(ns, startKey, endKey, int32(pageSize)) + require.NoError(t, err) + results := []*statedb.VersionedKV{} + for { + result, err := itr.Next() + require.NoError(t, err) + if result != nil { + kv := result.(*statedb.VersionedKV) + results = append(results, kv) + continue + } + nextStartKey := itr.GetBookmarkAndClose() + if nextStartKey == endKey { + break + } + itr, err = db.GetStateRangeScanIteratorWithPagination(ns, nextStartKey, endKey, int32(pageSize)) + require.NoError(t, err) + continue + } + require.Equal(t, expectedResults, results) +}