Skip to content

Commit

Permalink
Merge "FAB-1395 - Generic query API for CouchDB"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Dec 17, 2016
2 parents 83b9915 + 0567b34 commit e29e1dd
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package couchdbtxmgmt

import (
"errors"

"github.com/hyperledger/fabric/core/ledger"
)

Expand Down Expand Up @@ -55,7 +53,7 @@ func (q *CouchDBQueryExecutor) GetStateMultipleKeys(namespace string, keys []str
// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
//q.checkDone()
scanner, err := q.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
scanner, err := q.txmgr.getRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
Expand All @@ -64,7 +62,11 @@ func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, start

// ExecuteQuery implements method in interface `ledger.QueryExecutor`
func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
return nil, errors.New("Not supported by KV data model")
scanner, err := q.txmgr.getQuery(query)
if err != nil {
return nil, err
}
return &qQueryItr{scanner}, nil
}

// Done implements method in interface `ledger.QueryExecutor`
Expand All @@ -76,20 +78,42 @@ type qKVItr struct {
s *kvScanner
}

type qQueryItr struct {
s *queryScanner
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *qKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.s.next()
KV, err := itr.s.next()
if err != nil {
return nil, err
}
if committedKV == nil {
if KV == nil {
return nil, nil
}

return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
return &ledger.KV{Key: KV.key, Value: KV.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *qKVItr) Close() {
itr.s.close()
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *qQueryItr) Next() (ledger.QueryResult, error) {
queryRecord, err := itr.s.next()
if err != nil {
return nil, err
}
if queryRecord == nil {
return nil, nil
}

return &ledger.QueryRecord{Namespace: queryRecord.namespace, Key: queryRecord.key, Record: queryRecord.record}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *qQueryItr) Close() {
itr.s.close()
}
49 changes: 43 additions & 6 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,22 @@ func (s *CouchDBTxSimulator) GetState(ns string, key string) ([]byte, error) {
// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
func (s *CouchDBTxSimulator) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
//s.checkDone()
scanner, err := s.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
scanner, err := s.txmgr.getRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
return &sKVItr{scanner, s}, nil
}

// ExecuteQuery implements method in interface `ledger.QueryExecutor`
func (s *CouchDBTxSimulator) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
scanner, err := s.txmgr.getQuery(query)
if err != nil {
return nil, err
}
return &sQueryItr{scanner, s}, nil
}

// SetState implements method in interface `ledger.TxSimulator`
func (s *CouchDBTxSimulator) SetState(ns string, key string, value []byte) error {
logger.Debugf("===COUCHDB=== Entering CouchDBTxSimulator.SetState()")
Expand Down Expand Up @@ -224,28 +233,56 @@ type sKVItr struct {
simulator *CouchDBTxSimulator
}

type sQueryItr struct {
scanner *queryScanner
simulator *CouchDBTxSimulator
}

// Next implements Next() method in ledger.ResultsIterator
// Returns the next item in the result set. The `QueryResult` is expected to be nil when
// the iterator gets exhausted
func (itr *sKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.scanner.next()
kv, err := itr.scanner.next()
if err != nil {
return nil, err
}
if committedKV == nil {
if kv == nil {
return nil, nil
}

// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace)
nsRWs.readMap[committedKV.key] = &kvReadCache{
&rwset.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}
nsRWs.readMap[kv.key] = &kvReadCache{
&rwset.KVRead{Key: kv.key, Version: kv.version}, kv.value}

return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
return &ledger.KV{Key: kv.key, Value: kv.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
// which releases resources occupied by the iterator.
func (itr *sKVItr) Close() {
itr.scanner.close()
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *sQueryItr) Next() (ledger.QueryResult, error) {
queryRecord, err := itr.scanner.next()
if err != nil {
return nil, err
}
if queryRecord == nil {
return nil, nil
}

// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
nsRWs := itr.simulator.getOrCreateNsRWHolder(queryRecord.namespace)
nsRWs.readMap[queryRecord.key] = &kvReadCache{
&rwset.KVRead{Key: queryRecord.key, Version: queryRecord.version}, queryRecord.record}

return &ledger.QueryRecord{Namespace: queryRecord.namespace, Key: queryRecord.key, Record: queryRecord.record}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *sQueryItr) Close() {
itr.scanner.close()
}
93 changes: 93 additions & 0 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package couchdbtxmgmt

import (
"encoding/json"
"fmt"
"os"
"testing"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
Expand Down Expand Up @@ -139,3 +142,93 @@ func TestSavepoint(t *testing.T) {
txMgr.Shutdown()
}
}

func TestDatabaseQuery(t *testing.T) {

//call a helper method to load the core.yaml
testutil.SetupCoreYAMLConfig("./../../../../../peer")

//Only run the tests if CouchDB is explitily enabled in the code,
//otherwise CouchDB may not be installed and all the tests would fail
//TODO replace this with external config property rather than config within the code
if ledgerconfig.IsCouchDBEnabled() == true {

env := newTestEnv(t)
//env.Cleanup() //cleanup at the beginning to ensure the database doesn't exist already
//defer env.Cleanup() //and cleanup at the end

txMgr := NewCouchDBTxMgr(env.conf,
env.couchDBAddress, //couchDB Address
env.couchDatabaseName, //couchDB db name
env.couchUsername, //enter couchDB id
env.couchPassword) //enter couchDB pw

type Asset struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
AssetName string `json:"asset_name"`
Color string `json:"color"`
Size string `json:"size"`
Owner string `json:"owner"`
}

s1, _ := txMgr.NewTxSimulator()

s1.SetState("ns1", "key1", []byte("value1"))
s1.SetState("ns1", "key2", []byte("value2"))
s1.SetState("ns1", "key3", []byte("value3"))
s1.SetState("ns1", "key4", []byte("value4"))
s1.SetState("ns1", "key5", []byte("value5"))
s1.SetState("ns1", "key6", []byte("value6"))
s1.SetState("ns1", "key7", []byte("value7"))
s1.SetState("ns1", "key8", []byte("value8"))

s1.SetState("ns1", "key9", []byte(`{"asset_name":"marble1","color":"red","size":"25","owner":"jerry"}`))
s1.SetState("ns1", "key10", []byte(`{"asset_name":"marble2","color":"blue","size":"10","owner":"bob"}`))
s1.SetState("ns1", "key11", []byte(`{"asset_name":"marble3","color":"blue","size":"35","owner":"jerry"}`))
s1.SetState("ns1", "key12", []byte(`{"asset_name":"marble4","color":"green","size":"15","owner":"bob"}`))
s1.SetState("ns1", "key13", []byte(`{"asset_name":"marble5","color":"red","size":"35","owner":"jerry"}`))
s1.SetState("ns1", "key14", []byte(`{"asset_name":"marble6","color":"blue","size":"25","owner":"bob"}`))

s1.Done()

// validate and commit RWset
txRWSet := s1.(*CouchDBTxSimulator).getTxReadWriteSet()
isValid, err := txMgr.validateTx(txRWSet)
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
err = txMgr.Commit()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))

queryExecuter, _ := txMgr.NewQueryExecutor()
queryString := "{\"selector\":{\"owner\": {\"$eq\": \"bob\"}},\"limit\": 10,\"skip\": 0}"

itr, _ := queryExecuter.ExecuteQuery(queryString)

counter := 0
for {
queryRecord, _ := itr.Next()
if queryRecord == nil {
break
}

//Unmarshal the document to Asset structure
assetResp := &Asset{}
json.Unmarshal(queryRecord.(*ledger.QueryRecord).Record, &assetResp)

//Verify the owner retrieved matches
testutil.AssertEquals(t, assetResp.Owner, "bob")

counter++

}

//Ensure the query returns 3 documents
testutil.AssertEquals(t, counter, 3)

txMgr.Shutdown()

}

}
54 changes: 52 additions & 2 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,9 @@ func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([
return docBytes, ver, nil
}

//getCommittedRangeScanner contructs composite start and end keys based on the namespace then calls the CouchDB range scanner
func (txmgr *CouchDBTxMgr) getCommittedRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) {
//getRangeScanner contructs composite start and end keys based on the namespace then calls the CouchDB range scanner
//TODO the limit and offset are currently hard coded. The limit should eventually be a config option
func (txmgr *CouchDBTxMgr) getRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) {
var compositeStartKey []byte
var compositeEndKey []byte
if startKey != "" {
Expand All @@ -422,6 +423,17 @@ func (txmgr *CouchDBTxMgr) getCommittedRangeScanner(namespace string, startKey s
return newKVScanner(namespace, *queryResult), nil
}

//getQuery calls the CouchDB query documents method (CouchDB _find API)
//TODO the limit and offset are currently hard coded. The limit should eventually be a config option
func (txmgr *CouchDBTxMgr) getQuery(query string) (*queryScanner, error) {

//TODO - limit is currently set at 1000, eventually this will need to be changed
//to reflect a config option and potentially return an exception if the threshold is exceeded
queryResult, _ := txmgr.couchDB.QueryDocuments(query, 1000, 0)

return newQueryScanner(*queryResult), nil
}

func encodeValue(value []byte, version uint64) []byte {
versionBytes := proto.EncodeVarint(version)
deleteMarker := 0
Expand Down Expand Up @@ -495,3 +507,41 @@ func (scanner *kvScanner) close() {

scanner = nil
}

type queryScanner struct {
cursor int
results []couchdb.QueryResult
}

type queryRecord struct {
namespace string
key string
version *version.Height
record []byte
}

func newQueryScanner(queryResults []couchdb.QueryResult) *queryScanner {
return &queryScanner{-1, queryResults}
}

func (scanner *queryScanner) next() (*queryRecord, error) {

scanner.cursor++

if scanner.cursor >= len(scanner.results) {
return nil, nil
}

selectedValue := scanner.results[scanner.cursor]

namespace, key := splitCompositeKey([]byte(selectedValue.ID))

//TODO - change hardcoded version when version support is available in CouchDB
return &queryRecord{namespace, key, version.NewHeight(1, 1), selectedValue.Value}, nil

}

func (scanner *queryScanner) close() {

scanner = nil
}
9 changes: 9 additions & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type QueryExecutor interface {
// The returned ResultsIterator contains results of type *KV
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ResultsIterator, error)
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// Only used for state databases that support query
ExecuteQuery(query string) (ResultsIterator, error)
// Done releases resources occupied by the QueryExecutor
Done()
Expand Down Expand Up @@ -142,6 +143,14 @@ type KeyModification struct {
Transaction *pb.Transaction
}

// QueryRecord - Result structure for query records. Holds a namespace, key and record.
// Only used for state databases that support query
type QueryRecord struct {
Namespace string
Key string
Record []byte
}

// BlockHolder holds block returned by the iterator in GetBlocksIterator.
// The sole purpose of this holder is to avoid desrialization if block is desired in raw bytes form (e.g., for transfer)
type BlockHolder interface {
Expand Down
Loading

0 comments on commit e29e1dd

Please sign in to comment.