From a932b54d63b6baf4869dc44137403aee9982c251 Mon Sep 17 00:00:00 2001 From: Chris Elder Date: Sat, 8 Apr 2017 12:35:06 -0400 Subject: [PATCH] FAB-3046 Add CouchDB batch select operations This is change 2 of 4 for FAB-2725 CouchDB optimizations Motivation for this change: Interactions with CouchDB are currently done individually. Need to switch to using bulk operations to get optimal performance from CouchDB. Need to performance test and stress test. - Add bulk select methods to couchdb Change-Id: I463530e0875176de0e0ab3cdf7971b1e3d7b4b36 Signed-off-by: Chris Elder --- core/ledger/util/couchdb/couchdb.go | 64 +++++++++++++ core/ledger/util/couchdb/couchdb_test.go | 112 ++++++++++++++++++++++- 2 files changed, 175 insertions(+), 1 deletion(-) diff --git a/core/ledger/util/couchdb/couchdb.go b/core/ledger/util/couchdb/couchdb.go index 7af30da601f..e91862150c0 100644 --- a/core/ledger/util/couchdb/couchdb.go +++ b/core/ledger/util/couchdb/couchdb.go @@ -960,6 +960,70 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err } +//BatchRetrieveIDRevision - batch method to retrieve IDs and revisions +func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMetadata, error) { + + batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL) + if err != nil { + logger.Errorf("URL parse error: %s", err.Error()) + return nil, err + } + batchURL.Path = dbclient.DBName + "/_all_docs" + + queryParms := batchURL.Query() + queryParms.Add("include_docs", "true") + batchURL.RawQuery = queryParms.Encode() + + keymap := make(map[string]interface{}) + + keymap["keys"] = keys + + jsonKeys, err := json.Marshal(keymap) + if err != nil { + return nil, err + } + + //Set up a buffer for the data response from CouchDB + data := new(bytes.Buffer) + + data.ReadFrom(bytes.NewReader(jsonKeys)) + + resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), data, "", "") + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if logger.IsEnabledFor(logging.DEBUG) { + dump, _ := httputil.DumpResponse(resp, false) + // compact debug log by replacing carriage return / line feed with dashes to separate http headers + logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1)) + } + + //handle as JSON document + jsonResponseRaw, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var jsonResponse = &BatchRetrieveDocMedatadataResponse{} + + err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse) + if err2 != nil { + return nil, err2 + } + + revisionDocs := []*DocMetadata{} + + for _, row := range jsonResponse.Rows { + revisionDoc := &DocMetadata{ID: row.ID, Rev: row.Doc.Rev, Version: row.Doc.Version} + revisionDocs = append(revisionDocs, revisionDoc) + } + + return revisionDocs, nil + +} + //BatchUpdateDocuments - batch method to batch update documents func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) { diff --git a/core/ledger/util/couchdb/couchdb_test.go b/core/ledger/util/couchdb/couchdb_test.go index 8acb81b5169..b01fe9b0e3a 100644 --- a/core/ledger/util/couchdb/couchdb_test.go +++ b/core/ledger/util/couchdb/couchdb_test.go @@ -777,7 +777,7 @@ func TestRichQuery(t *testing.T) { } } -func TestBatchCreateRetrieve(t *testing.T) { +func TestBatchBatchOperations(t *testing.T) { if ledgerconfig.IsCouchDBEnabled() == true { @@ -893,5 +893,115 @@ func TestBatchCreateRetrieve(t *testing.T) { testutil.AssertEquals(t, updateDoc.Error, updateDocumentConflictError) testutil.AssertEquals(t, updateDoc.Reason, updateDocumentConflictReason) } + + //---------------------------------------------- + //Test Batch Retrieve Keys and Update + + var keys []string + + keys = append(keys, "marble01") + keys = append(keys, "marble03") + + batchRevs, err := db.BatchRetrieveIDRevision(keys) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting retrieve revisions")) + + batchUpdateDocs = []*CouchDoc{} + + //iterate through the revision docs + for _, revdoc := range batchRevs { + if revdoc.ID == "marble01" { + //update the json with the rev and add to the batch + marble01Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON01, false) + batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble01Doc, Attachments: attachments1}) + } + + if revdoc.ID == "marble03" { + //update the json with the rev and add to the batch + marble03Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON03, false) + batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble03Doc, Attachments: attachments3}) + } + } + + //Update couchdb with the batch + batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents")) + //check to make sure each batch update response was successful + for _, updateDoc := range batchUpdateResp { + testutil.AssertEquals(t, updateDoc.Ok, true) + } + + //---------------------------------------------- + //Test Batch Delete + + keys = []string{} + + keys = append(keys, "marble02") + keys = append(keys, "marble04") + + batchRevs, err = db.BatchRetrieveIDRevision(keys) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting retrieve revisions")) + + batchUpdateDocs = []*CouchDoc{} + + //iterate through the revision docs + for _, revdoc := range batchRevs { + if revdoc.ID == "marble02" { + //update the json with the rev and add to the batch + marble02Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON02, true) + batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble02Doc, Attachments: attachments1}) + } + if revdoc.ID == "marble04" { + //update the json with the rev and add to the batch + marble04Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON04, true) + batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble04Doc, Attachments: attachments3}) + } + } + + //Update couchdb with the batch + batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents")) + + //check to make sure each batch update response was successful + for _, updateDoc := range batchUpdateResp { + testutil.AssertEquals(t, updateDoc.Ok, true) + } + + //Retrieve the test document + dbGetResp, _, geterr = db.ReadDoc("marble02") + testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document")) + + //assert the value was deleted + testutil.AssertNil(t, dbGetResp) + + //Retrieve the test document + dbGetResp, _, geterr = db.ReadDoc("marble04") + testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document")) + + //assert the value was deleted + testutil.AssertNil(t, dbGetResp) + } +} + +//addRevisionAndDeleteStatus adds keys for version and chaincodeID to the JSON value +func addRevisionAndDeleteStatus(revision string, value []byte, deleted bool) []byte { + + //create a version mapping + jsonMap := make(map[string]interface{}) + + json.Unmarshal(value, &jsonMap) + + //add the revision + if revision != "" { + jsonMap["_rev"] = revision + } + + //If this record is to be deleted, set the "_deleted" property to true + if deleted { + jsonMap["_deleted"] = true } + //marshal the data to a byte array + returnJSON, _ := json.Marshal(jsonMap) + + return returnJSON + }