diff --git a/base/error.go b/base/error.go
index 7b4c33b99c..8b41558f49 100644
--- a/base/error.go
+++ b/base/error.go
@@ -35,7 +35,6 @@ var (
 	ErrDocumentMigrated      = &sgError{"Document migrated"}
 	ErrFatalBucketConnection = &sgError{"Fatal error connecting to bucket"}
 	ErrAuthError             = &sgError{"Authentication failure"}
-	ErrEmptyMetadata         = &sgError{"Empty Sync Gateway metadata"}
 	ErrCasFailureShouldRetry = sgbucket.ErrCasFailureShouldRetry
 	ErrIndexerError          = &sgError{"Indexer error"}
 	ErrAlreadyExists         = &sgError{"Already exists"}
@@ -53,9 +52,6 @@ var (
 	// ErrXattrPartialFound is returned if only a subset of requested xattrs are found
 	ErrXattrPartialFound = &sgError{"Some Requested Xattrs Not Found"}

-	// ErrXattrInvalidLen is returned if the xattr is corrupt.
-	ErrXattrInvalidLen = &sgError{"Xattr stream length"}
-
 	// ErrPartialViewErrors is returned if the view call contains any partial errors.
 	// This is more of a warning, and inspecting ViewResult.Errors is required for detail.
 	ErrPartialViewErrors = &sgError{"Partial errors in view"}
diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go
index a0c4c96ba8..0d54835ee1 100644
--- a/db/attachment_compaction.go
+++ b/db/attachment_compaction.go
@@ -211,15 +211,14 @@ func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionDa
 	var documentBody []byte

 	if dataType&base.MemcachedDataTypeXattr != 0 {
-		body, xattr, _, err := parseXattrStreamData(base.SyncXattrName, "", data)
+		body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, data)
 		if err != nil {
-			if errors.Is(err, base.ErrXattrNotFound) || errors.Is(err, base.ErrXattrInvalidLen) {
+			if errors.Is(err, sgbucket.ErrXattrInvalidLen) {
 				return nil, nil
 			}
-			return nil, err
+			return nil, fmt.Errorf("Could not parse DCP attachment sync data: %w", err)
 		}
-
-		err = base.JSONUnmarshal(xattr, &attachmentData)
+		err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentData)
 		if err != nil {
 			return nil, err
 		}
@@ -312,8 +311,9 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,

 		// If the data contains an xattr then the attachment likely has a compaction ID, need to check this value
 		if event.DataType&base.MemcachedDataTypeXattr != 0 {
-			_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
-			if err != nil && !errors.Is(err, base.ErrXattrNotFound) {
+
+			xattr, err := getAttachmentCompactionXattr(event.Value)
+			if err != nil {
 				base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
 				return true
 			}
@@ -424,8 +424,8 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
 			return true
 		}

-		_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
-		if err != nil && !errors.Is(err, base.ErrXattrNotFound) {
+		xattr, err := getAttachmentCompactionXattr(event.Value)
+		if err != nil {
 			base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
 			return true
 		}
@@ -575,3 +575,12 @@ func GenerateCompactionDCPStreamName(compactionID, compactionAction string) stri
 		compactionAction,
 	)
 }
+
+// getAttachmentCompactionXattr returns the value of the attachment compaction xattr from a DCP stream. The value will be nil if the xattr is not found.
+func getAttachmentCompactionXattr(data []byte) ([]byte, error) {
+	_, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.AttachmentCompactionXattrName}, data)
+	if err != nil {
+		return nil, err
+	}
+	return xattrs[base.AttachmentCompactionXattrName], nil
+}
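The helper above is the pattern this change applies at every DCP call site: ask sgbucket.DecodeValueWithXattrs for the xattrs of interest and read them out of the returned map. Below is a minimal, hypothetical sketch of that usage, assuming only the sg-bucket calls and error sentinels visible in this diff; the function name and the literal "_sync" key are illustrative.

```go
package example

import (
	"errors"
	"fmt"

	sgbucket "github.com/couchbase/sg-bucket"
)

// decodeSyncXattr is a hypothetical caller: it splits a raw DCP value into the
// document body and one named xattr, handling the sentinels the way the updated
// call sites in this diff do. Missing xattrs are simply absent from the map.
func decodeSyncXattr(value []byte) (body, syncXattr []byte, err error) {
	body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, value)
	if err != nil {
		// Both sentinels now come from sg-bucket rather than base.
		if errors.Is(err, sgbucket.ErrEmptyMetadata) || errors.Is(err, sgbucket.ErrXattrInvalidLen) {
			return nil, nil, fmt.Errorf("malformed DCP value: %w", err)
		}
		return nil, nil, err
	}
	return body, xattrs["_sync"], nil
}
```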
diff --git a/db/change_cache.go b/db/change_cache.go
index df019b1567..9ab528ed53 100644
--- a/db/change_cache.go
+++ b/db/change_cache.go
@@ -394,7 +394,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
 			if event.DataType != base.MemcachedDataTypeRaw {
 				base.DebugfCtx(ctx, base.KeyCache, "Unable to unmarshal sync metadata for feed document %q. Will not be included in channel cache. Error: %v", base.UD(docID), err)
 			}
-			if err == base.ErrEmptyMetadata {
+			if errors.Is(err, sgbucket.ErrEmptyMetadata) {
 				base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
 			}
 			return
diff --git a/db/change_cache_test.go b/db/change_cache_test.go
index f0d5ebb396..1be2e4f4a4 100644
--- a/db/change_cache_test.go
+++ b/db/change_cache_test.go
@@ -10,7 +10,6 @@ package db

 import (
 	"context"
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"log"
@@ -1952,42 +1951,6 @@ func (f *testDocChangedFeed) reset() {
 	}
 }

-// makeFeedBytes creates a DCP mutation message w/ xattr (reverse of parseXattrStreamData)
-func makeFeedBytes(xattrKey, xattrValue, docValue string) []byte {
-	xattrKeyBytes := []byte(xattrKey)
-	xattrValueBytes := []byte(xattrValue)
-	docValueBytes := []byte(docValue)
-	separator := []byte("\x00")
-
-	xattrBytes := xattrKeyBytes
-	xattrBytes = append(xattrBytes, separator...)
-	xattrBytes = append(xattrBytes, xattrValueBytes...)
-	xattrBytes = append(xattrBytes, separator...)
-	xattrLength := len(xattrBytes) + 4 // +4, to include storage for the length bytes
-
-	totalXattrLengthBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(totalXattrLengthBytes, uint32(xattrLength))
-	syncXattrLengthBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(syncXattrLengthBytes, uint32(xattrLength))
-
-	feedBytes := totalXattrLengthBytes
-	feedBytes = append(feedBytes, syncXattrLengthBytes...)
-	feedBytes = append(feedBytes, xattrBytes...)
-	feedBytes = append(feedBytes, docValueBytes...)
-	return feedBytes
-}
-
-func TestMakeFeedBytes(t *testing.T) {
-
-	rawBytes := makeFeedBytes(base.SyncPropertyName, `{"rev":"foo"}`, `{"k":"val"}`)
-
-	body, xattr, _, err := parseXattrStreamData(base.SyncXattrName, "", rawBytes)
-	assert.NoError(t, err)
-	require.Len(t, body, 11)
-	require.Len(t, xattr, 13)
-
-}
-
 func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {
 	// Select the next sequence from a source at random. Simulates unordered global sequences arriving over DCP
@@ -2005,8 +1968,7 @@ func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {
 		channelName,
 	)
 	docBody := fmt.Sprintf(`{"channels":["%s"]}`, channelName)
-	// docBody := fmt.Sprintf(feedDoc1kFormat, channelName)
-	value := makeFeedBytes(base.SyncXattrName, xattrValue, docBody)
+	value := sgbucket.EncodeValueWithXattrs([]byte(docBody), sgbucket.Xattr{Name: base.SyncXattrName, Value: []byte(xattrValue)})

 	return sgbucket.FeedEvent{
 		Opcode: sgbucket.FeedOpMutation,
@@ -2121,15 +2083,6 @@ func BenchmarkDocChanged(b *testing.B) {
 	}
 }

-func TestInvalidXattrStream(t *testing.T) {
-
-	body, xattr, userXattr, err := parseXattrStreamData(base.SyncXattrName, "", []byte("abcde"))
-	require.Error(t, err)
-	require.Nil(t, body)
-	require.Nil(t, xattr)
-	require.Nil(t, userXattr)
-}
-
 // TestProcessSkippedEntry:
 // - Creates change cache with minimal pending seq wait time to push sequences to skipped quick
 // - Push a sequence higher than expected to cache
diff --git a/db/document.go b/db/document.go
index f81dbe5a25..73efb2f0ee 100644
--- a/db/document.go
+++ b/db/document.go
@@ -13,7 +13,6 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/base64"
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"math"
@@ -433,30 +432,36 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error)
 // Returns the raw body, in case it's needed for import.
 // TODO: Using a pool of unmarshal workers may help prevent memory spikes under load
-func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawXattr []byte, rawUserXattr []byte, err error) {
+func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawSyncXattr []byte, rawUserXattr []byte, err error) {

 	var body []byte

-	// If attr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
+	// If xattr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
 	// Note that there could be a non-sync xattr present
 	if dataType&base.MemcachedDataTypeXattr != 0 {
-		var syncXattr []byte
-		body, syncXattr, rawUserXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
+		var xattrs map[string][]byte
+		xattrKeys := []string{base.SyncXattrName}
+		if userXattrKey != "" {
+			xattrKeys = append(xattrKeys, userXattrKey)
+		}
+		body, xattrs, err = sgbucket.DecodeValueWithXattrs(xattrKeys, data)
 		if err != nil {
 			return nil, nil, nil, nil, err
 		}
+		rawSyncXattr = xattrs[base.SyncXattrName]
+		rawUserXattr = xattrs[userXattrKey]

 		// If the sync xattr is present, use that to build SyncData
-		if syncXattr != nil && len(syncXattr) > 0 {
+		if len(rawSyncXattr) > 0 {
 			result = &SyncData{}
 			if needHistory {
 				result.History = make(RevTree)
 			}
-			err = base.JSONUnmarshal(syncXattr, result)
+			err = base.JSONUnmarshal(rawSyncXattr, result)
 			if err != nil {
-				return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err)
+				return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", rawSyncXattr, err)
 			}
-			return result, body, syncXattr, rawUserXattr, nil
+			return result, body, rawSyncXattr, rawUserXattr, nil
 		}
 	} else {
 		// Xattr flag not set - data is just the document body
@@ -469,93 +474,18 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
 }

 func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) {
-	var body []byte
-
-	if dataType&base.MemcachedDataTypeXattr != 0 {
-		var syncXattr []byte
-		var userXattr []byte
-		body, syncXattr, userXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
-		if err != nil {
-			return nil, err
-		}
-		return unmarshalDocumentWithXattr(ctx, docid, body, syncXattr, userXattr, cas, DocUnmarshalAll)
+	if dataType&base.MemcachedDataTypeXattr == 0 {
+		return unmarshalDocument(docid, data)
 	}
-
-	return unmarshalDocument(docid, data)
-}
-
-// parseXattrStreamData returns the raw bytes of the body and the requested xattr (when present) from the raw DCP data bytes.
-// Details on format (taken from https://docs.google.com/document/d/18UVa5j8KyufnLLy29VObbWRtoBn9vs8pcxttuMt6rz8/edit#heading=h.caqiui1pmmmb.):
-/*
-	When the XATTR bit is set the first uint32_t in the body contains the size of the entire XATTR section.
-
-	     Byte/     0       |       1       |       2       |       3       |
-	        /              |               |               |               |
-	       |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
-	       +---------------+---------------+---------------+---------------+
-	      0| Total xattr length in network byte order                      |
-	       +---------------+---------------+---------------+---------------+
-
-	Following the length you'll find an iovector-style encoding of all of the XATTR key-value pairs with the following encoding:
-
-	uint32_t length of next xattr pair (network order)
-	xattr key in modified UTF-8
-	0x00
-	xattr value in modified UTF-8
-	0x00
-
-	The 0x00 byte after the key saves us from storing a key length, and the trailing 0x00 is just for convenience to allow us to use string functions to search in them.
-*/
-
-func parseXattrStreamData(xattrName string, userXattrName string, data []byte) (body []byte, xattr []byte, userXattr []byte, err error) {
-
-	if len(data) < 4 {
-		return nil, nil, nil, base.ErrEmptyMetadata
-	}
-
-	xattrsLen := binary.BigEndian.Uint32(data[0:4])
-	if int(xattrsLen+4) > len(data) {
-		return nil, nil, nil, fmt.Errorf("%w (%d) from bytes %+v", base.ErrXattrInvalidLen, xattrsLen, data[0:4])
-	}
-	body = data[xattrsLen+4:]
-	if xattrsLen == 0 {
-		return body, nil, nil, nil
+	xattrKeys := []string{base.SyncXattrName}
+	if userXattrKey != "" {
+		xattrKeys = append(xattrKeys, userXattrKey)
 	}
-
-	// In the xattr key/value pairs, key and value are both terminated by 0x00 (byte(0)). Use this as a separator to split the byte slice
-	separator := []byte("\x00")
-
-	// Iterate over xattr key/value pairs
-	pos := uint32(4)
-	for pos < xattrsLen {
-		pairLen := binary.BigEndian.Uint32(data[pos : pos+4])
-		if pairLen == 0 || int(pos+pairLen) > len(data) {
-			return nil, nil, nil, fmt.Errorf("Unexpected xattr pair length (%d) - unable to parse xattrs", pairLen)
-		}
-		pos += 4
-		pairBytes := data[pos : pos+pairLen]
-		components := bytes.Split(pairBytes, separator)
-		// xattr pair has the format [key]0x00[value]0x00, and so should split into three components
-		if len(components) != 3 {
-			return nil, nil, nil, fmt.Errorf("Unexpected number of components found in xattr pair: %s", pairBytes)
-		}
-		xattrKey := string(components[0])
-		if xattrName == xattrKey {
-			xattr = components[1]
-		} else if userXattrName != "" && userXattrName == xattrKey {
-			userXattr = components[1]
-		}
-
-		// Exit if we have xattrs we want (either both or one if the latter is disabled)
-		if len(xattr) > 0 && (len(userXattr) > 0 || userXattrName == "") {
-			return body, xattr, userXattr, nil
-		}
-
-		pos += pairLen
+	body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data)
+	if err != nil {
+		return nil, err
 	}
-
-	return body, xattr, userXattr, nil
+	return unmarshalDocumentWithXattr(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll)
 }

 func (doc *SyncData) HasValidSyncData() bool {
diff --git a/db/document_test.go b/db/document_test.go
index 16fbd97ff4..b2e6ff8df7 100644
--- a/db/document_test.go
+++ b/db/document_test.go
@@ -16,6 +16,7 @@ import (
 	"log"
 	"testing"

+	sgbucket "github.com/couchbase/sg-bucket"
 	"github.com/couchbase/sync_gateway/base"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -190,42 +191,6 @@ func BenchmarkUnmarshalBody(b *testing.B) {
 	}
 }

-func TestParseXattr(t *testing.T) {
-	zeroByte := byte(0)
-	// Build payload for single xattr pair and body
-	xattrValue := `{"seq":1}`
-	xattrPairLength := 4 + len(base.SyncXattrName) + len(xattrValue) + 2
-	xattrTotalLength := xattrPairLength
-	body := `{"value":"ABC"}`
-
-	// Build up the dcp Body
-	dcpBody := make([]byte, 8)
-	binary.BigEndian.PutUint32(dcpBody[0:4], uint32(xattrTotalLength))
-	binary.BigEndian.PutUint32(dcpBody[4:8], uint32(xattrPairLength))
-	dcpBody = append(dcpBody, base.SyncXattrName...)
-	dcpBody = append(dcpBody, zeroByte)
-	dcpBody = append(dcpBody, xattrValue...)
-	dcpBody = append(dcpBody, zeroByte)
-	dcpBody = append(dcpBody, body...)
-
-	resultBody, resultXattr, _, err := parseXattrStreamData(base.SyncXattrName, "", dcpBody)
-	assert.NoError(t, err, "Unexpected error parsing dcp body")
-	assert.Equal(t, body, string(resultBody))
-	assert.Equal(t, xattrValue, string(resultXattr))
-
-	// Attempt to retrieve non-existent xattr
-	resultBody, resultXattr, _, err = parseXattrStreamData("nonexistent", "", dcpBody)
-	assert.NoError(t, err, "Unexpected error parsing dcp body")
-	assert.Equal(t, body, string(resultBody))
-	assert.Equal(t, "", string(resultXattr))
-
-	// Attempt to retrieve xattr from empty dcp body
-	emptyBody, emptyXattr, _, emptyErr := parseXattrStreamData(base.SyncXattrName, "", []byte{})
-	assert.Equal(t, base.ErrEmptyMetadata, emptyErr)
-	assert.True(t, emptyBody == nil, "Nil body expected")
-	assert.True(t, emptyXattr == nil, "Nil xattr expected")
-}
-
 func TestParseDocumentCas(t *testing.T) {
 	syncData := &SyncData{}
 	syncData.Cas = "0x00002ade734fb714"
@@ -293,55 +258,76 @@ func TestGetDeepMutableBody(t *testing.T) {
 	}
 }

-func TestInvalidXattrStreamDataLen(t *testing.T) {
+func TestDCPDecodeValue(t *testing.T) {
 	testCases := []struct {
-		name        string
-		body        []byte
-		expectedErr error
+		name              string
+		body              []byte
+		expectedErr       error
+		expectedBody      []byte
+		expectedSyncXattr []byte
 	}{
 		{
 			name:        "bad value",
 			body:        []byte("abcde"),
-			expectedErr: base.ErrXattrInvalidLen,
+			expectedErr: sgbucket.ErrXattrInvalidLen,
 		},
 		{
 			name:        "xattr length 4, overflow",
 			body:        []byte{0x00, 0x00, 0x00, 0x04, 0x01},
-			expectedErr: base.ErrXattrInvalidLen,
+			expectedErr: sgbucket.ErrXattrInvalidLen,
+		},
+		{
+			name:        "empty",
+			body:        nil,
+			expectedErr: sgbucket.ErrEmptyMetadata,
+		},
+		{
+			name:              "single xattr pair and body",
+			body:              getSingleXattrDCPBytes(),
+			expectedBody:      []byte(`{"value":"ABC"}`),
+			expectedSyncXattr: []byte(`{"seq":1}`),
 		},
 	}
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
-			// parseXattrStreamData is the underlying function
-			body, xattr, userXattr, err := parseXattrStreamData(base.SyncXattrName, "", test.body)
-			require.Error(t, err)
+			// DecodeValueWithXattrs is the underlying function
+			body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, test.body)
 			require.ErrorIs(t, err, test.expectedErr)
-			require.Nil(t, body)
-			require.Nil(t, xattr)
-			require.Nil(t, userXattr)
-			// UnmarshalDocumentSyncData wraps parseXattrStreamData
+			require.Equal(t, test.expectedBody, body)
+			if test.expectedSyncXattr != nil {
+				require.Len(t, xattrs, 1)
+				require.Equal(t, test.expectedSyncXattr, xattrs["_sync"])
+			} else {
+				require.Nil(t, xattrs)
+			}
+			// UnmarshalDocumentSyncData wraps DecodeValueWithXattrs
 			result, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(test.body, base.MemcachedDataTypeXattr, "", false)
-			require.ErrorIs(t, err, base.ErrXattrInvalidLen)
-			require.Nil(t, result)
-			require.Nil(t, rawBody)
-			require.Nil(t, rawXattr)
+			require.ErrorIs(t, err, test.expectedErr)
+			if test.expectedSyncXattr != nil {
+				require.NotNil(t, result)
+			} else {
+				require.Nil(t, result)
+			}
+			require.Equal(t, test.expectedBody, rawBody)
+			require.Equal(t, test.expectedSyncXattr, rawXattr)
 			require.Nil(t, rawUserXattr)
 		})
 	}
 }

+// TestInvalidXattrStreamEmptyBody is a bit different from the cases in TestDCPDecodeValue, since DecodeValueWithXattrs will pass but UnmarshalDocumentSyncDataFromFeed will fail due to invalid JSON.
 func TestInvalidXattrStreamEmptyBody(t *testing.T) {
 	inputStream := []byte{0x00, 0x00, 0x00, 0x01, 0x01}
 	emptyBody := []byte{}
-	// parseXattrStreamData is the underlying function
-	body, xattr, userXattr, err := parseXattrStreamData(base.SyncXattrName, "", inputStream)
+
+	// DecodeValueWithXattrs is the underlying function
+	body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, inputStream)
 	require.NoError(t, err)
 	require.Equal(t, emptyBody, body)
-	require.Nil(t, xattr)
-	require.Nil(t, userXattr)
+	require.Empty(t, xattrs)

-	// UnmarshalDocumentSyncData wraps parseXattrStreamData
+	// UnmarshalDocumentSyncData wraps DecodeValueWithXattrs
 	result, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(inputStream, base.MemcachedDataTypeXattr, "", false)
 	require.Error(t, err) // unexpected end of JSON input
 	require.Nil(t, result)
@@ -350,3 +336,24 @@ func TestInvalidXattrStreamEmptyBody(t *testing.T) {
 	require.Nil(t, rawUserXattr)

 }
+
+// getSingleXattrDCPBytes returns a DCP body with a single xattr pair and body
+func getSingleXattrDCPBytes() []byte {
+	zeroByte := byte(0)
+	// Build payload for single xattr pair and body
+	xattrValue := `{"seq":1}`
+	xattrPairLength := 4 + len(base.SyncXattrName) + len(xattrValue) + 2
+	xattrTotalLength := xattrPairLength
+	body := `{"value":"ABC"}`
+
+	// Build up the dcp Body
+	dcpBody := make([]byte, 8)
+	binary.BigEndian.PutUint32(dcpBody[0:4], uint32(xattrTotalLength))
+	binary.BigEndian.PutUint32(dcpBody[4:8], uint32(xattrPairLength))
+	dcpBody = append(dcpBody, base.SyncXattrName...)
+	dcpBody = append(dcpBody, zeroByte)
+	dcpBody = append(dcpBody, xattrValue...)
+	dcpBody = append(dcpBody, zeroByte)
+	dcpBody = append(dcpBody, body...)
+	return dcpBody
+}
diff --git a/db/import_listener.go b/db/import_listener.go
index f1395afe06..403710ac3c 100644
--- a/db/import_listener.go
+++ b/db/import_listener.go
@@ -12,6 +12,7 @@ package db

 import (
 	"context"
+	"errors"
 	"fmt"
 	"sort"
 	"strings"
@@ -169,7 +170,7 @@ func (il *importListener) ProcessFeedEvent(event sgbucket.FeedEvent) (shouldPers
 func (il *importListener) ImportFeedEvent(ctx context.Context, collection *DatabaseCollectionWithUser, event sgbucket.FeedEvent) {
 	syncData, rawBody, rawXattr, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(event.Value, event.DataType, collection.userXattrKey(), false)
 	if err != nil {
-		if err == base.ErrEmptyMetadata {
+		if errors.Is(err, sgbucket.ErrEmptyMetadata) {
 			base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
 			il.importStats.ImportErrorCount.Add(1)
 			return
diff --git a/go.mod b/go.mod
index 7d9db52b16..07ef1c7dc4 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/couchbase/gocbcore/v10 v10.3.1
 	github.com/couchbase/gomemcached v0.2.1
 	github.com/couchbase/goutils v0.1.2
-	github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8
+	github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4
 	github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
 	github.com/couchbaselabs/gocbconnstr v1.0.5
 	github.com/couchbaselabs/rosmar v0.0.0-20240417141520-4127f7d4c389
diff --git a/go.sum b/go.sum
index ca18142f1b..3259152d60 100644
--- a/go.sum
+++ b/go.sum
@@ -54,8 +54,8 @@ github.com/couchbase/goprotostellar v1.0.1 h1:mtDVYTgnnDSQ3t7mQRG6jl/tOXKOuuFM9P
 github.com/couchbase/goprotostellar v1.0.1/go.mod h1:gs1eioLVOHETTFWxDY4v7Q/kRPMgqmX6t/TPcI429ls=
 github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9BCs=
 github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
+github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4 h1:UzmQ2oaCUEPkD/7l1e3s7U9OA4BFptp/Oi5UgaAVwBg=
+github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
 github.com/couchbase/tools-common/cloud v1.0.0 h1:SQZIccXoedbrThehc/r9BJbpi/JhwJ8X00PDjZ2gEBE=
 github.com/couchbase/tools-common/cloud v1.0.0/go.mod h1:6KVlRpbcnDWrvickUJ+xpqCWx1vgYYlEli/zL4xmZAg=
 github.com/couchbase/tools-common/fs v1.0.0 h1:HFA4xCF/r3BtZShFJUxzVvGuXtDkqGnaPzYJP3Kp1mw=
diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go
index c052ffb56d..610be23b0c 100644
--- a/xdcr/rosmar_xdcr.go
+++ b/xdcr/rosmar_xdcr.go
@@ -192,8 +192,8 @@ func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas
 	var body []byte
 	if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
 		var err error
-		var dcpXattrs []sgbucket.Xattr
-		body, dcpXattrs, err = sgbucket.DecodeValueWithXattrs(event.Value)
+		var dcpXattrs map[string][]byte
+		body, dcpXattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
 		if err != nil {
 			return err
 		}
@@ -224,11 +224,11 @@ func (r *rosmarManager) Stats(context.Context) (*Stats, error) {
 	}, nil
 }

-// xattrToBytes converts a slice of Xattrs to a byte slice of marshalled json.
-func xattrToBytes(xattrs []sgbucket.Xattr) ([]byte, error) {
+// xattrToBytes converts a map of xattrs to a byte slice of marshalled json.
+func xattrToBytes(xattrs map[string][]byte) ([]byte, error) {
 	xattrMap := make(map[string]json.RawMessage)
-	for _, xattr := range xattrs {
-		xattrMap[xattr.Name] = xattr.Value
+	for k, v := range xattrs {
+		xattrMap[k] = v
 	}
 	return json.Marshal(xattrMap)
 }
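For completeness, here is a small round-trip sketch of the encode/decode pair this change standardizes on, the same pair exercised by the updated change_cache test feed. It assumes only the sg-bucket functions that appear in the diff above; the document body, xattr payload, and function name are made up for illustration.

```go
package example

import (
	"fmt"

	sgbucket "github.com/couchbase/sg-bucket"
)

// roundTripExample builds a DCP-style value carrying a single "_sync" xattr and
// decodes it again, mirroring how the updated test feed constructs its events.
func roundTripExample() error {
	value := sgbucket.EncodeValueWithXattrs([]byte(`{"k":"val"}`),
		sgbucket.Xattr{Name: "_sync", Value: []byte(`{"rev":"1-abc"}`)})

	body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{"_sync"}, value)
	if err != nil {
		return err
	}
	fmt.Printf("body=%s sync=%s\n", body, xattrs["_sync"])
	return nil
}
```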