Skip to content

Commit

Permalink
CBG-3913 put fixes from sync_gateway into DecodeValueWithXattrs (#122)
Browse files Browse the repository at this point in the history
- return in the case that xattrsLen is bigger than the data
- allow for xattr to have a nil value
  • Loading branch information
torcolvin authored May 9, 2024
1 parent 2a857ae commit f4b95ec
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 9 deletions.
48 changes: 39 additions & 9 deletions tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package sgbucket
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -100,6 +101,12 @@ const FeedResume = 1
// checkpoint persistence (used to avoid recursive checkpoint document processing)
type FeedEventCallbackFunc func(event FeedEvent) bool

// ErrXattrInvalidLen is returned if the xattr is corrupt.
var ErrXattrInvalidLen = errors.New("Xattr stream length")

// ErrEmptyMetadata is returned when there is no Sync Gateway metadata
var ErrEmptyMetadata = errors.New("Empty Sync Gateway metadata")

// The name and value of an extended attribute (xattr)
type Xattr struct {
Name string
Expand Down Expand Up @@ -147,24 +154,47 @@ func EncodeValueWithXattrs(body []byte, xattrs ...Xattr) []byte {

// DecodeValueWithXattrs converts DCP Xattrs value format into a body and zero or more Xattrs.
// Call this if the event DataType has the FeedDataTypeXattr flag.
func DecodeValueWithXattrs(data []byte) (body []byte, xattrs []Xattr, err error) {
// Details on format (taken from https://docs.google.com/document/d/18UVa5j8KyufnLLy29VObbWRtoBn9vs8pcxttuMt6rz8/edit#heading=h.caqiui1pmmmb.):
/*
When the XATTR bit is set the first uint32_t in the body contains the size of the entire XATTR section.
Byte/ 0 | 1 | 2 | 3 |
/ | | | |
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+---------------+---------------+---------------+---------------+
0| Total xattr length in network byte order |
+---------------+---------------+---------------+---------------+
Following the length you'll find an iovector-style encoding of all of the XATTR key-value pairs with the following encoding:
uint32_t length of next xattr pair (network order)
xattr key in modified UTF-8
0x00
xattr value in modified UTF-8
0x00
The 0x00 byte after the key saves us from storing a key length, and the trailing 0x00 is just for convenience to allow us to use string functions to search in them.
*/

func DecodeValueWithXattrs(data []byte) ([]byte, []Xattr, error) {
if len(data) < 4 {
err = fmt.Errorf("invalid DCP xattr data: truncated (%d bytes)", len(data))
return
return nil, nil, fmt.Errorf("invalid DCP xattr data: %w truncated (%d bytes)", ErrEmptyMetadata, len(data))
}

xattrsLen := binary.BigEndian.Uint32(data[0:4])
if int(xattrsLen)+4 > len(data) {
err = fmt.Errorf("invalid DCP xattr data: invalid xattrs length %d (data is only %d bytes)", xattrsLen, len(data))
return nil, nil, fmt.Errorf("invalid DCP xattr data: %w length %d (data is only %d bytes)", ErrXattrInvalidLen, xattrsLen, len(data))
}
body = data[xattrsLen+4:]
body := data[xattrsLen+4:]
if xattrsLen == 0 {
return
return body, nil, nil
}

// In the xattr key/value pairs, key and value are both terminated by 0x00 (byte(0)). Use this as a separator to split the byte slice
separator := []byte("\x00")

xattrs := make([]Xattr, 0)
// Iterate over xattr key/value pairs
pos := uint32(4)
for pos < xattrsLen {
Expand All @@ -176,11 +206,11 @@ func DecodeValueWithXattrs(data []byte) (body []byte, xattrs []Xattr, err error)
pairBytes := data[pos : pos+pairLen]
components := bytes.Split(pairBytes, separator)
// xattr pair has the format [key]0x00[value]0x00, and so should split into three components
if len(components) != 3 || len(components[2]) != 0 {
return nil, nil, fmt.Errorf("invalid DCP xattr data: %s", pairBytes)
if len(components) != 3 {
return nil, nil, fmt.Errorf("Unexpected number of components found in xattr pair: %s", pairBytes)
}
xattrs = append(xattrs, Xattr{string(components[0]), components[1]})
pos += pairLen
}
return
return body, xattrs, nil
}
81 changes: 81 additions & 0 deletions tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package sgbucket

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDCPEncodeXattrs(t *testing.T) {
Expand Down Expand Up @@ -46,3 +48,82 @@ func TestDCPEncodeXattrs(t *testing.T) {
assert.Equal(t, 0, len(gotXattrs))
}
}

func TestDCPDecodeValue(t *testing.T) {
testCases := []struct {
name string
body []byte
expectedErr error
expectedBody []byte
expectedSyncXattr []byte
}{
{
name: "bad value",
body: []byte("abcde"),
expectedErr: ErrXattrInvalidLen,
},
{
name: "xattr length 4, overflow",
body: []byte{0x00, 0x00, 0x00, 0x04, 0x01},
expectedErr: ErrXattrInvalidLen,
},
{
name: "empty",
body: nil,
expectedErr: ErrEmptyMetadata,
},
{
name: "single xattr pair and body",
body: getSingleXattrDCPBytes(),
expectedBody: []byte(`{"value":"ABC"}`),
expectedSyncXattr: []byte(`{"seq":1}`),
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
// DecodeValueWithXattrs is the underlying function
body, xattrs, err := DecodeValueWithXattrs(test.body)
require.ErrorIs(t, err, test.expectedErr)
require.Equal(t, test.expectedBody, body)
if test.expectedSyncXattr != nil {
require.Len(t, xattrs, 1)
require.Equal(t, "_sync", xattrs[0].Name)
require.Equal(t, test.expectedSyncXattr, xattrs[0].Value)
} else {
require.Nil(t, xattrs)
}
})
}
}

// TestInvalidXattrStreamEmptyBody tests is a bit different than cases in TestDCPDecodeValue since DecodeValueWithXattrs will pass but UnmarshalDocumentSyncDataFromFeed will fail due to invalid json.
func TestInvalidXattrStreamEmptyBody(t *testing.T) {
inputStream := []byte{0x00, 0x00, 0x00, 0x01, 0x01}
emptyBody := []byte{}

body, xattrs, err := DecodeValueWithXattrs(inputStream)
require.NoError(t, err)
require.Empty(t, xattrs)
require.Equal(t, emptyBody, body)
}

// getSingleXattrDCPBytes returns a DCP body with a single xattr pair and body
func getSingleXattrDCPBytes() []byte {
zeroByte := byte(0)
// Build payload for single xattr pair and body
xattrValue := `{"seq":1}`
xattrPairLength := 4 + len("_sync") + len(xattrValue) + 2
xattrTotalLength := xattrPairLength
body := `{"value":"ABC"}`

// Build up the dcp Body
dcpBody := make([]byte, 8)
binary.BigEndian.PutUint32(dcpBody[0:4], uint32(xattrTotalLength))
binary.BigEndian.PutUint32(dcpBody[4:8], uint32(xattrPairLength))
dcpBody = append(dcpBody, "_sync"...)
dcpBody = append(dcpBody, zeroByte)
dcpBody = append(dcpBody, xattrValue...)
dcpBody = append(dcpBody, zeroByte)
dcpBody = append(dcpBody, body...)
return dcpBody
}

0 comments on commit f4b95ec

Please sign in to comment.