Skip to content

Commit

Permalink
Merge "FAB-2959 Add CouchDB batch update operations"
Browse files Browse the repository at this point in the history
  • Loading branch information
hacera-jonathan authored and Gerrit Code Review committed Apr 9, 2017
2 parents 2663d8b + 73a2a6f commit 0640d43
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 5 deletions.
130 changes: 126 additions & 4 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package couchdb
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -157,10 +158,11 @@ type Attachment struct {
AttachmentBytes []byte
}

//DocRev returns the Id and revision for a couchdb document
type DocRev struct {
Id string `json:"_id"`
Rev string `json:"_rev"`
//DocMetadata returns the ID, version and revision for a couchdb document
type DocMetadata struct {
ID string
Rev string
Version string
}

//FileDetails defines the structure needed to send an attachment to couchdb
Expand All @@ -176,6 +178,33 @@ type CouchDoc struct {
Attachments []Attachment
}

//BatchRetrieveDocMedatadataResponse is used for processing REST batch responses from CouchDB
type BatchRetrieveDocMedatadataResponse struct {
Rows []struct {
ID string `json:"id"`
Doc struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
Version string `json:"version"`
} `json:"doc"`
} `json:"rows"`
}

//BatchUpdateResponse defines a structure for batch update response
type BatchUpdateResponse struct {
ID string `json:"id"`
Error string `json:"error"`
Reason string `json:"reason"`
Ok bool `json:"ok"`
Rev string `json:"rev"`
}

//Base64Attachment contains the definition for an attached file for couchdb
type Base64Attachment struct {
ContentType string `json:"content_type"`
AttachmentData string `json:"data"`
}

//CreateConnectionDefinition for a new client connection
func CreateConnectionDefinition(couchDBAddress, username, password string) (*CouchConnectionDef, error) {

Expand Down Expand Up @@ -568,6 +597,7 @@ func getRevisionHeader(resp *http.Response) (string, error) {
//ReadDoc method provides function to retrieve a document from the database by id
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
var couchDoc CouchDoc

logger.Debugf("Entering ReadDoc() id=[%s]", id)
if !utf8.ValidString(id) {
return nil, "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
Expand Down Expand Up @@ -930,6 +960,98 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err

}

//BatchUpdateDocuments - batch method to batch update documents
func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) {

logger.Debugf("Entering BatchUpdateDocuments() documents=%v", documents)

batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
batchURL.Path = dbclient.DBName + "/_bulk_docs"

documentMap := make(map[string]interface{})

var jsonDocumentMap []interface{}

for _, jsonDocument := range documents {

//create a document map
var document = make(map[string]interface{})

//unmarshal the JSON component of the CouchDoc into the document
json.Unmarshal(jsonDocument.JSONValue, &document)

//iterate through any attachments
if len(jsonDocument.Attachments) > 0 {

//create a file attachment map
fileAttachment := make(map[string]interface{})

//for each attachment, create a Base64Attachment, name the attachment,
//add the content type and base64 encode the attachment
for _, attachment := range jsonDocument.Attachments {
fileAttachment[attachment.Name] = Base64Attachment{attachment.ContentType,
base64.StdEncoding.EncodeToString(attachment.AttachmentBytes)}
}

//add attachments to the document
document["_attachments"] = fileAttachment

}

//Append the document to the map of documents
jsonDocumentMap = append(jsonDocumentMap, document)

}

//Add the documents to the "docs" item
documentMap["docs"] = jsonDocumentMap

jsonKeys, err := json.Marshal(documentMap)

if err != nil {
return nil, err
}

//Set up a buffer for the data to be pushed to couchdb
data := new(bytes.Buffer)

data.ReadFrom(bytes.NewReader(jsonKeys))

resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), data, "", "")
if err != nil {
return nil, err
}
defer resp.Body.Close()

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}

//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var jsonResponse = []*BatchUpdateResponse{}

err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}

logger.Debugf("Exiting BatchUpdateDocuments()")

return jsonResponse, nil

}

//handleRequest method is a generic http request handler
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data io.Reader, rev string, multipartBoundary string) (*http.Response, *DBReturn, error) {

Expand Down
125 changes: 124 additions & 1 deletion core/ledger/util/couchdb/couchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ var badConnectURL = "couchdb:5990"
var username = ""
var password = ""

const updateDocumentConflictError = "conflict"
const updateDocumentConflictReason = "Document update conflict."

func cleanup(database string) error {
//create a new connection
couchInstance, err := CreateCouchInstance(connectURL, username, password)
Expand Down Expand Up @@ -142,6 +145,7 @@ func TestDBCreateEnsureFullCommit(t *testing.T) {
}
}
}

func TestDBBadDatabaseName(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() == true {
Expand Down Expand Up @@ -318,7 +322,7 @@ func TestDBBadJSON(t *testing.T) {
}

func TestPrefixScan(t *testing.T) {
if !ledgerconfig.IsCouchDBEnabled() {
if !ledgerconfig.IsCouchDBEnabled() == true {
return
}
database := "testprefixscan"
Expand Down Expand Up @@ -772,3 +776,122 @@ func TestRichQuery(t *testing.T) {
}
}
}

func TestBatchCreateRetrieve(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() == true {

byteJSON01 := []byte(`{"_id":"marble01","asset_name":"marble01","color":"blue","size":"1","owner":"jerry"}`)
byteJSON02 := []byte(`{"_id":"marble02","asset_name":"marble02","color":"red","size":"2","owner":"tom"}`)
byteJSON03 := []byte(`{"_id":"marble03","asset_name":"marble03","color":"green","size":"3","owner":"jerry"}`)
byteJSON04 := []byte(`{"_id":"marble04","asset_name":"marble04","color":"purple","size":"4","owner":"tom"}`)
byteJSON05 := []byte(`{"_id":"marble05","asset_name":"marble05","color":"blue","size":"5","owner":"jerry"}`)

attachment1 := &Attachment{}
attachment1.AttachmentBytes = []byte(`marble01 - test attachment`)
attachment1.ContentType = "application/octet-stream"
attachment1.Name = "data"
attachments1 := []Attachment{}
attachments1 = append(attachments1, *attachment1)

attachment2 := &Attachment{}
attachment2.AttachmentBytes = []byte(`marble02 - test attachment`)
attachment2.ContentType = "application/octet-stream"
attachment2.Name = "data"
attachments2 := []Attachment{}
attachments2 = append(attachments2, *attachment2)

attachment3 := &Attachment{}
attachment3.AttachmentBytes = []byte(`marble03 - test attachment`)
attachment3.ContentType = "application/octet-stream"
attachment3.Name = "data"
attachments3 := []Attachment{}
attachments3 = append(attachments3, *attachment3)

attachment4 := &Attachment{}
attachment4.AttachmentBytes = []byte(`marble04 - test attachment`)
attachment4.ContentType = "application/octet-stream"
attachment4.Name = "data"
attachments4 := []Attachment{}
attachments4 = append(attachments4, *attachment4)

attachment5 := &Attachment{}
attachment5.AttachmentBytes = []byte(`marble05 - test attachment`)
attachment5.ContentType = "application/octet-stream"
attachment5.Name = "data"
attachments5 := []Attachment{}
attachments5 = append(attachments5, *attachment5)

database := "testbatch"
err := cleanup(database)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to cleanup Error: %s", err))
defer cleanup(database)

//create a new instance and database object --------------------------------------------------------
couchInstance, err := CreateCouchInstance(connectURL, username, password)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}

//create a new database
_, errdb := db.CreateDatabaseIfNotExist()
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database"))

batchUpdateDocs := []*CouchDoc{}

value1 := &CouchDoc{JSONValue: byteJSON01, Attachments: attachments1}
value2 := &CouchDoc{JSONValue: byteJSON02, Attachments: attachments2}
value3 := &CouchDoc{JSONValue: byteJSON03, Attachments: attachments3}
value4 := &CouchDoc{JSONValue: byteJSON04, Attachments: attachments4}
value5 := &CouchDoc{JSONValue: byteJSON05, Attachments: attachments5}

batchUpdateDocs = append(batchUpdateDocs, value1)
batchUpdateDocs = append(batchUpdateDocs, value2)
batchUpdateDocs = append(batchUpdateDocs, value3)
batchUpdateDocs = append(batchUpdateDocs, value4)
batchUpdateDocs = append(batchUpdateDocs, value5)

batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))

//check to make sure each batch update response was successful
for _, updateDoc := range batchUpdateResp {
testutil.AssertEquals(t, updateDoc.Ok, true)
}

//----------------------------------------------
//Test Retrieve JSON
dbGetResp, _, geterr := db.ReadDoc("marble01")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when attempting read a document"))

assetResp := &Asset{}
geterr = json.Unmarshal(dbGetResp.JSONValue, &assetResp)
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))
//Verify the owner retrieved matches
testutil.AssertEquals(t, assetResp.Owner, "jerry")

//----------------------------------------------
//Test retrieve binary
dbGetResp, _, geterr = db.ReadDoc("marble03")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when attempting read a document"))
//Retrieve the attachments
attachments := dbGetResp.Attachments
//Only one was saved, so take the first
retrievedAttachment := attachments[0]
//Verify the text matches
testutil.AssertEquals(t, attachment3.AttachmentBytes, retrievedAttachment.AttachmentBytes)
//----------------------------------------------
//Test Bad Updates
batchUpdateDocs = []*CouchDoc{}
batchUpdateDocs = append(batchUpdateDocs, value1)
batchUpdateDocs = append(batchUpdateDocs, value2)
batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))
//No revision was provided, so these two updates should fail
//Verify that the "Ok" field is returned as false
for _, updateDoc := range batchUpdateResp {
testutil.AssertEquals(t, updateDoc.Ok, false)
testutil.AssertEquals(t, updateDoc.Error, updateDocumentConflictError)
testutil.AssertEquals(t, updateDoc.Reason, updateDocumentConflictReason)
}
}
}

0 comments on commit 0640d43

Please sign in to comment.