Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reader returns errors on unexpected EOF #382

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codec_skip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestDecoder_SkipRecord(t *testing.T) {
func TestDecoder_SkipRef(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f}
data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f, 0x02, 0x66}
schema := `{
"type": "record",
"name": "test",
Expand Down
56 changes: 56 additions & 0 deletions decoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ func TestDecoder_Int(t *testing.T) {
assert.Equal(t, 27, i)
}

func TestDecoder_IntShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xe6}
schema := "int"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var i int
err = dec.Decode(&i)

assert.Error(t, err)
}

func TestDecoder_IntInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -272,6 +286,20 @@ func TestDecoder_Int64(t *testing.T) {
assert.Equal(t, int64(27), i)
}

func TestDecoder_Int64ShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xe6}
schema := "long"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var i int64
err = dec.Decode(&i)

assert.Error(t, err)
}

func TestDecoder_Int64InvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -359,6 +387,20 @@ func TestDecoder_String(t *testing.T) {
assert.Equal(t, "foo", str)
}

func TestDecoder_StringShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x08}
schema := "string"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var str string
err = dec.Decode(&str)

require.Error(t, err)
}

func TestDecoder_StringInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -388,6 +430,20 @@ func TestDecoder_Bytes(t *testing.T) {
assert.Equal(t, []byte{0xEC, 0xAB, 0x44, 0x00}, b)
}

func TestDecoder_BytesShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x08, 0xEC}
schema := "bytes"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var b []byte
err = dec.Decode(&b)

assert.Error(t, err)
}

func TestDecoder_BytesInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down
13 changes: 0 additions & 13 deletions decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,6 @@ func TestDecoder_DecodeNilPtr(t *testing.T) {
assert.Error(t, err)
}

func TestDecoder_DecodeEOFDoesntReturnError(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xE2}
schema := "int"
dec, _ := avro.NewDecoder(schema, bytes.NewReader(data))

var i int
err := dec.Decode(&i)

assert.NoError(t, err)
}

func TestUnmarshal(t *testing.T) {
defer ConfigTeardown()

Expand Down
8 changes: 8 additions & 0 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,16 @@ func (d *Decoder) Error() error {
}

func (d *Decoder) readBlock() int64 {
_ = d.reader.Peek()
if errors.Is(d.reader.Error, io.EOF) {
// There is no next block
return 0
}

count := d.reader.ReadLong()
size := d.reader.ReadLong()

// Read the blocks data
if count > 0 {
data := make([]byte, size)
d.reader.Read(data)
Expand All @@ -129,6 +136,7 @@ func (d *Decoder) readBlock() int64 {
d.resetReader.Reset(data)
}

// Read the sync.
var sync [16]byte
d.reader.Read(sync[:])
if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
Expand Down
18 changes: 18 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ func (r *Reader) readByte() byte {
return b
}

// Peek returns the next byte in the buffer.
// The Reader Error will be io.EOF if no next byte exists.
func (r *Reader) Peek() byte {
if r.head == r.tail {
if !r.loadMore() {
return 0
}
}
return r.buf[r.head]
}

// Read reads data into the given bytes.
func (r *Reader) Read(b []byte) {
size := len(b)
Expand All @@ -117,6 +128,7 @@ func (r *Reader) Read(b []byte) {
for read < size {
if r.head == r.tail {
if !r.loadMore() {
r.Error = io.ErrUnexpectedEOF
return
}
}
Expand All @@ -138,6 +150,8 @@ func (r *Reader) ReadBool() bool {
}

// ReadInt reads an Int from the Reader.
//
//nolint:dupl
func (r *Reader) ReadInt() int32 {
if r.Error != nil {
return 0
Expand Down Expand Up @@ -174,12 +188,15 @@ func (r *Reader) ReadInt() int32 {
// We ran out of buffer and are not at the end of the int,
// Read more into the buffer.
if !r.loadMore() {
r.Error = fmt.Errorf("reading int: %w", r.Error)
return 0
}
}
}

// ReadLong reads a Long from the Reader.
//
//nolint:dupl
func (r *Reader) ReadLong() int64 {
if r.Error != nil {
return 0
Expand Down Expand Up @@ -216,6 +233,7 @@ func (r *Reader) ReadLong() int64 {
// We ran out of buffer and are not at the end of the long,
// Read more into the buffer.
if !r.loadMore() {
r.Error = fmt.Errorf("reading long: %w", r.Error)
return 0
}
}
Expand Down
27 changes: 25 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package avro_test
import (
"bytes"
"errors"
"io"
"strconv"
"testing"

Expand Down Expand Up @@ -44,6 +45,28 @@ func TestReader_ReportErrorExistingError(t *testing.T) {
assert.Equal(t, err, r.Error)
}

func TestReader_Peek(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0x36})

b := r.Peek()

i := r.ReadInt()

require.NoError(t, r.Error)
assert.Equal(t, byte(0x36), b)
assert.Equal(t, int32(27), i)
}

func TestReader_PeekNoData(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0x36})

_ = r.ReadInt()

_ = r.Peek()

assert.ErrorIs(t, r.Error, io.EOF)
}

func TestReader_ReadPastBuffer(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0xE2})

Expand Down Expand Up @@ -77,7 +100,7 @@ func TestReader_Read(t *testing.T) {
},
{
name: "eof",
data: []byte{0xAC}, // io.EOF
data: []byte{0xAC}, // io.ErrUnexpectedEOF
want: []byte{0xAC, 0x00, 0x00, 0x00, 0x00, 0x00},
wantErr: require.Error,
},
Expand Down Expand Up @@ -124,7 +147,7 @@ func TestReader_ReadBool(t *testing.T) {
},
{
name: "eof",
data: []byte(nil), // io.EOF
data: []byte(nil), // io.ErrUnexpectedEOF
want: false,
wantErr: require.Error,
},
Expand Down
Loading