Skip to content

Commit

Permalink
FAB-1818 Create data wrapper for state data in CouchDB
Browse files Browse the repository at this point in the history
Motivation for this change:
Maintain the version inline with the document, without changing the
structure/value of the stored document.  This will prevent unexpected
data values being returned and will prevent possible name collisions
with document values.

- Create a data wrapper for ledger JSON data stored in CouchDB.

- Return the version based on block number and transaction id

The wrapper will be implemented as a new key named "data" which will
contain the JSON data for the state database.

Prior to change example:

"doc": {
 "_id": "2",
 "_rev": "2-8ee0c31b21ad650e5b872c0b98e59ab5",
 "version":"1:2"
 "asset_name": "marble2",
 "color": "red",
 "owner": "tom",
 "size": "25"
}

 Following change:

 "doc": {
  "_id": "2",
  "_rev": "2-8ee0c31b21ad650e5b872c0b98e59ab5",
  "version":"1:2"
  "data": {
   "asset_name": "marble2",
   "color": "red",
   "owner": "tom",
   "size": "25"
  }
 }

Change-Id: I59391ea926531c46c346fc8448e3d041ca5f3fdf
Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
  • Loading branch information
Chris Elder committed Jan 27, 2017
1 parent cf28448 commit ead6705
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 56 deletions.
29 changes: 23 additions & 6 deletions core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ func TestBasicRW(t *testing.T, dbProvider statedb.VersionedDBProvider) {
vv, _ := db.GetState("ns1", "key1")
testutil.AssertEquals(t, vv, &vv1)

//TODO re-enable test after Couch version wrapper is added
//vv, _ = db.GetState("ns2", "key4")
//testutil.AssertEquals(t, vv, &vv4)
vv, _ = db.GetState("ns2", "key4")
testutil.AssertEquals(t, vv, &vv4)

sp, err := db.GetLatestSavePoint()
testutil.AssertNoError(t, err, "")
Expand Down Expand Up @@ -91,9 +90,8 @@ func TestMultiDBBasicRW(t *testing.T, dbProvider statedb.VersionedDBProvider) {
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, sp, savePoint1)

//TODO re-enable test after Couch version wrapper is added
//vv, _ = db2.GetState("ns1", "key1")
//testutil.AssertEquals(t, vv, &vv3)
vv, _ = db2.GetState("ns1", "key1")
testutil.AssertEquals(t, vv, &vv3)

sp, err = db2.GetLatestSavePoint()
testutil.AssertNoError(t, err, "")
Expand Down Expand Up @@ -222,4 +220,23 @@ func TestQuery(t *testing.T, dbProvider statedb.VersionedDBProvider) {
queryResult3, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, queryResult3)

// query with fields
itr, err = db.ExecuteQuery("{\"selector\":{\"owner\":\"jerry\"},\"fields\": [\"owner\", \"asset_name\", \"color\", \"size\"]}")
testutil.AssertNoError(t, err, "")

// verify one jerry result
queryResult1, err = itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNotNil(t, queryResult1)
versionedQueryRecord = queryResult1.(*statedb.VersionedQueryRecord)
stringRecord = string(versionedQueryRecord.Record)
bFoundJerry = strings.Contains(stringRecord, "jerry")
testutil.AssertEquals(t, bFoundJerry, true)

// verify no more results
queryResult2, err = itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, queryResult2)

}
139 changes: 139 additions & 0 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package statecouchdb

import (
"encoding/json"
"fmt"
"strings"
)

//ApplyQueryWrapper parses the query string passed to CouchDB
//the wrapper prepends the wrapper "data." to all fields specified in the query
//All fields in the selector must have "data." prepended to the field names
//Fields listed in fields key will have "data." prepended
//Fields in the sort key will have "data." prepended
func ApplyQueryWrapper(queryString string) []byte {

//create a generic map for the query json
jsonQuery := make(map[string]interface{})

//unmarshal the selected json into the generic map
json.Unmarshal([]byte(queryString), &jsonQuery)

//iterate through the JSON query
for jsonKey, jsonValue := range jsonQuery {

//create a case for the data types found in the JSON query
switch jsonQueryPart := jsonValue.(type) {

//if the type is an array, then this is either the "fields" or "sort" part of the query
case []interface{}:

//check to see if this is a "fields" or "sort" array
//if jsonKey == jsonQueryFields || jsonKey == jsonQuerySort {
if jsonKey == jsonQueryFields {

//iterate through the names and add the data wrapper for each field
for itemKey, fieldName := range jsonQueryPart {

//add "data" wrapper to each field definition
jsonQueryPart[itemKey] = fmt.Sprintf("%v.%v", dataWrapper, fieldName)
}

//Add the "_id" and "version" fields, these are needed by default
if jsonKey == jsonQueryFields {

jsonQueryPart = append(jsonQueryPart, "_id")
jsonQueryPart = append(jsonQueryPart, "version")

//Overwrite the query fields if the "_id" field has been added
jsonQuery[jsonQueryFields] = jsonQueryPart
}

}

if jsonKey == jsonQuerySort {

//iterate through the names and add the data wrapper for each field
for sortItemKey, sortField := range jsonQueryPart {

//create a case for the data types found in the JSON query
switch sortFieldType := sortField.(type) {

//if the type is string, then this is a simple array of field names.
//Add the datawrapper to the field name
case string:

//simple case, update the existing array item with the updated name
jsonQueryPart[sortItemKey] = fmt.Sprintf("%v.%v", dataWrapper, sortField)

case interface{}:

//this case is a little more complicated. Here we need to
//iterate over the mapped field names since this is an array of objects
//example: {"fieldname":"desc"}
for key, itemValue := range sortField.(map[string]interface{}) {
//delete the mapping for the field definition, since we have to change the
//value of the key
delete(sortField.(map[string]interface{}), key)

//add the key back into the map with the field name wrapped with then "data" wrapper
sortField.(map[string]interface{})[fmt.Sprintf("%v.%v", dataWrapper, key)] = itemValue
}

default:

logger.Debugf("The type %v was not recognized as a valid sort field type.", sortFieldType)

}

}
}

case interface{}:

//if this is the "selector", the field names need to be mapped with the
//data wrapper
if jsonKey == jsonQuerySelector {

processSelector(jsonQueryPart.(map[string]interface{}))

}

default:

logger.Debugf("The value %v was not recognized as a valid selector field.", jsonKey)

}
}

//Marshal the updated json query
editedQuery, _ := json.Marshal(jsonQuery)

logger.Debugf("Rewritten query with data wrapper: %s", editedQuery)

return editedQuery

}

//processSelector is a recursion function for traversing the selector part of the query
func processSelector(selectorFragment map[string]interface{}) {

//iterate through the top level definitions
for itemKey, itemValue := range selectorFragment {

//check to see if the itemKey starts with a $. If so, this indicates an operator
if strings.HasPrefix(fmt.Sprintf("%s", itemKey), "$") {

processSelector(itemValue.(map[string]interface{}))

} else {

//delete the mapping for the field definition, since we have to change the
//value of the key
delete(selectorFragment, itemKey)
//add the key back into the map with the field name wrapped with then "data" wrapper
selectorFragment[fmt.Sprintf("%v.%v", dataWrapper, itemKey)] = itemValue

}
}
}
107 changes: 95 additions & 12 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -37,6 +38,11 @@ var compositeKeySep = []byte{0x00}
var lastKeyIndicator = byte(0x01)
var savePointKey = []byte{0x00}

var dataWrapper = "data"
var jsonQuerySort = "sort"
var jsonQueryFields = "fields"
var jsonQuerySelector = "selector"

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
couchInstance *couchdb.CouchInstance
Expand Down Expand Up @@ -135,9 +141,56 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
}
}

ver := version.NewHeight(1, 1) //TODO - version hardcoded to 1 is a temporary value for the prototype
//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(docBytes)

return &statedb.VersionedValue{Value: returnValue, Version: &returnVersion}, nil
}

func removeDataWrapper(wrappedValue []byte) ([]byte, version.Height) {

//initialize the return value
returnValue := []byte{}

//initialize a default return version
returnVersion := version.NewHeight(0, 0)

//if this is a JSON, then remove the data wrapper
if couchdb.IsJSON(string(wrappedValue)) {

//create a generic map for the json
jsonResult := make(map[string]interface{})

//unmarshal the selected json into the generic map
json.Unmarshal(wrappedValue, &jsonResult)

//place the result json in the data key
returnMap := jsonResult[dataWrapper]

//marshal the mapped data. this wrappers the result in a key named "data"
returnValue, _ = json.Marshal(returnMap)

//create an array containing the blockNum and txNum
versionArray := strings.Split(fmt.Sprintf("%s", jsonResult["version"]), ":")

//convert the blockNum from String to unsigned int
blockNum, _ := strconv.ParseUint(versionArray[0], 10, 64)

//convert the txNum from String to unsigned int
txNum, _ := strconv.ParseUint(versionArray[1], 10, 64)

//create the version based on the blockNum and txNum
returnVersion = version.NewHeight(blockNum, txNum)

} else {

//this is a binary, so decode the value and version from the binary
returnValue, returnVersion = statedb.DecodeValue(wrappedValue)

}

return returnValue, *returnVersion

return &statedb.VersionedValue{Value: docBytes, Version: ver}, nil
}

// GetStateMultipleKeys implements method in VersionedDB interface
Expand Down Expand Up @@ -181,7 +234,7 @@ func (vdb *VersionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, err
//TODO - limit is currently set at 1000, eventually this will need to be changed
//to reflect a config option and potentially return an exception if the threshold is exceeded
// skip (paging) is not utilized by fabric
queryResult, err := vdb.db.QueryDocuments(query, 1000, 0)
queryResult, err := vdb.db.QueryDocuments(string(ApplyQueryWrapper(query)), 1000, 0)
if err != nil {
logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error())
return nil, err
Expand Down Expand Up @@ -219,7 +272,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
if couchdb.IsJSON(string(vv.Value)) {

// SaveDoc using couchdb client and use JSON format
rev, err := vdb.db.SaveDoc(string(compositeKey), "", vv.Value, nil)
rev, err := vdb.db.SaveDoc(string(compositeKey), "", addVersionAndChainCodeID(vv.Value, ns, vv.Version), nil)
if err != nil {
logger.Errorf("Error during Commit(): %s\n", err.Error())
return err
Expand All @@ -232,23 +285,22 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version

//Create an attachment structure and load the bytes
attachment := &couchdb.Attachment{}
attachment.AttachmentBytes = vv.Value
attachment.AttachmentBytes = statedb.EncodeValue(vv.Value, vv.Version)
attachment.ContentType = "application/octet-stream"
attachment.Name = "valueBytes"

attachments := []couchdb.Attachment{}
attachments = append(attachments, *attachment)

// SaveDoc using couchdb client and use attachment to persist the binary data
rev, err := vdb.db.SaveDoc(string(compositeKey), "", nil, attachments)
rev, err := vdb.db.SaveDoc(string(compositeKey), "", addVersionAndChainCodeID(nil, ns, vv.Version), attachments)
if err != nil {
logger.Errorf("Error during Commit(): %s\n", err.Error())
return err
}
if rev != "" {
logger.Debugf("Saved document revision number: %s\n", rev)
}

}
}
}
Expand All @@ -263,6 +315,33 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
return nil
}

//addVersionAndChainCodeID adds keys for version and chaincodeID to the JSON value
func addVersionAndChainCodeID(value []byte, chaincodeID string, version *version.Height) []byte {

//create a version mapping
jsonMap := map[string]interface{}{"version": fmt.Sprintf("%v:%v", version.BlockNum, version.TxNum)}

//add the chaincodeID
jsonMap["chaincodeid"] = chaincodeID

//Add the wrapped data if the value is not null
if value != nil {

//create a new genericMap
rawJSON := (*json.RawMessage)(&value)

//add the rawJSON to the map
jsonMap[dataWrapper] = rawJSON

}

//marshal the data to a byte array
returnJSON, _ := json.Marshal(jsonMap)

return returnJSON

}

// Savepoint docid (key) for couchdb
const savepointDocID = "statedb_savepoint"

Expand Down Expand Up @@ -379,10 +458,12 @@ func (scanner *kvScanner) Next() (statedb.QueryResult, error) {

_, key := splitCompositeKey([]byte(selectedKV.ID))

//TODO - change hardcoded version (1,1) when version header is available in CouchDB
//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(selectedKV.Value)

return &statedb.VersionedKV{
CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
VersionedValue: statedb.VersionedValue{Value: selectedKV.Value, Version: version.NewHeight(1, 1)}}, nil
VersionedValue: statedb.VersionedValue{Value: returnValue, Version: &returnVersion}}, nil
}

func (scanner *kvScanner) Close() {
Expand Down Expand Up @@ -410,12 +491,14 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) {

namespace, key := splitCompositeKey([]byte(selectedResultRecord.ID))

//TODO - change hardcoded version (1,1) when version support is available in CouchDB
//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(selectedResultRecord.Value)

return &statedb.VersionedQueryRecord{
Namespace: namespace,
Key: key,
Version: version.NewHeight(1, 1),
Record: selectedResultRecord.Value}, nil
Version: &returnVersion,
Record: returnValue}, nil
}

func (scanner *queryScanner) Close() {
Expand Down
Loading

0 comments on commit ead6705

Please sign in to comment.