Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3913 put fixes from sync_gateway into DecodeValueWithXattrs #122

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package sgbucket
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -100,6 +101,12 @@ const FeedResume = 1
// checkpoint persistence (used to avoid recursive checkpoint document processing)
type FeedEventCallbackFunc func(event FeedEvent) bool

// ErrXattrInvalidLen is returned if the xattr is corrupt.
var ErrXattrInvalidLen = errors.New("Xattr stream length")

// ErrEmptyMetadata is returned when there is no Sync Gateway metadata
var ErrEmptyMetadata = errors.New("Empty Sync Gateway metadata")

// The name and value of an extended attribute (xattr)
type Xattr struct {
Name string
Expand Down Expand Up @@ -147,24 +154,47 @@ func EncodeValueWithXattrs(body []byte, xattrs ...Xattr) []byte {

// DecodeValueWithXattrs converts DCP Xattrs value format into a body and zero or more Xattrs.
// Call this if the event DataType has the FeedDataTypeXattr flag.
func DecodeValueWithXattrs(data []byte) (body []byte, xattrs []Xattr, err error) {
// Details on format (taken from https://docs.google.com/document/d/18UVa5j8KyufnLLy29VObbWRtoBn9vs8pcxttuMt6rz8/edit#heading=h.caqiui1pmmmb.):
/*
When the XATTR bit is set the first uint32_t in the body contains the size of the entire XATTR section.


Byte/ 0 | 1 | 2 | 3 |
/ | | | |
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+---------------+---------------+---------------+---------------+
0| Total xattr length in network byte order |
+---------------+---------------+---------------+---------------+

Following the length you'll find an iovector-style encoding of all of the XATTR key-value pairs with the following encoding:

uint32_t length of next xattr pair (network order)
xattr key in modified UTF-8
0x00
xattr value in modified UTF-8
0x00

The 0x00 byte after the key saves us from storing a key length, and the trailing 0x00 is just for convenience to allow us to use string functions to search in them.
*/

func DecodeValueWithXattrs(data []byte) ([]byte, []Xattr, error) {
if len(data) < 4 {
err = fmt.Errorf("invalid DCP xattr data: truncated (%d bytes)", len(data))
return
return nil, nil, fmt.Errorf("invalid DCP xattr data: %w truncated (%d bytes)", ErrEmptyMetadata, len(data))
}

xattrsLen := binary.BigEndian.Uint32(data[0:4])
if int(xattrsLen)+4 > len(data) {
err = fmt.Errorf("invalid DCP xattr data: invalid xattrs length %d (data is only %d bytes)", xattrsLen, len(data))
return nil, nil, fmt.Errorf("invalid DCP xattr data: %w length %d (data is only %d bytes)", ErrXattrInvalidLen, xattrsLen, len(data))
}
body = data[xattrsLen+4:]
body := data[xattrsLen+4:]
if xattrsLen == 0 {
return
return body, nil, nil
}

// In the xattr key/value pairs, key and value are both terminated by 0x00 (byte(0)). Use this as a separator to split the byte slice
separator := []byte("\x00")

xattrs := make([]Xattr, 0)
// Iterate over xattr key/value pairs
pos := uint32(4)
for pos < xattrsLen {
Expand All @@ -176,11 +206,11 @@ func DecodeValueWithXattrs(data []byte) (body []byte, xattrs []Xattr, err error)
pairBytes := data[pos : pos+pairLen]
components := bytes.Split(pairBytes, separator)
// xattr pair has the format [key]0x00[value]0x00, and so should split into three components
if len(components) != 3 || len(components[2]) != 0 {
return nil, nil, fmt.Errorf("invalid DCP xattr data: %s", pairBytes)
if len(components) != 3 {
return nil, nil, fmt.Errorf("Unexpected number of components found in xattr pair: %s", pairBytes)
}
xattrs = append(xattrs, Xattr{string(components[0]), components[1]})
pos += pairLen
}
return
return body, xattrs, nil
}
81 changes: 81 additions & 0 deletions tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package sgbucket

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDCPEncodeXattrs(t *testing.T) {
Expand Down Expand Up @@ -46,3 +48,82 @@ func TestDCPEncodeXattrs(t *testing.T) {
assert.Equal(t, 0, len(gotXattrs))
}
}

func TestDCPDecodeValue(t *testing.T) {
testCases := []struct {
name string
body []byte
expectedErr error
expectedBody []byte
expectedSyncXattr []byte
}{
{
name: "bad value",
body: []byte("abcde"),
expectedErr: ErrXattrInvalidLen,
},
{
name: "xattr length 4, overflow",
body: []byte{0x00, 0x00, 0x00, 0x04, 0x01},
expectedErr: ErrXattrInvalidLen,
},
{
name: "empty",
body: nil,
expectedErr: ErrEmptyMetadata,
},
{
name: "single xattr pair and body",
body: getSingleXattrDCPBytes(),
expectedBody: []byte(`{"value":"ABC"}`),
expectedSyncXattr: []byte(`{"seq":1}`),
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
// DecodeValueWithXattrs is the underlying function
body, xattrs, err := DecodeValueWithXattrs(test.body)
require.ErrorIs(t, err, test.expectedErr)
require.Equal(t, test.expectedBody, body)
if test.expectedSyncXattr != nil {
require.Len(t, xattrs, 1)
require.Equal(t, "_sync", xattrs[0].Name)
require.Equal(t, test.expectedSyncXattr, xattrs[0].Value)
} else {
require.Nil(t, xattrs)
}
})
}
}

// TestInvalidXattrStreamEmptyBody tests is a bit different than cases in TestDCPDecodeValue since DecodeValueWithXattrs will pass but UnmarshalDocumentSyncDataFromFeed will fail due to invalid json.
func TestInvalidXattrStreamEmptyBody(t *testing.T) {
inputStream := []byte{0x00, 0x00, 0x00, 0x01, 0x01}
emptyBody := []byte{}

body, xattrs, err := DecodeValueWithXattrs(inputStream)
require.NoError(t, err)
require.Empty(t, xattrs)
require.Equal(t, emptyBody, body)
}

// getSingleXattrDCPBytes returns a DCP body with a single xattr pair and body
func getSingleXattrDCPBytes() []byte {
zeroByte := byte(0)
// Build payload for single xattr pair and body
xattrValue := `{"seq":1}`
xattrPairLength := 4 + len("_sync") + len(xattrValue) + 2
xattrTotalLength := xattrPairLength
body := `{"value":"ABC"}`

// Build up the dcp Body
dcpBody := make([]byte, 8)
binary.BigEndian.PutUint32(dcpBody[0:4], uint32(xattrTotalLength))
binary.BigEndian.PutUint32(dcpBody[4:8], uint32(xattrPairLength))
dcpBody = append(dcpBody, "_sync"...)
dcpBody = append(dcpBody, zeroByte)
dcpBody = append(dcpBody, xattrValue...)
dcpBody = append(dcpBody, zeroByte)
dcpBody = append(dcpBody, body...)
return dcpBody
}