Skip to content

Commit

Permalink
feat: add support for schema evolution (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait committed Jan 12, 2024
1 parent 35f90ee commit 589f785
Show file tree
Hide file tree
Showing 12 changed files with 2,332 additions and 48 deletions.
8 changes: 5 additions & 3 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type ValEncoder interface {

// ReadVal parses Avro value and stores the result in the value pointed to by obj.
func (r *Reader) ReadVal(schema Schema, obj any) {
decoder := r.cfg.getDecoderFromCache(schema.Fingerprint(), reflect2.RTypeOf(obj))
key := cacheFingerprintOf(schema)
decoder := r.cfg.getDecoderFromCache(key, reflect2.RTypeOf(obj))
if decoder == nil {
typ := reflect2.TypeOf(obj)
if typ.Kind() != reflect.Ptr {
Expand Down Expand Up @@ -65,14 +66,15 @@ func (w *Writer) WriteVal(schema Schema, val any) {

func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder {
rtype := typ.RType()
decoder := c.getDecoderFromCache(schema.Fingerprint(), rtype)
key := cacheFingerprintOf(schema)
decoder := c.getDecoderFromCache(key, rtype)
if decoder != nil {
return decoder
}

ptrType := typ.(*reflect2.UnsafePtrType)
decoder = decoderOfType(c, schema, ptrType.Elem())
c.addDecoderToCache(schema.Fingerprint(), rtype, decoder)
c.addDecoderToCache(key, rtype, decoder)
return decoder
}

Expand Down
57 changes: 57 additions & 0 deletions codec_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package avro

import (
"fmt"
"unsafe"

"github.com/modern-go/reflect2"
)

func createDefaultDecoder(cfg *frozenConfig, field *Field, typ reflect2.Type) ValDecoder {
fn := func(def any) ([]byte, error) {
defaultType := reflect2.TypeOf(def)
if defaultType == nil {
defaultType = reflect2.TypeOf((*null)(nil))
}
defaultEncoder := encoderOfType(cfg, field.Type(), defaultType)
if defaultType.LikePtr() {
defaultEncoder = &onePtrEncoder{defaultEncoder}
}
w := cfg.borrowWriter()
defer cfg.returnWriter(w)

defaultEncoder.Encode(reflect2.PtrOf(def), w)
if w.Error != nil {
return nil, w.Error
}
b := w.Buffer()
data := make([]byte, len(b))
copy(data, b)

return data, nil
}

b, err := field.encodeDefault(fn)
if err != nil {
return &errorDecoder{err: fmt.Errorf("decode default: %w", err)}
}
return &defaultDecoder{
data: b,
decoder: decoderOfType(cfg, field.Type(), typ),
}
}

type defaultDecoder struct {
data []byte
decoder ValDecoder
}

// Decode implements ValDecoder.
func (d *defaultDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
rr := r.cfg.borrowReader(d.data)
defer r.cfg.returnReader(rr)

d.decoder.Decode(ptr, rr)
}

var _ ValDecoder = &defaultDecoder{}
Loading

0 comments on commit 589f785

Please sign in to comment.