Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serde Header #467

Merged
merged 16 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 166 additions & 62 deletions pkg/sr/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (
"sync/atomic"
)

// The wire format for encoded types is 0, then big endian uint32 of the ID,
// then the encoded message.
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

var (
// ErrNotRegistered is returned from Serde when attempting to encode a
// value or decode an ID that has not been registered, or when using
Expand All @@ -22,7 +17,7 @@ var (

// ErrBadHeader is returned from Decode when the input slice is shorter
// than five bytes, or if the first byte is not the magic 0 byte.
ErrBadHeader = errors.New("5 byte header for value is missing or does no have 0 magic byte")
ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte")
)

type (
Expand Down Expand Up @@ -80,8 +75,9 @@ type tserde struct {
gen func() any
typeof reflect.Type

index []int // for encoding, an optional index we use
subindex map[int]tserde // for decoding, we look up sub-indices in the payload
index []int // for encoding, an optional index we use
subindex map[int]tserde // for decoding, we look up sub-indices in the payload
subindexDepth int // for decoding, we need to know the maximum depth of the subindex map
}

// Serde encodes and decodes values according to the schema registry wire
Expand All @@ -101,6 +97,7 @@ type Serde struct {
mu sync.Mutex

defaults []SerdeOpt
h SerdeHeader
}

var (
Expand Down Expand Up @@ -134,6 +131,26 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) {
s.defaults = opts
}

// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decodes an ID from b but I can sneak this into a quickfix PR that I plan to do

// or an error.
func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error) {
return s.header().DecodeID(b)
}

// DecodeIndex decodes at most maxLength of a schema index from in, returning
// the index and remaining bytes, or an error. It expects b to be the output of
// DecodeID (schema ID should already be stripped away).
func (s *Serde) DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) {
return s.header().DecodeIndex(in, maxLength)
}

func (s *Serde) header() SerdeHeader {
if s.h == nil {
return defaultSerdeHeader
}
return s.h
}

// Register registers a schema ID and the value it corresponds to, as well as
// the encoding or decoding functions. You need to register functions depending
// on whether you are only encoding, only decoding, or both.
Expand All @@ -152,27 +169,42 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) {
defer s.mu.Unlock()

// Type mapping is easy: we just map the type to the final tserde.
// We defer the store because we modify the tserde below, and we
// may delete a type key.
dupTypes := make(map[reflect.Type]tserde)
for k, v := range s.loadTypes() {
dupTypes[k] = v
}

// For IDs, we deeply clone any path that is changing.
dupIDs := tserdeMapClone(s.loadIDs(), id, t.index)

// We defer the store because we modify the tserde below, and we
// may delete a type key.
defer func() {
dupTypes[typeof] = t
s.types.Store(dupTypes)
s.ids.Store(dupIDs)
}()

// For IDs, we deeply clone any path that is changing.
m := tserdeMapClone(s.loadIDs(), id, t.index)
s.ids.Store(m)

// Now we have a full path down index initialized (or, the top
// level map if there is no index). We iterate down the index
// tree to find the end node we are initializing.
k := id
m := dupIDs
at := m[k]
depth := len(t.index)
max := func(i, j int) int {
if i > j {
return i
}
return j
}
for _, idx := range t.index {
// SAFETY: tserdeMapClone deeply clones all maps through the index, so
// our modified value is being saved to a new map that is not being read.
at.subindexDepth = max(at.subindexDepth, depth)
m[k] = at
depth--

m = at.subindex
k = idx
at = m[k]
Expand All @@ -186,15 +218,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) {

// Now, we initialize the end node.
t = tserde{
id: uint32(id),
exists: true,
encode: t.encode,
appendEncode: t.appendEncode,
decode: t.decode,
gen: t.gen,
typeof: typeof,
index: t.index,
subindex: at.subindex,
id: uint32(id),
exists: true,
encode: t.encode,
appendEncode: t.appendEncode,
decode: t.decode,
gen: t.gen,
typeof: typeof,
index: t.index,
subindex: at.subindex,
subindexDepth: at.subindexDepth,
}
m[k] = t
}
Expand All @@ -219,8 +252,8 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde {
return dup
}

// Encode encodes a value according to the schema registry wire format and
// returns it. If EncodeFn was not used, this returns ErrNotRegistered.
// Encode encodes a value and prepends the header according to the configured
// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered.
func (s *Serde) Encode(v any) ([]byte, error) {
return s.AppendEncode(nil, v)
}
Expand All @@ -234,23 +267,9 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) {
return b, ErrNotRegistered
}

b = append(b,
0,
byte(t.id>>24),
byte(t.id>>16),
byte(t.id>>8),
byte(t.id>>0),
)

if len(t.index) > 0 {
if len(t.index) == 1 && t.index[0] == 0 {
b = append(b, 0) // first-index shortcut (one type in the protobuf)
} else {
b = binary.AppendVarint(b, int64(len(t.index)))
for _, idx := range t.index {
b = binary.AppendVarint(b, int64(idx))
}
}
b, err := s.header().AppendEncode(b, int(t.id), t.index)
if err != nil {
return nil, err
}

if t.appendEncode != nil {
Expand Down Expand Up @@ -316,37 +335,122 @@ func (s *Serde) DecodeNew(b []byte) (any, error) {
}

func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) {
if len(b) < 5 || b[0] != 0 {
return nil, tserde{}, ErrBadHeader
id, b, err := s.DecodeID(b)
if err != nil {
return nil, tserde{}, err
}
id := binary.BigEndian.Uint32(b[1:5])
b = b[5:]

t := s.loadIDs()[int(id)]
t := s.loadIDs()[id]
if len(t.subindex) > 0 {
r := bReader{b}
br := io.ByteReader(&r)
l, err := binary.ReadVarint(br)
if l == 0 { // length 0 is a shortcut for length 1, index 0
t = t.subindex[0]
}
for err == nil && t.subindex != nil && l > 0 {
var idx int64
idx, err = binary.ReadVarint(br)
t = t.subindex[int(idx)]
l--
}
var index []int
index, b, err = s.DecodeIndex(b, t.subindexDepth)
if err != nil {
return nil, t, err
return nil, tserde{}, err
}
for _, idx := range index {
if t.subindex == nil {
return nil, tserde{}, ErrNotRegistered
}
t = t.subindex[idx]
}
b = r.b
}
if !t.exists {
return nil, t, ErrNotRegistered
return nil, tserde{}, ErrNotRegistered
}
return b, t, nil
}

// SerdeHeader encodes and decodes a message header.
type SerdeHeader interface {
// AppendEncode encodes a schema ID and optional index to b, returning the
// updated slice or an error.
AppendEncode(b []byte, id int, index []int) ([]byte, error)
// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
// or an error.
DecodeID(in []byte) (id int, out []byte, err error)
// DecodeIndex decodes at most maxLength of a schema index from in,
// returning the index and remaining bytes, or an error.
DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)
}

var defaultSerdeHeader = new(confluentHeader)

// confluentHeader is a SerdeHeader that produces the Confluent wire format. It
// starts with 0, then big endian uint32 of the ID, then index (only protobuf),
// then the encoded message.
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
type confluentHeader struct{}

// AppendEncode appends an encoded header to b according to the Confluent wire
// format and returns it. Error is always nil.
func (*confluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) {
b = append(
b,
0,
byte(id>>24),
byte(id>>16),
byte(id>>8),
byte(id>>0),
)

if len(index) > 0 {
if len(index) == 1 && index[0] == 0 {
b = append(b, 0) // first-index shortcut (one type in the protobuf)
} else {
b = binary.AppendVarint(b, int64(len(index)))
for _, idx := range index {
b = binary.AppendVarint(b, int64(idx))
}
}
}

return b, nil
}

// DecodeID strips and decodes the schema ID from b. It returns the ID alongside
// the unread bytes. If the header does not contain the magic byte or b contains
// less than 5 bytes it returns ErrBadHeader.
func (*confluentHeader) DecodeID(b []byte) (int, []byte, error) {
if len(b) < 5 || b[0] != 0 {
return 0, nil, ErrBadHeader
}
id := binary.BigEndian.Uint32(b[1:5])
return int(id), b[5:], nil
}

// DecodeIndex strips and decodes indices from b. It returns the index slice
// alongside the unread bytes. It expects b to be the output of DecodeID (schema
// ID should already be stripped away). If maxLength is greater than 0 and the
// encoded data contains more indices than maxLength the function returns
// ErrNotRegistered.
func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) {
r := bReader{b}
br := io.ByteReader(&r)
l, err := binary.ReadVarint(br)
if err != nil {
return nil, nil, err
}
if l == 0 { // length 0 is a shortcut for length 1, index 0
return []int{0}, r.b, nil
}
if l < 0 { // index length can't be negative
return nil, nil, ErrBadHeader
}
if maxLength > 0 && int(l) > maxLength { // index count is greater than expected
return nil, nil, ErrNotRegistered
}
index := make([]int, l)
for i := range index {
idx, err := binary.ReadVarint(br)
if err != nil {
return nil, nil, err
}
index[i] = int(idx)
}
return index, r.b, nil
}

type bReader struct{ b []byte }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there was a specific reason for the custom reader type, I replaced it with bytes.Buffer to simplify.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I often misremember that bytes.Reader is relatively small, and mix it up with bytes.Buffer being a pretty big struct -- but it looks like that's not even true anymore, since the bootstrap field was removed in 2018: golang/go@9c2be4c


func (b *bReader) ReadByte() (byte, error) {
Expand Down
Loading