Skip to content

Commit

Permalink
fix: Long timestamp default decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait committed Sep 6, 2024
1 parent 91b9f6f commit cb3d6f5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 11 deletions.
7 changes: 4 additions & 3 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
)

var (
timeType = reflect.TypeOf(time.Time{})
ratType = reflect.TypeOf(big.Rat{})
durType = reflect.TypeOf(LogicalDuration{})
timeType = reflect.TypeOf(time.Time{})
timeDurationType = reflect.TypeOf(time.Duration(0))
ratType = reflect.TypeOf(big.Rat{})
durType = reflect.TypeOf(LogicalDuration{})
)

type null struct{}
Expand Down
27 changes: 27 additions & 0 deletions codec_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package avro

import (
"fmt"
"time"
"unsafe"

"github.com/modern-go/reflect2"
Expand All @@ -10,6 +11,32 @@ import (
func createDefaultDecoder(d *decoderContext, field *Field, typ reflect2.Type) ValDecoder {
cfg := d.cfg
fn := func(def any) ([]byte, error) {
// Solution 1:

// def's Go type is decided by JSON decode.
// Force conversion of some Go types to ensure compatibility with AVRO codec.
switch schema := field.Type().Type(); schema {
case Long:
schema := field.Type().(*PrimitiveSchema)
if schema.Logical() == nil {
break
}
switch schema.Logical().Type() {
case TimestampMillis:
d, ok := def.(int64)
if !ok {
break
}
def = time.UnixMilli(d)
case TimestampMicros:
d, ok := def.(int64)
if !ok {
break
}
def = time.UnixMicro(d)
}
}

defaultType := reflect2.TypeOf(def)
if defaultType == nil {
defaultType = reflect2.TypeOf((*null)(nil))
Expand Down
37 changes: 29 additions & 8 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,27 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
convert: createLongConverter(schema.encodedType),
}

case st == Long && lt == "":
// Solution 2:
case st == Long:
timestampLogicalType := (lt == TimestampMillis || lt == TimestampMicros)
if timestampLogicalType && typ.Type1() == timeDurationType {
return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.Type1().String(), schema.Type(), lt)}
}
if resolved {
return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
}
return &longCodec[int64]{}

case lt != "":
return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.String(), schema.Type(), lt)}
// case st == Long && lt == "":
// if resolved {
// return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
// }
// return &longCodec[int64]{}

// case lt != "":
// return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
// typ.String(), schema.Type(), lt)}

default:
break
Expand Down Expand Up @@ -245,12 +257,21 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}

case st == Long && lt == "":
// Solution 2:
case st == Long:
timestampLogicalType := (lt == TimestampMillis || lt == TimestampMicros)
if timestampLogicalType && typ.Type1() == timeDurationType {
return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.Type1().String(), schema.Type(), lt)}
}
return &longCodec[int64]{}

case lt != "":
return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.String(), schema.Type(), lt)}
// case st == Long && lt == "":
// return &longCodec[int64]{}

// case lt != "":
// return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
// typ.String(), schema.Type(), lt)}

default:
break
Expand Down
57 changes: 57 additions & 0 deletions schema_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package avro_test

import (
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -815,6 +816,62 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
"b": map[string]any{"a": int64(20)},
},
},
{
name: "Record Writer Field Missing With Long timestamp-millis Default",
reader: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"},
{
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"default": ` + strconv.FormatInt(1725616800000, 10) + `
}
]
}`,
writer: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"}
]
}`,
value: map[string]any{"a": "foo"},
want: map[string]any{
"a": "foo",
"b": time.UnixMilli(1725616800000).UTC(), // 2024-09-06 10:00:00
},
},
{
name: "Record Writer Field Missing With Long timestamp-micros Default",
reader: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"},
{
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
},
"default": ` + strconv.FormatInt(1725616800000000, 10) + `
}
]
}`,
writer: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"}
]
}`,
value: map[string]any{"a": "foo"},
want: map[string]any{
"a": "foo",
"b": time.UnixMicro(1725616800000000).UTC(), // 2024-09-06 10:00:00
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit cb3d6f5

Please sign in to comment.