Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3913 replace parseStreamWithXattr with sg-bucket function #6812

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var (
ErrDocumentMigrated = &sgError{"Document migrated"}
ErrFatalBucketConnection = &sgError{"Fatal error connecting to bucket"}
ErrAuthError = &sgError{"Authentication failure"}
ErrEmptyMetadata = &sgError{"Empty Sync Gateway metadata"}
ErrCasFailureShouldRetry = sgbucket.ErrCasFailureShouldRetry
ErrIndexerError = &sgError{"Indexer error"}
ErrAlreadyExists = &sgError{"Already exists"}
Expand All @@ -53,9 +52,6 @@ var (
// ErrXattrPartialFound is returned if only a subset of requested xattrs are found
ErrXattrPartialFound = &sgError{"Some Requested Xattrs Not Found"}

// ErrXattrInvalidLen is returned if the xattr is corrupt.
ErrXattrInvalidLen = &sgError{"Xattr stream length"}

// ErrPartialViewErrors is returned if the view call contains any partial errors.
// This is more of a warning, and inspecting ViewResult.Errors is required for detail.
ErrPartialViewErrors = &sgError{"Partial errors in view"}
Expand Down
27 changes: 18 additions & 9 deletions db/attachment_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,14 @@ func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionDa
var documentBody []byte

if dataType&base.MemcachedDataTypeXattr != 0 {
body, xattr, _, err := parseXattrStreamData(base.SyncXattrName, "", data)
body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, data)
if err != nil {
if errors.Is(err, base.ErrXattrNotFound) || errors.Is(err, base.ErrXattrInvalidLen) {
if errors.Is(err, sgbucket.ErrXattrInvalidLen) {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("Could not parse DCP attachment sync data: %w", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a functional change if the xattr isn't found. Should it be checking if value was found?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a functional change, but parseXattrStreamData never returned ErrXattrNotFound. That said, I should still catch ErrXattrInvalidLen, though realistically that is also an error condition where we could probably consider stopping.

}

err = base.JSONUnmarshal(xattr, &attachmentData)
err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentData)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -312,8 +311,9 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,

// If the data contains an xattr then the attachment likely has a compaction ID, need to check this value
if event.DataType&base.MemcachedDataTypeXattr != 0 {
_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
if err != nil && !errors.Is(err, base.ErrXattrNotFound) {

xattr, err := getAttachmentCompactionXattr(event.Value)
if err != nil {
base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
return true
}
Expand Down Expand Up @@ -424,8 +424,8 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
return true
}

_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
if err != nil && !errors.Is(err, base.ErrXattrNotFound) {
xattr, err := getAttachmentCompactionXattr(event.Value)
if err != nil {
base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
return true
}
Expand Down Expand Up @@ -575,3 +575,12 @@ func GenerateCompactionDCPStreamName(compactionID, compactionAction string) stri
compactionAction,
)
}

// getAttachmentCompactionXattr returns the value of the attachment compaction xattr from a DCP stream. The value will be nil if the xattr is not found.
func getAttachmentCompactionXattr(data []byte) ([]byte, error) {
_, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.AttachmentCompactionXattrName}, data)
if err != nil {
return nil, err
}
return xattrs[base.AttachmentCompactionXattrName], nil
}
2 changes: 1 addition & 1 deletion db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
if event.DataType != base.MemcachedDataTypeRaw {
base.DebugfCtx(ctx, base.KeyCache, "Unable to unmarshal sync metadata for feed document %q. Will not be included in channel cache. Error: %v", base.UD(docID), err)
}
if err == base.ErrEmptyMetadata {
if errors.Is(err, sgbucket.ErrEmptyMetadata) {
base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
}
return
Expand Down
49 changes: 1 addition & 48 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package db

import (
"context"
"encoding/binary"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -1952,42 +1951,6 @@ func (f *testDocChangedFeed) reset() {
}
}

// makeFeedBytes creates a DCP mutation message w/ a single xattr (reverse of parseXattrStreamData).
//
// The returned value is laid out as:
//
//	uint32 (big endian)  length of the whole xattr section (everything between this field and the body)
//	uint32 (big endian)  length of the xattr pair that follows (not counting this length field)
//	xattrKey, 0x00, xattrValue, 0x00
//	docValue
//
// Both length fields exclude their own four bytes, matching how the parser advances past each prefix
// before slicing the bytes it describes.
func makeFeedBytes(xattrKey, xattrValue, docValue string) []byte {
	// key + terminator + value + terminator
	pairLen := len(xattrKey) + 1 + len(xattrValue) + 1
	// The section holds exactly one pair: its 4-byte length prefix plus the pair itself.
	sectionLen := 4 + pairLen

	feedBytes := make([]byte, 8, 4+sectionLen+len(docValue))
	binary.BigEndian.PutUint32(feedBytes[0:4], uint32(sectionLen))
	binary.BigEndian.PutUint32(feedBytes[4:8], uint32(pairLen))

	feedBytes = append(feedBytes, xattrKey...)
	feedBytes = append(feedBytes, 0x00)
	feedBytes = append(feedBytes, xattrValue...)
	feedBytes = append(feedBytes, 0x00)
	feedBytes = append(feedBytes, docValue...)
	return feedBytes
}

// TestMakeFeedBytes round-trips a value built by makeFeedBytes through parseXattrStreamData and checks
// the lengths of the recovered document body and sync xattr.
func TestMakeFeedBytes(t *testing.T) {
	const (
		syncXattrValue = `{"rev":"foo"}`
		documentBody   = `{"k":"val"}`
	)

	encoded := makeFeedBytes(base.SyncPropertyName, syncXattrValue, documentBody)

	parsedBody, parsedXattr, _, parseErr := parseXattrStreamData(base.SyncXattrName, "", encoded)
	assert.NoError(t, parseErr)
	require.Len(t, parsedBody, 11)
	require.Len(t, parsedXattr, 13)
}

func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {

// Select the next sequence from a source at random. Simulates unordered global sequences arriving over DCP
Expand All @@ -2005,8 +1968,7 @@ func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {
channelName,
)
docBody := fmt.Sprintf(`{"channels":["%s"]}`, channelName)
// docBody := fmt.Sprintf(feedDoc1kFormat, channelName)
value := makeFeedBytes(base.SyncXattrName, xattrValue, docBody)
value := sgbucket.EncodeValueWithXattrs([]byte(docBody), sgbucket.Xattr{Name: base.SyncXattrName, Value: []byte(xattrValue)})

return sgbucket.FeedEvent{
Opcode: sgbucket.FeedOpMutation,
Expand Down Expand Up @@ -2121,15 +2083,6 @@ func BenchmarkDocChanged(b *testing.B) {
}
}

// TestInvalidXattrStream feeds parseXattrStreamData a five-byte value whose leading four bytes decode to
// an xattr section length far larger than the data, and expects an error with every result nil.
func TestInvalidXattrStream(t *testing.T) {
	malformed := []byte("abcde")

	docBody, syncXattr, userXattr, parseErr := parseXattrStreamData(base.SyncXattrName, "", malformed)

	require.Error(t, parseErr)
	require.Nil(t, docBody)
	require.Nil(t, syncXattr)
	require.Nil(t, userXattr)
}

// TestProcessSkippedEntry:
// - Creates change cache with minimal pending seq wait time to push sequences to skipped quick
// - Push a sequence higher than expected to cache
Expand Down
116 changes: 23 additions & 93 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -433,30 +432,36 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error)
// Returns the raw body, in case it's needed for import.

// TODO: Using a pool of unmarshal workers may help prevent memory spikes under load
func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawXattr []byte, rawUserXattr []byte, err error) {
func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawSyncXattr []byte, rawUserXattr []byte, err error) {

var body []byte

// If attr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
// If xattr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
// Note that there could be a non-sync xattr present
if dataType&base.MemcachedDataTypeXattr != 0 {
var syncXattr []byte
body, syncXattr, rawUserXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
var xattrs map[string][]byte
xattrKeys := []string{base.SyncXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
body, xattrs, err = sgbucket.DecodeValueWithXattrs(xattrKeys, data)
if err != nil {
return nil, nil, nil, nil, err
}
rawSyncXattr = xattrs[base.SyncXattrName]
rawUserXattr = xattrs[userXattrKey]

// If the sync xattr is present, use that to build SyncData
if syncXattr != nil && len(syncXattr) > 0 {
if len(rawSyncXattr) > 0 {
result = &SyncData{}
if needHistory {
result.History = make(RevTree)
}
err = base.JSONUnmarshal(syncXattr, result)
err = base.JSONUnmarshal(rawSyncXattr, result)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err)
return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", syncXattr, err)
}
return result, body, syncXattr, rawUserXattr, nil
return result, body, rawSyncXattr, rawUserXattr, nil
}
} else {
// Xattr flag not set - data is just the document body
Expand All @@ -469,93 +474,18 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
}

func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) {
var body []byte

if dataType&base.MemcachedDataTypeXattr != 0 {
var syncXattr []byte
var userXattr []byte
body, syncXattr, userXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
if err != nil {
return nil, err
}
return unmarshalDocumentWithXattr(ctx, docid, body, syncXattr, userXattr, cas, DocUnmarshalAll)
if dataType&base.MemcachedDataTypeXattr == 0 {
return unmarshalDocument(docid, data)
}

return unmarshalDocument(docid, data)
}

// parseXattrStreamData returns the raw bytes of the body and the requested xattr (when present) from the raw DCP data bytes.
// Details on format (taken from https://docs.google.com/document/d/18UVa5j8KyufnLLy29VObbWRtoBn9vs8pcxttuMt6rz8/edit#heading=h.caqiui1pmmmb.):
/*
When the XATTR bit is set the first uint32_t in the body contains the size of the entire XATTR section.


     Byte/     0       |       1       |       2       |       3       |
        /              |               |               |               |
       |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
       +---------------+---------------+---------------+---------------+
      0| Total xattr length in network byte order                      |
       +---------------+---------------+---------------+---------------+

Following the length you'll find an iovector-style encoding of all of the XATTR key-value pairs with the following encoding:

uint32_t length of next xattr pair (network order)
xattr key in modified UTF-8
0x00
xattr value in modified UTF-8
0x00

The 0x00 byte after the key saves us from storing a key length, and the trailing 0x00 is just for convenience to allow us to use string functions to search in them.
*/

func parseXattrStreamData(xattrName string, userXattrName string, data []byte) (body []byte, xattr []byte, userXattr []byte, err error) {

if len(data) < 4 {
return nil, nil, nil, base.ErrEmptyMetadata
}

xattrsLen := binary.BigEndian.Uint32(data[0:4])
if int(xattrsLen+4) > len(data) {
return nil, nil, nil, fmt.Errorf("%w (%d) from bytes %+v", base.ErrXattrInvalidLen, xattrsLen, data[0:4])
}
body = data[xattrsLen+4:]
if xattrsLen == 0 {
return body, nil, nil, nil
xattrKeys := []string{base.SyncXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}

// In the xattr key/value pairs, key and value are both terminated by 0x00 (byte(0)). Use this as a separator to split the byte slice
separator := []byte("\x00")

// Iterate over xattr key/value pairs
pos := uint32(4)
for pos < xattrsLen {
pairLen := binary.BigEndian.Uint32(data[pos : pos+4])
if pairLen == 0 || int(pos+pairLen) > len(data) {
return nil, nil, nil, fmt.Errorf("Unexpected xattr pair length (%d) - unable to parse xattrs", pairLen)
}
pos += 4
pairBytes := data[pos : pos+pairLen]
components := bytes.Split(pairBytes, separator)
// xattr pair has the format [key]0x00[value]0x00, and so should split into three components
if len(components) != 3 {
return nil, nil, nil, fmt.Errorf("Unexpected number of components found in xattr pair: %s", pairBytes)
}
xattrKey := string(components[0])
if xattrName == xattrKey {
xattr = components[1]
} else if userXattrName != "" && userXattrName == xattrKey {
userXattr = components[1]
}

// Exit if we have xattrs we want (either both or one if the latter is disabled)
if len(xattr) > 0 && (len(userXattr) > 0 || userXattrName == "") {
return body, xattr, userXattr, nil
}

pos += pairLen
body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data)
if err != nil {
return nil, err
}

return body, xattr, userXattr, nil
return unmarshalDocumentWithXattr(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {
Expand Down
Loading
Loading