diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go index 71b5d8ed371..911a0c7dd80 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go @@ -18,11 +18,16 @@ package couchdb import ( "bytes" + "compress/gzip" "encoding/json" "fmt" "io" "io/ioutil" + "log" + "mime" + "mime/multipart" "net/http" + "net/textproto" "net/url" "regexp" "strconv" @@ -70,13 +75,34 @@ type CouchDBConnectionDef struct { Database string } -//CouchDBReturn contains an error reported by CouchDB -type CouchDBReturn struct { +//DBReturn contains an error reported by CouchDB +type DBReturn struct { StatusCode int `json:"status_code"` Error string `json:"error"` Reason string `json:"reason"` } +//Attachment contains the definition for an attached file for couchdb +type Attachment struct { + Name string + ContentType string + Length uint64 + AttachmentBytes []byte +} + +//DocRev returns the Id and revision for a couchdb document +type DocRev struct { + Id string `json:"_id"` + Rev string `json:"_rev"` +} + +//FileDetails defines the structure needed to send an attachment to couchdb +type FileDetails struct { + Follows bool `json:"follows"` + ContentType string `json:"content_type"` + Length int `json:"length"` +} + //CreateConnectionDefinition for a new client connection func CreateConnectionDefinition(host string, port int, databaseName, username, password string) (*CouchDBConnectionDef, error) { @@ -126,7 +152,7 @@ func (dbclient *CouchDBConnectionDef) CreateDatabaseIfNotExist() (*DBOperationRe url := fmt.Sprintf("%s/%s", dbclient.URL, dbclient.Database) //process the URL with a PUT, creates the database - resp, _, err := dbclient.handleRequest(http.MethodPut, url, nil) + resp, _, err := dbclient.handleRequest(http.MethodPut, url, nil, "", "") if err != nil { return nil, err } @@ -155,11 +181,11 @@ func (dbclient *CouchDBConnectionDef) CreateDatabaseIfNotExist() (*DBOperationRe } //GetDatabaseInfo method provides function to retrieve database information -func (dbclient *CouchDBConnectionDef) GetDatabaseInfo() (*DBInfo, *CouchDBReturn, error) { +func (dbclient *CouchDBConnectionDef) GetDatabaseInfo() (*DBInfo, *DBReturn, error) { url := fmt.Sprintf("%s/%s", dbclient.URL, dbclient.Database) - resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, url, nil) + resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, url, nil, "", "") if err != nil { return nil, couchDBReturn, err } @@ -187,7 +213,7 @@ func (dbclient *CouchDBConnectionDef) DropDatabase() (*DBOperationResponse, erro url := fmt.Sprintf("%s/%s", dbclient.URL, dbclient.Database) - resp, _, err := dbclient.handleRequest(http.MethodDelete, url, nil) + resp, _, err := dbclient.handleRequest(http.MethodDelete, url, nil, "", "") if err != nil { return nil, err } @@ -206,16 +232,14 @@ func (dbclient *CouchDBConnectionDef) DropDatabase() (*DBOperationResponse, erro return dbResponse, nil - } else { - - return dbResponse, fmt.Errorf("Error dropping database") - } + return dbResponse, fmt.Errorf("Error dropping database") + } //SaveDoc method provides a function to save a document, id and byte array -func (dbclient *CouchDBConnectionDef) SaveDoc(id string, bytesDoc []byte) (string, error) { +func (dbclient *CouchDBConnectionDef) SaveDoc(id string, rev string, bytesDoc []byte, attachments []Attachment) (string, error) { logger.Debugf("===COUCHDB=== Entering SaveDoc()") @@ -223,14 +247,62 @@ func (dbclient *CouchDBConnectionDef) SaveDoc(id string, bytesDoc []byte) (strin logger.Debugf("===COUCHDB=== id=%s, value=%s", id, string(bytesDoc)) - data := bytes.NewReader(bytesDoc) + if rev == "" { + + //See if the document already exists, we need the rev for save + _, revdoc, err := dbclient.ReadDoc(id) + if err != nil { + //set the revision to indicate that the document was not found + rev = "" + } else { + //set the revision to the rev returned from the document read + rev = revdoc + } + } + + logger.Debugf("===COUCHDB=== rev=%s", rev) + + //Set up a buffer for the data to be pushed to couchdb + data := new(bytes.Buffer) + + //Set up a default boundary for use by multipart if sending attachments + defaultBoundary := "" + + //check to see if attachments is nil, if so, then this is a JSON only + if attachments == nil { + + //Test to see if this is a valid JSON + if IsJSON(string(bytesDoc)) != true { + return "", fmt.Errorf("JSON format is not valid") + } + + // if there are no attachments, then use the bytes passed in as the JSON + data.ReadFrom(bytes.NewReader(bytesDoc)) + + } else { + + //attachments are included, create the multipart definition + multipartData, multipartBoundary, err := createAttachmentPart(*data, attachments, defaultBoundary) + if err != nil { + return "", err + } + + //Set the data buffer to the data from the create multi-part data + data.ReadFrom(&multipartData) + + //Set the default boundary to the value generated in the multipart creation + defaultBoundary = multipartBoundary - resp, _, err := dbclient.handleRequest(http.MethodPut, url, data) + } + + //handle the request for saving the JSON or attachments + resp, _, err := dbclient.handleRequest(http.MethodPut, url, data, rev, defaultBoundary) if err != nil { return "", err } defer resp.Body.Close() + //get the revision and return revision, err := getRevisionHeader(resp) if err != nil { return "", err @@ -242,16 +314,71 @@ func (dbclient *CouchDBConnectionDef) SaveDoc(id string, bytesDoc []byte) (strin } +func createAttachmentPart(data bytes.Buffer, attachments []Attachment, defaultBoundary string) (bytes.Buffer, string, error) { + + // read the attachment and save as an attachment + writer := multipart.NewWriter(&data) + + //retrieve the boundary for the multipart + defaultBoundary = writer.Boundary() + + fileAttachments := map[string]FileDetails{} + + for _, attachment := range attachments { + + fileAttachments[attachment.Name] = FileDetails{true, attachment.ContentType, len(attachment.AttachmentBytes)} + + } + + attachmentJSONMap := map[string]interface{}{ + "_attachments": fileAttachments} + + filesForUpload, _ := json.Marshal(attachmentJSONMap) + logger.Debugf(string(filesForUpload)) + + //create the header for the JSON + header := make(textproto.MIMEHeader) + header.Set("Content-Type", "application/json") + + part, err := writer.CreatePart(header) + if err != nil { + return data, defaultBoundary, err + } + + part.Write(filesForUpload) + + for _, attachment := range attachments { + + header := make(textproto.MIMEHeader) + part, err2 := writer.CreatePart(header) + if err2 != nil { + return data, defaultBoundary, err2 + } + part.Write(attachment.AttachmentBytes) + + } + + err = writer.Close() + if err != nil { + return data, defaultBoundary, err + } + + return data, defaultBoundary, nil + +} + func getRevisionHeader(resp *http.Response) (string, error) { - if revision := resp.Header.Get("Etag"); revision == "" { + revision := resp.Header.Get("Etag") + + if revision == "" { return "", fmt.Errorf("No revision tag detected") - } else { - reg := regexp.MustCompile(`"([^"]*)"`) - revisionNoQuotes := reg.ReplaceAllString(revision, "${1}") - return revisionNoQuotes, nil } + reg := regexp.MustCompile(`"([^"]*)"`) + revisionNoQuotes := reg.ReplaceAllString(revision, "${1}") + return revisionNoQuotes, nil + } //ReadDoc method provides function to retrieve a document from the database by id @@ -259,34 +386,115 @@ func (dbclient *CouchDBConnectionDef) ReadDoc(id string) ([]byte, string, error) logger.Debugf("===COUCHDB=== Entering ReadDoc() id=%s", id) - url := fmt.Sprintf("%s/%s/%s", dbclient.URL, dbclient.Database, id) + url := fmt.Sprintf("%s/%s/%s?attachments=true", dbclient.URL, dbclient.Database, id) - resp, _, err := dbclient.handleRequest(http.MethodGet, url, nil) + resp, _, err := dbclient.handleRequest(http.MethodGet, url, nil, "", "") if err != nil { return nil, "", err } defer resp.Body.Close() - jsonDoc, err := ioutil.ReadAll(resp.Body) + /* + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s", dump) + */ + + //Get the media type from the Content-Type header + mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) if err != nil { - return nil, "", err + log.Fatal(err) } - logger.Debugf("===COUCHDB=== Read document, id=%s, value=%s", id, string(jsonDoc)) - + //Get the revision from header revision, err := getRevisionHeader(resp) if err != nil { return nil, "", err } - logger.Debugf("===COUCHDB=== Exiting ReadDoc()") + //check to see if the is multipart, handle as attachment if multipart is detected + if strings.HasPrefix(mediaType, "multipart/") { + + //Set up the multipart reader based on the boundary + multipartReader := multipart.NewReader(resp.Body, params["boundary"]) + for { + + p, err := multipartReader.NextPart() + + if err == io.EOF { + return nil, "", err + } + + if err != nil { + return nil, "", err + } + + logger.Debugf("===COUCHDB=== part header=%s", p.Header) + + //See if the part is gzip encoded + switch p.Header.Get("Content-Encoding") { + case "gzip": + + var respBody []byte + + gr, err := gzip.NewReader(p) + if err != nil { + return nil, "", err + } + respBody, err = ioutil.ReadAll(gr) + if err != nil { + return nil, "", err + } + + logger.Debugf("===COUCHDB=== Retrieved attachment data") + + if p.Header.Get("Content-Disposition") == "attachment; filename=\"valueBytes\"" { + + return respBody, revision, nil + + } + + default: + + //retrieve the data, this is not gzip + partdata, err := ioutil.ReadAll(p) + if err != nil { + return nil, "", err + } + logger.Debugf("===COUCHDB=== Retrieved attachment data") + + if p.Header.Get("Content-Disposition") == "attachment; filename=\"valueBytes\"" { + + return partdata, revision, nil + + } - return jsonDoc, revision, nil + } + + } + + } else { + + //handle as JSON document + jsonDoc, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, "", err + } + + logger.Debugf("===COUCHDB=== Read document, id=%s, value=%s", id, string(jsonDoc)) + + logger.Debugf("===COUCHDB=== Exiting ReadDoc()") + + return jsonDoc, revision, nil + + } } //handleRequest method is a generic http request handler -func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.Reader) (*http.Response, *CouchDBReturn, error) { +func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.Reader, rev string, multipartBoundary string) (*http.Response, *DBReturn, error) { logger.Debugf("===COUCHDB=== Entering handleRequest() method=%s url=%s", method, url) @@ -298,22 +506,52 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io. //add content header for PUT if method == http.MethodPut { - req.Header.Set("Content-Type", "application/json") + + //If the multipartBoundary is not set, then this is a JSON and content-type should be set + //to application/json. Else, this is contains an attachment and needs to be multipart + if multipartBoundary == "" { + req.Header.Set("Content-Type", "application/json") + } else { + req.Header.Set("Content-Type", "multipart/related;boundary=\""+multipartBoundary+"\"") + } + + //check to see if the revision is set, if so, pass as a header + if rev != "" { + req.Header.Set("If-Match", rev) + } } - //add content header for PUT or GET - if method == http.MethodGet || method == http.MethodPut { + //add content header for PUT + if method == http.MethodPut { req.Header.Set("Accept", "application/json") } + //add content header for GET + if method == http.MethodGet { + req.Header.Set("Accept", "multipart/related") + } + //If username and password are set the use basic auth if dbclient.Username != "" && dbclient.Password != "" { req.SetBasicAuth(dbclient.Username, dbclient.Password) } + /* + dump, err := httputil.DumpRequestOut(req, true) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("%s", dump) + */ + //Create the http client client := &http.Client{} + transport := &http.Transport{Proxy: http.ProxyFromEnvironment} + transport.DisableCompression = false + client.Transport = transport + //Execute http request resp, err := client.Do(req) if err != nil { @@ -321,7 +559,7 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io. } //create the return object for couchDB - couchDBReturn := &CouchDBReturn{} + couchDBReturn := &DBReturn{} //set the return code for the couchDB request couchDBReturn.StatusCode = resp.StatusCode @@ -350,3 +588,9 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io. //If no errors, then return the results return resp, couchDBReturn, nil } + +//IsJSON tests a string to determine if a valid JSON +func IsJSON(s string) bool { + var js map[string]interface{} + return json.Unmarshal([]byte(s), &js) == nil +} diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb_test.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb_test.go index 655481a2414..0fc4a27b222 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb_test.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb_test.go @@ -74,13 +74,9 @@ func TestDBCreateSaveWithoutRevision(t *testing.T) { testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database")) //Save the test document - _, saveerr := db.SaveDoc("2", assetJSON) + _, saveerr := db.SaveDoc("2", "", assetJSON, nil) testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document")) - //Attempt to save the document again without updating the revision. this should fail - _, saveerr = db.SaveDoc("2", assetJSON) - testutil.AssertError(t, saveerr, fmt.Sprintf("Error should have thown for a missing revision number")) - } } @@ -97,7 +93,7 @@ func TestDBBadConnection(t *testing.T) { testutil.AssertError(t, errdb, fmt.Sprintf("Error should have been thrown while creating a database with an invalid connecion")) //Save the test document - _, saveerr := db.SaveDoc("3", assetJSON) + _, saveerr := db.SaveDoc("3", "", assetJSON, nil) testutil.AssertError(t, saveerr, fmt.Sprintf("Error should have been thrown while saving a document with an invalid connecion")) //Retrieve the updated test document @@ -127,7 +123,7 @@ func TestDBCreateDatabaseAndPersist(t *testing.T) { testutil.AssertEquals(t, dbResp.DbName, database) //Save the test document - _, saveerr := db.SaveDoc("1", assetJSON) + _, saveerr := db.SaveDoc("1", "", assetJSON, nil) testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document")) //Retrieve the test document @@ -148,7 +144,7 @@ func TestDBCreateDatabaseAndPersist(t *testing.T) { assetDocUpdated, _ := json.Marshal(assetResp) //Save the updated test document - _, saveerr = db.SaveDoc("1", assetDocUpdated) + _, saveerr = db.SaveDoc("1", "", assetDocUpdated, nil) testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save the updated document")) //Retrieve the updated test document @@ -174,6 +170,70 @@ func TestDBCreateDatabaseAndPersist(t *testing.T) { } +func TestDBBadJSON(t *testing.T) { + + if kvledgerconfig.IsCouchDBEnabled() == true { + + cleanup() + + //create a new connection + db, err := CreateConnectionDefinition(hostname, port, database, username, password) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create database connection definition")) + + //create a new database + _, errdb := db.CreateDatabaseIfNotExist() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database")) + + //Retrieve the info for the new database and make sure the name matches + dbResp, _, errdb := db.GetDatabaseInfo() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to retrieve database information")) + testutil.AssertEquals(t, dbResp.DbName, database) + + badJSON := []byte(`{"asset_name"}`) + + //Save the test document + _, saveerr := db.SaveDoc("1", "", badJSON, nil) + testutil.AssertError(t, saveerr, fmt.Sprintf("Error should have been thrown for a bad JSON")) + + } + +} + +func TestDBSaveAttachment(t *testing.T) { + + if kvledgerconfig.IsCouchDBEnabled() == true { + + cleanup() + defer cleanup() + byteText := []byte(`This is a test document. This is only a test`) + + attachment := Attachment{} + attachment.AttachmentBytes = byteText + attachment.ContentType = "text/plain" + attachment.Name = "valueBytes" + + attachments := []Attachment{} + attachments = append(attachments, attachment) + + //create a new connection + db, err := CreateConnectionDefinition(hostname, port, database, username, password) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create database connection definition")) + + //create a new database + _, errdb := db.CreateDatabaseIfNotExist() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database")) + + //Save the test document + _, saveerr := db.SaveDoc("10", "", nil, attachments) + testutil.AssertNoError(t, saveerr, fmt.Sprintf("Error when trying to save a document")) + + //Attempt to retrieve the updated test document with attachments + _, _, geterr2 := db.ReadDoc("10") + testutil.AssertNoError(t, geterr2, fmt.Sprintf("Error when trying to retrieve a document with attachment")) + + } +} + func TestDBRetrieveNonExistingDocument(t *testing.T) { if kvledgerconfig.IsCouchDBEnabled() == true { @@ -196,6 +256,64 @@ func TestDBRetrieveNonExistingDocument(t *testing.T) { } } +func TestDBTestExistingDB(t *testing.T) { + + if kvledgerconfig.IsCouchDBEnabled() == true { + + cleanup() + defer cleanup() + + //create a new connection + db, err := CreateConnectionDefinition(hostname, port, database, username, password) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create database connection definition")) + + //create a new database + _, errdb := db.CreateDatabaseIfNotExist() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when attempting to create a database")) + + //run the create if not exist again. should not return an error + _, errdb = db.CreateDatabaseIfNotExist() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when attempting to create a database")) + + } +} + +func TestDBTestDropNonExistDatabase(t *testing.T) { + + if kvledgerconfig.IsCouchDBEnabled() == true { + + cleanup() + defer cleanup() + + //create a new connection + db, err := CreateConnectionDefinition(hostname, port, database, username, password) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create database connection definition")) + + //Attempt to drop the database without creating first + _, errdbdrop := db.DropDatabase() + testutil.AssertError(t, errdbdrop, fmt.Sprintf("Error should have been reported for attempting to drop a database before creation")) + } + +} + +func TestDBTestDropDatabaseBadConnection(t *testing.T) { + + if kvledgerconfig.IsCouchDBEnabled() == true { + + cleanup() + defer cleanup() + + //create a new connection + db, err := CreateConnectionDefinition(hostname, port+4, database, username, password) + testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create database connection definition")) + + //Attempt to drop the database without creating first + _, errdbdrop := db.DropDatabase() + testutil.AssertError(t, errdbdrop, fmt.Sprintf("Error should have been reported for attempting to drop a database before creation")) + } + +} + func cleanup() { //create a new connection diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go index bae6e7b5b34..6e46c2ad117 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go @@ -270,15 +270,39 @@ func (txmgr *CouchDBTxMgr) Commit() error { defer txmgr.commitRWLock.Unlock() defer func() { txmgr.updateSet = nil }() - // SaveDoc using couchdb client - rev, err := txmgr.couchDB.SaveDoc(k, v.value) + if couchdb.IsJSON(string(v.value)) { + + // SaveDoc using couchdb client and use JSON format + rev, err := txmgr.couchDB.SaveDoc(k, "", v.value, nil) + if err != nil { + logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("===COUCHDB=== Saved document revision number: %s\n", rev) + } + + } else { + + //Create an attachment structure and load the bytes + attachment := &couchdb.Attachment{} + attachment.AttachmentBytes = v.value + attachment.ContentType = "application/octet-stream" + attachment.Name = "valueBytes" + + attachments := []couchdb.Attachment{} + attachments = append(attachments, *attachment) + + // SaveDoc using couchdb client and use attachment + rev, err := txmgr.couchDB.SaveDoc(k, "", nil, attachments) + if err != nil { + logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("===COUCHDB=== Saved document revision number: %s\n", rev) + } - if err != nil { - logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) - return err - } - if rev != "" { - logger.Debugf("===COUCHDB=== Saved document revision number: %s\n", rev) } }