Skip to content

Commit

Permalink
FAB-2724: Fix CouchDB max open connections
Browse files Browse the repository at this point in the history
Client and Transport are reusable and thread safe, so
initialize once and reuse across requests. Also close the response
body properly by consuming it fully before closing it to allow
the underlying RoundTripper to reuse the transport for subsequent
requests.

Change-Id: I1d1692ee96db1ed91a0eaccbe81439312c6935ac
Signed-off-by: Balaji Viswanathan <balaji.viswanathan@gmail.com>
Signed-off-by: denyeart <enyeart@us.ibm.com>
bviswana101 authored and ghaskins committed Apr 24, 2017
1 parent 8ce1073 commit 3dcc32f
Showing 2 changed files with 40 additions and 27 deletions.
55 changes: 29 additions & 26 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
@@ -135,7 +135,8 @@ type CouchConnectionDef struct {

//CouchInstance represents a CouchDB instance
type CouchInstance struct {
conf CouchConnectionDef //connection configuration
conf CouchConnectionDef //connection configuration
client *http.Client // a client to connect to this instance
}

//CouchDatabase represents a database within a CouchDB instance
@@ -206,6 +207,13 @@ type Base64Attachment struct {
AttachmentData string `json:"data"`
}

// closeResponseBody discards the body and then closes it to enable returning it to
// connection pool
func closeResponseBody(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body
resp.Body.Close()
}

//CreateConnectionDefinition for a new client connection
func CreateConnectionDefinition(couchDBAddress, username, password string, maxRetries,
maxRetriesOnStartup int, requestTimeout time.Duration) (*CouchConnectionDef, error) {
@@ -264,7 +272,7 @@ func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() (*DBOperationResponse,
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//Get the response from the create REST call
dbResponse := &DBOperationResponse{}
@@ -305,7 +313,7 @@ func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) {
if err != nil {
return nil, couchDBReturn, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBInfo{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
@@ -344,7 +352,7 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet
if err != nil {
return nil, couchDBReturn, fmt.Errorf("Unable to connect to CouchDB, check the hostname and port: %s", err.Error())
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &ConnectionInfo{}
errJSON := json.NewDecoder(resp.Body).Decode(&dbResponse)
@@ -371,7 +379,6 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet
}

return dbResponse, couchDBReturn, nil

}

//DropDatabase provides method to drop an existing database
@@ -393,7 +400,7 @@ func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
@@ -434,7 +441,7 @@ func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error)
logger.Errorf("Failed to invoke _ensure_full_commit Error: %s\n", err.Error())
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
@@ -528,7 +535,7 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
if err != nil {
return "", err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//get the revision and return
revision, err := getRevisionHeader(resp)
@@ -666,7 +673,7 @@ func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
logger.Debugf("couchDBReturn=%v\n", couchDBReturn)
return nil, "", err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//Get the media type from the Content-Type header
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
@@ -813,7 +820,7 @@ func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit, skip
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
@@ -918,7 +925,7 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
}
return err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

logger.Debugf("Exiting DeleteDoc()")

@@ -948,7 +955,7 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
@@ -1034,7 +1041,7 @@ func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMet
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
@@ -1129,7 +1136,7 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
@@ -1156,7 +1163,9 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B

}

//handleRequest method is a generic http request handler
//handleRequest method is a generic http request handler.
// if it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string,
multipartBoundary string, maxRetries int) (*http.Response, *DBReturn, error) {

@@ -1170,9 +1179,6 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//set initial wait duration for retries
waitDuration := retryWaitTime * time.Millisecond

//get the connection timeout
requestTimeout := couchInstance.conf.RequestTimeout

//attempt the http request for the max number of retries
for attempts := 0; attempts < maxRetries; attempts++ {

@@ -1225,14 +1231,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
logger.Debugf("HTTP Request: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}

//Create the http client
client := &http.Client{Timeout: requestTimeout}

transport := &http.Transport{Proxy: http.ProxyFromEnvironment}
transport.DisableCompression = false
client.Transport = transport
//Execute http request
resp, errResp = client.Do(req)
resp, errResp = couchInstance.client.Do(req)

//if an error is not detected then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
@@ -1248,8 +1248,9 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat

} else {

//Read the response body
//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
closeResponseBody(resp)
if err != nil {
return nil, nil, err
}
@@ -1283,6 +1284,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//check to see if the status code is 400 or higher
//response codes 4XX and 500 will be treated as errors
if resp.StatusCode >= 400 {
// close the response before returning error
defer closeResponseBody(resp)

//Read the response body
jsonError, err := ioutil.ReadAll(resp.Body)
12 changes: 11 additions & 1 deletion core/ledger/util/couchdb/couchdbutil.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package couchdb

import (
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
@@ -38,8 +39,17 @@ func CreateCouchInstance(couchDBConnectURL, id, pw string, maxRetries,
return nil, err
}

// Create the http client once
// Clients and Transports are safe for concurrent use by multiple goroutines
// and for efficiency should only be created once and re-used.
client := &http.Client{Timeout: couchConf.RequestTimeout}

transport := &http.Transport{Proxy: http.ProxyFromEnvironment}
transport.DisableCompression = false
client.Transport = transport

//Create the CouchDB instance
couchInstance := &CouchInstance{conf: *couchConf}
couchInstance := &CouchInstance{conf: *couchConf, client: client}

connectInfo, retVal, verifyErr := couchInstance.VerifyCouchConfig()
if verifyErr != nil {

0 comments on commit 3dcc32f

Please sign in to comment.