Skip to content

Commit

Permalink
Merge "[FAB-3686] CouchDB timeout causes error upon retry"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Jun 1, 2017
2 parents e5cac85 + b3eef4c commit 72e4d1a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 44 deletions.
123 changes: 79 additions & 44 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error)
func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc) (string, error) {

logger.Debugf("Entering SaveDoc() id=[%s]", id)

if !utf8.ValidString(id) {
return "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
}
Expand All @@ -479,19 +480,6 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
// id can contain a '/', so encode separately
saveURL = &url.URL{Opaque: saveURL.String() + "/" + encodePathElement(id)}

if rev == "" {

//See if the document already exists, we need the rev for save
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}

logger.Debugf(" rev=%s", rev)

//Set up a buffer for the data to be pushed to couchdb
Expand Down Expand Up @@ -540,9 +528,10 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries

//handle the request for saving the JSON or attachments
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPut, saveURL.String(), data,
rev, defaultBoundary, maxRetries, keepConnectionOpen)
//handle the request for saving document with a retry if there is a revision conflict
resp, _, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodPut,
*saveURL, data, rev, defaultBoundary, maxRetries, keepConnectionOpen)

if err != nil {
return "", err
}
Expand All @@ -560,6 +549,20 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc

}

//getDocumentRevision will return the revision if the document exists, otherwise it will return ""
func (dbclient *CouchDatabase) getDocumentRevision(id string) string {

var rev = ""

//See if the document already exists, we need the rev for saves and deletes
_, revdoc, err := dbclient.ReadDoc(id)
if err == nil {
//set the revision to the rev returned from the document read
rev = revdoc
}
return rev
}

func createAttachmentPart(couchDoc *CouchDoc, defaultBoundary string) (bytes.Buffer, string, error) {

//Create a buffer for writing the result
Expand Down Expand Up @@ -646,7 +649,8 @@ func getRevisionHeader(resp *http.Response) (string, error) {

}

//ReadDoc method provides function to retrieve a document from the database by id
//ReadDoc method provides function to retrieve a document and its revision
//from the database by id
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
var couchDoc CouchDoc
attachments := []*Attachment{}
Expand Down Expand Up @@ -906,27 +910,14 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
// id can contain a '/', so encode separately
deleteURL = &url.URL{Opaque: deleteURL.String() + "/" + encodePathElement(id)}

if rev == "" {

//See if the document already exists, we need the rev for delete
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}

logger.Debugf(" rev=%s", rev)

//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries

resp, couchDBReturn, err := dbclient.CouchInstance.handleRequest(http.MethodDelete, deleteURL.String(), nil, rev, "", maxRetries, true)
//handle the request for saving document with a retry if there is a revision conflict
resp, couchDBReturn, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodDelete,
*deleteURL, nil, "", "", maxRetries, true)

if err != nil {
fmt.Printf("couchDBReturn=%v", couchDBReturn)
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debug("Document not found (404), returning nil value instead of 404 error")
// non-existent document should return nil value instead of a 404 error
Expand Down Expand Up @@ -1173,9 +1164,52 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B

}

//handleRequestWithRevisionRetry method is a generic http request handler with
//a retry for document revision conflict errors,
//which may be detected during saves or deletes that timed out from client http perspective,
//but which eventually succeeded in couchdb
func (dbclient *CouchDatabase) handleRequestWithRevisionRetry(id, method string, connectURL url.URL, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool) (*http.Response, *DBReturn, error) {

//Initialize a flag for the revsion conflict
revisionConflictDetected := false
var resp *http.Response
var couchDBReturn *DBReturn
var errResp error

//attempt the http request for the max number of retries
//In this case, the retry is to catch problems where a client timeout may miss a
//successful CouchDB update and cause a document revision conflict on a retry in handleRequest
for attempts := 0; attempts < maxRetries; attempts++ {

//if the revision was not passed in, or if a revision conflict is detected on prior attempt,
//query CouchDB for the document revision
if rev == "" || revisionConflictDetected {
rev = dbclient.getDocumentRevision(id)
}

//handle the request for saving/deleting the couchdb data
resp, couchDBReturn, errResp = dbclient.CouchInstance.handleRequest(method, connectURL.String(),
data, rev, multipartBoundary, maxRetries, keepConnectionOpen)

//If there was a 409 conflict error during the save/delete, log it and retry it.
//Otherwise, break out of the retry loop
if couchDBReturn != nil && couchDBReturn.StatusCode == 409 {
logger.Warningf("CouchDB document revision conflict detected, retrying. Attempt:%v", attempts+1)
revisionConflictDetected = true
} else {
break
}
}

// return the handleRequest results
return resp, couchDBReturn, errResp
}

//handleRequest method is a generic http request handler.
// if it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly
// If it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly.
// Any http error or CouchDB error (4XX or 500) will result in a golang error getting returned
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool) (*http.Response, *DBReturn, error) {

Expand Down Expand Up @@ -1251,20 +1285,20 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//Execute http request
resp, errResp = couchInstance.client.Do(req)

//if an error is not detected then drop out of the retry
//if there is no golang http error and no CouchDB 500 error, then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
break
}

//if this is an error, record the retry error, else this is a 500 error
//if this is an unexpected golang http error, log the error and retry
if errResp != nil {

//Log the error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Error:%v",
waitDuration.String(), attempts+1, errResp.Error())

//otherwise this is an unexpected 500 error from CouchDB. Log the error and retry.
} else {

//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
closeResponseBody(resp)
Expand All @@ -1288,18 +1322,19 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//backoff, doubling the retry time for next attempt
waitDuration *= 2

}
} // end retry loop

//if the error present, return the error
//if a golang http error is still present after retries are exhausted, return the error
if errResp != nil {
return nil, nil, errResp
}

//set the return code for the couchDB request
couchDBReturn.StatusCode = resp.StatusCode

//check to see if the status code is 400 or higher
//response codes 4XX and 500 will be treated as errors
//check to see if the status code from couchdb is 400 or higher
//response codes 4XX and 500 will be treated as errors -
//golang error will be created from the couchDBReturn contents and both will be returned
if resp.StatusCode >= 400 {
// close the response before returning error
defer closeResponseBody(resp)
Expand All @@ -1325,7 +1360,7 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat

logger.Debugf("Exiting handleRequest()")

//If no errors, then return the results
//If no errors, then return the http response and the couchdb return object
return resp, couchDBReturn, nil
}

Expand Down
48 changes: 48 additions & 0 deletions core/ledger/util/couchdb/couchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,54 @@ func TestDBRequestTimeout(t *testing.T) {
}
}

func TestDBTimeoutConflictRetry(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() {

database := "testdbtimeoutretry"
err := cleanup(database)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to cleanup Error: %s", err))
defer cleanup(database)

// if there was an error upon cleanup, return here
if err != nil {
return
}

//create a new instance and database object
couchInstance, err := CreateCouchInstance(couchDBDef.URL, couchDBDef.Username, couchDBDef.Password,
couchDBDef.MaxRetries, 3, couchDBDef.RequestTimeout)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}

//create a new database
_, errdb := db.CreateDatabaseIfNotExist()
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database"))

//Retrieve the info for the new database and make sure the name matches
dbResp, _, errdb := db.GetDatabaseInfo()
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to retrieve database information"))
testutil.AssertEquals(t, dbResp.DbName, database)

//Save the test document
_, saveerr := db.SaveDoc("1", "", &CouchDoc{JSONValue: assetJSON, Attachments: nil})
testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document"))

//Retrieve the test document
_, _, geterr := db.ReadDoc("1")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))

//Save the test document with an invalid rev. This should cause a retry
_, saveerr = db.SaveDoc("1", "1-11111111111111111111111111111111", &CouchDoc{JSONValue: assetJSON, Attachments: nil})
testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document with a revision conflict"))

//Delete the test document with an invalid rev. This should cause a retry
deleteerr := db.DeleteDoc("1", "1-11111111111111111111111111111111")
testutil.AssertNoError(t, deleteerr, fmt.Sprintf("Error when trying to delete a document with a revision conflict"))

}
}

func TestDBBadJSON(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() {
Expand Down

0 comments on commit 72e4d1a

Please sign in to comment.