diff --git a/core/ledger/util/couchdb/couchdb.go b/core/ledger/util/couchdb/couchdb.go index 7b21e3c4087..8829a033b08 100644 --- a/core/ledger/util/couchdb/couchdb.go +++ b/core/ledger/util/couchdb/couchdb.go @@ -135,7 +135,8 @@ type CouchConnectionDef struct { //CouchInstance represents a CouchDB instance type CouchInstance struct { - conf CouchConnectionDef //connection configuration + conf CouchConnectionDef //connection configuration + client *http.Client // a client to connect to this instance } //CouchDatabase represents a database within a CouchDB instance @@ -206,6 +207,13 @@ type Base64Attachment struct { AttachmentData string `json:"data"` } +// closeResponseBody discards the body and then closes it to enable returning it to +// connection pool +func closeResponseBody(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body + resp.Body.Close() +} + //CreateConnectionDefinition for a new client connection func CreateConnectionDefinition(couchDBAddress, username, password string, maxRetries, maxRetriesOnStartup int, requestTimeout time.Duration) (*CouchConnectionDef, error) { @@ -264,7 +272,7 @@ func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() (*DBOperationResponse, if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) //Get the response from the create REST call dbResponse := &DBOperationResponse{} @@ -305,7 +313,7 @@ func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) { if err != nil { return nil, couchDBReturn, err } - defer resp.Body.Close() + defer closeResponseBody(resp) dbResponse := &DBInfo{} json.NewDecoder(resp.Body).Decode(&dbResponse) @@ -344,7 +352,7 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet if err != nil { return nil, couchDBReturn, fmt.Errorf("Unable to connect to CouchDB, check the hostname and port: %s", err.Error()) } - defer resp.Body.Close() + defer closeResponseBody(resp) dbResponse := &ConnectionInfo{} errJSON := json.NewDecoder(resp.Body).Decode(&dbResponse) @@ -371,7 +379,6 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet } return dbResponse, couchDBReturn, nil - } //DropDatabase provides method to drop an existing database @@ -393,7 +400,7 @@ func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) { if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) dbResponse := &DBOperationResponse{} json.NewDecoder(resp.Body).Decode(&dbResponse) @@ -434,7 +441,7 @@ func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error) logger.Errorf("Failed to invoke _ensure_full_commit Error: %s\n", err.Error()) return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) dbResponse := &DBOperationResponse{} json.NewDecoder(resp.Body).Decode(&dbResponse) @@ -528,7 +535,7 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc if err != nil { return "", err } - defer resp.Body.Close() + defer closeResponseBody(resp) //get the revision and return revision, err := getRevisionHeader(resp) @@ -666,7 +673,7 @@ func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) { logger.Debugf("couchDBReturn=%v\n", couchDBReturn) return nil, "", err } - defer resp.Body.Close() + defer closeResponseBody(resp) //Get the media type from the Content-Type header mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) @@ -813,7 +820,7 @@ func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit, skip if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) if logger.IsEnabledFor(logging.DEBUG) { dump, err2 := httputil.DumpResponse(resp, true) @@ -918,7 +925,7 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error { } return err } - defer resp.Body.Close() + defer closeResponseBody(resp) logger.Debugf("Exiting DeleteDoc()") @@ -948,7 +955,7 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) if logger.IsEnabledFor(logging.DEBUG) { dump, err2 := httputil.DumpResponse(resp, true) @@ -1034,7 +1041,7 @@ func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMet if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) if logger.IsEnabledFor(logging.DEBUG) { dump, _ := httputil.DumpResponse(resp, false) @@ -1129,7 +1136,7 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B if err != nil { return nil, err } - defer resp.Body.Close() + defer closeResponseBody(resp) if logger.IsEnabledFor(logging.DEBUG) { dump, _ := httputil.DumpResponse(resp, false) @@ -1156,7 +1163,9 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B } -//handleRequest method is a generic http request handler +//handleRequest method is a generic http request handler. +// if it returns an error, it ensures that the response body is closed, else it is the +// callee's responsibility to close response correctly func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string, multipartBoundary string, maxRetries int) (*http.Response, *DBReturn, error) { @@ -1170,9 +1179,6 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat //set initial wait duration for retries waitDuration := retryWaitTime * time.Millisecond - //get the connection timeout - requestTimeout := couchInstance.conf.RequestTimeout - //attempt the http request for the max number of retries for attempts := 0; attempts < maxRetries; attempts++ { @@ -1225,14 +1231,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat logger.Debugf("HTTP Request: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1)) } - //Create the http client - client := &http.Client{Timeout: requestTimeout} - - transport := &http.Transport{Proxy: http.ProxyFromEnvironment} - transport.DisableCompression = false - client.Transport = transport //Execute http request - resp, errResp = client.Do(req) + resp, errResp = couchInstance.client.Do(req) //if an error is not detected then drop out of the retry if errResp == nil && resp != nil && resp.StatusCode < 500 { @@ -1248,8 +1248,9 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat } else { - //Read the response body + //Read the response body and close it for next attempt jsonError, err := ioutil.ReadAll(resp.Body) + closeResponseBody(resp) if err != nil { return nil, nil, err } @@ -1283,6 +1284,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat //check to see if the status code is 400 or higher //response codes 4XX and 500 will be treated as errors if resp.StatusCode >= 400 { + // close the response before returning error + defer closeResponseBody(resp) //Read the response body jsonError, err := ioutil.ReadAll(resp.Body) diff --git a/core/ledger/util/couchdb/couchdbutil.go b/core/ledger/util/couchdb/couchdbutil.go index a4f30502ad7..aafdbc1a5c5 100644 --- a/core/ledger/util/couchdb/couchdbutil.go +++ b/core/ledger/util/couchdb/couchdbutil.go @@ -18,6 +18,7 @@ package couchdb import ( "fmt" + "net/http" "regexp" "strconv" "strings" @@ -38,8 +39,17 @@ func CreateCouchInstance(couchDBConnectURL, id, pw string, maxRetries, return nil, err } + // Create the http client once + // Clients and Transports are safe for concurrent use by multiple goroutines + // and for efficiency should only be created once and re-used. + client := &http.Client{Timeout: couchConf.RequestTimeout} + + transport := &http.Transport{Proxy: http.ProxyFromEnvironment} + transport.DisableCompression = false + client.Transport = transport + //Create the CouchDB instance - couchInstance := &CouchInstance{conf: *couchConf} + couchInstance := &CouchInstance{conf: *couchConf, client: client} connectInfo, retVal, verifyErr := couchInstance.VerifyCouchConfig() if verifyErr != nil {