Skip to content

Commit

Permalink
FAB-2228: CouchDB docs to have consistent header
Browse files Browse the repository at this point in the history
Similar to Couch JSON documents, the version information
 is now stored in header elements of the JSON record.
The ReadDoc and SaveDoc methods operate on CouchDoc type
which encapsulates the JSON and binary data.

Change-Id: I4343161a27fa39a79788063a2b37570046bb782b
Signed-off-by: Balaji Viswanathan <balaji.viswanathan@gmail.com>
  • Loading branch information
bviswana101 committed Feb 16, 2017
1 parent f590121 commit b266c7b
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 167 deletions.
115 changes: 55 additions & 60 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ var logger = logging.MustGetLogger("statecouchdb")

var compositeKeySep = []byte{0x00}
var lastKeyIndicator = byte(0x01)
var savePointKey = []byte{0x00}

var binaryWrapper = "valueBytes"

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
Expand Down Expand Up @@ -119,70 +120,71 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version

compositeKey := constructCompositeKey(namespace, key)

docBytes, _, err := vdb.db.ReadDoc(string(compositeKey))
couchDoc, _, err := vdb.db.ReadDoc(string(compositeKey))
if err != nil {
return nil, err
}
if docBytes == nil {
if couchDoc == nil {
return nil, nil
}

// trace the first 200 bytes of value only, in case it is huge
if docBytes != nil && logger.IsEnabledFor(logging.DEBUG) {
if len(docBytes) < 200 {
logger.Debugf("getCommittedValueAndVersion() Read docBytes %s", docBytes)
if couchDoc.JSONValue != nil && logger.IsEnabledFor(logging.DEBUG) {
if len(couchDoc.JSONValue) < 200 {
logger.Debugf("getCommittedValueAndVersion() Read docBytes %s", couchDoc.JSONValue)
} else {
logger.Debugf("getCommittedValueAndVersion() Read docBytes %s...", docBytes[0:200])
logger.Debugf("getCommittedValueAndVersion() Read docBytes %s...", couchDoc.JSONValue[0:200])
}
}

//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(docBytes)
returnValue, returnVersion := removeDataWrapper(couchDoc.JSONValue, couchDoc.Attachments)

return &statedb.VersionedValue{Value: returnValue, Version: &returnVersion}, nil
}

func removeDataWrapper(wrappedValue []byte) ([]byte, version.Height) {
func removeDataWrapper(wrappedValue []byte, attachments []couchdb.Attachment) ([]byte, version.Height) {

//initialize the return value
returnValue := []byte{}
returnValue := []byte{} // TODO: empty byte or nil

//initialize a default return version
returnVersion := version.NewHeight(0, 0)

//if this is a JSON, then remove the data wrapper
if couchdb.IsJSON(string(wrappedValue)) {

//create a generic map for the json
jsonResult := make(map[string]interface{})
//create a generic map for the json
jsonResult := make(map[string]interface{})

//unmarshal the selected json into the generic map
json.Unmarshal(wrappedValue, &jsonResult)
//unmarshal the selected json into the generic map
json.Unmarshal(wrappedValue, &jsonResult)

// handle binary or json data
if jsonResult[dataWrapper] == nil && attachments != nil { // binary attachment
// get binary data from attachment
for _, attachment := range attachments {
if attachment.Name == binaryWrapper {
returnValue = attachment.AttachmentBytes
}
}
} else {
//place the result json in the data key
returnMap := jsonResult[dataWrapper]

//marshal the mapped data. this wrappers the result in a key named "data"
returnValue, _ = json.Marshal(returnMap)

//create an array containing the blockNum and txNum
versionArray := strings.Split(fmt.Sprintf("%s", jsonResult["version"]), ":")

//convert the blockNum from String to unsigned int
blockNum, _ := strconv.ParseUint(versionArray[0], 10, 64)
}

//convert the txNum from String to unsigned int
txNum, _ := strconv.ParseUint(versionArray[1], 10, 64)
//create an array containing the blockNum and txNum
versionArray := strings.Split(fmt.Sprintf("%s", jsonResult["version"]), ":")

//create the version based on the blockNum and txNum
returnVersion = version.NewHeight(blockNum, txNum)
//convert the blockNum from String to unsigned int
blockNum, _ := strconv.ParseUint(versionArray[0], 10, 64)

} else {
//convert the txNum from String to unsigned int
txNum, _ := strconv.ParseUint(versionArray[1], 10, 64)

//this is a binary, so decode the value and version from the binary
returnValue, returnVersion = statedb.DecodeValue(wrappedValue)

}
//create the version based on the blockNum and txNum
returnVersion = version.NewHeight(blockNum, txNum)

return returnValue, *returnVersion

Expand Down Expand Up @@ -262,41 +264,34 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
vdb.db.DeleteDoc(string(compositeKey), "")

} else {
couchDoc := &couchdb.CouchDoc{}

//Check to see if the value is a valid JSON
//If this is not a valid JSON, then store as an attachment
if couchdb.IsJSON(string(vv.Value)) {

// SaveDoc using couchdb client and use JSON format
rev, err := vdb.db.SaveDoc(string(compositeKey), "", addVersionAndChainCodeID(vv.Value, ns, vv.Version), nil)
if err != nil {
logger.Errorf("Error during Commit(): %s\n", err.Error())
return err
}
if rev != "" {
logger.Debugf("Saved document revision number: %s\n", rev)
}

// Handle it as json
couchDoc.JSONValue = addVersionAndChainCodeID(vv.Value, ns, vv.Version)
} else { // if the data is not JSON, save as binary attachment in Couch

//Create an attachment structure and load the bytes
attachment := &couchdb.Attachment{}
attachment.AttachmentBytes = statedb.EncodeValue(vv.Value, vv.Version)
attachment.AttachmentBytes = vv.Value
attachment.ContentType = "application/octet-stream"
attachment.Name = "valueBytes"
attachment.Name = binaryWrapper

attachments := []couchdb.Attachment{}
attachments = append(attachments, *attachment)
couchDoc.Attachments = append(couchDoc.Attachments, *attachment)
couchDoc.JSONValue = addVersionAndChainCodeID(nil, ns, vv.Version)
}

// SaveDoc using couchdb client and use attachment to persist the binary data
rev, err := vdb.db.SaveDoc(string(compositeKey), "", addVersionAndChainCodeID(nil, ns, vv.Version), attachments)
if err != nil {
logger.Errorf("Error during Commit(): %s\n", err.Error())
return err
}
if rev != "" {
logger.Debugf("Saved document revision number: %s\n", rev)
}
// SaveDoc using couchdb client and use attachment to persist the binary data
rev, err := vdb.db.SaveDoc(string(compositeKey), "", couchDoc)
if err != nil {
logger.Errorf("Error during Commit(): %s\n", err.Error())
return err
}
if rev != "" {
logger.Debugf("Saved document revision number: %s\n", rev)
}
}
}
Expand Down Expand Up @@ -381,7 +376,7 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height) error {
}

// SaveDoc using couchdb client and use JSON format
_, err = vdb.db.SaveDoc(savepointDocID, "", savepointDocJSON, nil)
_, err = vdb.db.SaveDoc(savepointDocID, "", &couchdb.CouchDoc{JSONValue: savepointDocJSON, Attachments: nil})
if err != nil {
logger.Errorf("Failed to save the savepoint to DB %s\n", err.Error())
return err
Expand All @@ -400,19 +395,19 @@ func (vdb *VersionedDB) recordSavepoint(height *version.Height) error {
func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) {

var err error
savepointJSON, _, err := vdb.db.ReadDoc(savepointDocID)
couchDoc, _, err := vdb.db.ReadDoc(savepointDocID)
if err != nil {
logger.Errorf("Failed to read savepoint data %s\n", err.Error())
return &version.Height{BlockNum: 0, TxNum: 0}, err
}

// ReadDoc() not found (404) will result in nil response, in these cases return height 0
if savepointJSON == nil {
if couchDoc.JSONValue == nil {
return &version.Height{BlockNum: 0, TxNum: 0}, nil
}

savepointDoc := &couchSavepointData{}
err = json.Unmarshal(savepointJSON, &savepointDoc)
err = json.Unmarshal(couchDoc.JSONValue, &savepointDoc)
if err != nil {
logger.Errorf("Failed to unmarshal savepoint data %s\n", err.Error())
return &version.Height{BlockNum: 0, TxNum: 0}, err
Expand Down Expand Up @@ -456,7 +451,7 @@ func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
_, key := splitCompositeKey([]byte(selectedKV.ID))

//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(selectedKV.Value)
returnValue, returnVersion := removeDataWrapper(selectedKV.Value, selectedKV.Attachments)

return &statedb.VersionedKV{
CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
Expand Down Expand Up @@ -489,7 +484,7 @@ func (scanner *queryScanner) Next() (statedb.QueryResult, error) {
namespace, key := splitCompositeKey([]byte(selectedResultRecord.ID))

//remove the data wrapper and return the value and version
returnValue, returnVersion := removeDataWrapper(selectedResultRecord.Value)
returnValue, returnVersion := removeDataWrapper(selectedResultRecord.Value, selectedResultRecord.Attachments)

return &statedb.VersionedQueryRecord{
Namespace: namespace,
Expand Down
Loading

0 comments on commit b266c7b

Please sign in to comment.