Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose a Reader for TLV metadata #86

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 112 additions & 46 deletions pkg/tlv/tlv.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package tlv

import (
"bytes"
"encoding/binary"
"io"
"math/big"

"github.com/Azure/adx-mon/pkg/pool"
)
Expand All @@ -16,7 +16,12 @@ type TLV struct {

type Tag uint16

var buf = pool.NewBytes(1024)
var (
	// buf is a shared pool of scratch byte slices; Encode and decode borrow
	// header-sized slices from it with Get and hand them back with Put.
	// NOTE(review): the meaning of the 1024 argument is defined by the pool
	// package and is not visible here — confirm before tuning.
	buf = pool.NewBytes(1024)
	// magicn is the tag value stored in the first two bytes of an encoded
	// header; decode compares against it to decide whether a stream carries
	// TLV metadata at all.
	magicn = Tag(0x1)
)

// sizeOfHeader is the byte length of the fixed header that Encode writes in
// front of the TLVs: one tag slot followed by two 32-bit slots.
//
// The slot widths are the varint maximum lengths from encoding/binary
// (MaxVarintLen16 = 3, MaxVarintLen32 = 5, 13 bytes in total) even though the
// fields themselves are written as fixed-width big-endian integers, so each
// slot carries one unused trailing byte. The same constants are used as
// offsets on the decode side, which makes this part of the wire format:
// changing it breaks compatibility with already-encoded data.
const sizeOfHeader = binary.MaxVarintLen16 /* T */ + binary.MaxVarintLen32 /* L */ + binary.MaxVarintLen32 /* V */

func New(tag Tag, value []byte) *TLV {

Expand All @@ -38,65 +43,126 @@ func (t *TLV) Encode() []byte {
// Encode serializes tlvs and prefixes them with a fixed-size header that
// tells a Reader how much metadata precedes the payload.
//
// Header layout (sizeOfHeader bytes, big-endian, unused slot bytes are zero):
//
//	T: magic number 0x1
//	L: total size in bytes of all encoded TLVs that follow the header
//	V: number of TLVs that follow the header
//
// The returned slice is freshly allocated and owned by the caller.
func Encode(tlvs ...*TLV) []byte {
	// Serialize the elements first; the header needs their combined size.
	var body bytes.Buffer
	for _, t := range tlvs {
		body.Write(t.Encode())
	}

	// The header is deliberately NOT taken from the shared pool: the result
	// is returned to the caller, so it must never alias a buffer that is
	// handed back to the pool and reused by a later Encode or decode call.
	// make also zeroes the padding bytes, keeping the output deterministic.
	out := make([]byte, sizeOfHeader, sizeOfHeader+body.Len())
	binary.BigEndian.PutUint16(out, uint16(magicn))                                                  // T
	binary.BigEndian.PutUint32(out[binary.MaxVarintLen16:], uint32(body.Len()))                      // L
	binary.BigEndian.PutUint32(out[binary.MaxVarintLen16+binary.MaxVarintLen32:], uint32(len(tlvs))) // V

	return append(out, body.Bytes()...)
}

func Decode(s io.ReadSeeker) ([]*TLV, error) {
data := buf.Get(1024)
defer buf.Put(data)
data = data[0:]
// Reader is an io.Reader over a stream that may begin with an encoded TLV
// header. The header, when present, is parsed and exposed through Header;
// Read only ever yields the payload bytes.
type Reader struct {
	// source is the underlying stream supplied to NewReader.
	source io.Reader
	// discovered is set once the start of source has been inspected and
	// classified as either "has a TLV header" or "plain payload".
	discovered bool
	// header holds the TLVs parsed from the stream; it stays empty when the
	// stream carries no header.
	header []TLV
	// buf holds bytes that were consumed from source while looking for a
	// header but turned out to be payload. Read drains it before touching
	// source again.
	buf []byte
}

_, err := s.Read(data)
if err != nil {
// NewReader returns a Reader that consumes r. Nothing is read from r until
// the first call to Read or Header.
func NewReader(r io.Reader) *Reader {
	reader := new(Reader)
	reader.source = r
	return reader
}

// Read implements io.Reader and yields only payload bytes. On first use it
// inspects the start of the stream so that any TLV header is consumed rather
// than returned to the caller.
func (r *Reader) Read(p []byte) (n int, err error) {
	// Classify the stream lazily if Header has not been called yet.
	if !r.discovered {
		if decodeErr := r.decode(); decodeErr != nil {
			return 0, decodeErr
		}
	}

	// Nothing left over from header detection: read straight from the source.
	if len(r.buf) == 0 {
		return r.source.Read(p)
	}

	// Serve the bytes that were pulled off the source during detection first.
	n = copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

func (r *Reader) Header() ([]TLV, error) {
if r.discovered {
return r.header, nil
}

if err := r.decode(); err != nil {
return nil, err
}

header := &TLV{
Tag: Tag(binary.BigEndian.Uint16(data[0:])),
Length: binary.BigEndian.Uint32(data[binary.MaxVarintLen16:]),
return r.header, nil
}

// decode inspects the start of the source and classifies the stream.
//
// If the stream begins with a well-formed header (magic tag, byte size and
// element count, followed by that many TLVs) the TLVs are stored in r.header
// and the source is left positioned at the first payload byte. Otherwise every
// byte consumed while probing is kept in r.buf so Read can replay it, and the
// stream is treated as plain payload.
//
// It returns a non-nil error only when reading the source fails; io.EOF is
// returned for a completely empty source. On success r.discovered is set.
func (r *Reader) decode() error {
	// Every encoded TLV occupies at least a tag slot plus a length slot.
	const minTLV = binary.MaxVarintLen16 + binary.MaxVarintLen32

	p := buf.Get(sizeOfHeader)
	defer buf.Put(p)

	// A single Read may legally return fewer bytes than requested, so insist
	// on a full header. This blocks until sizeOfHeader bytes arrive or the
	// source ends. ErrUnexpectedEOF only means the stream is shorter than a
	// header, which makes it plain payload.
	n, err := io.ReadFull(r.source, p)
	if err != nil && err != io.ErrUnexpectedEOF {
		return err
	}

	// source has no header; keep only the n bytes actually read, never the
	// stale remainder of the pooled buffer.
	if n < sizeOfHeader || Tag(binary.BigEndian.Uint16(p)) != magicn {
		r.passthrough(p[:n])
		return nil
	}

	size := binary.BigEndian.Uint32(p[binary.MaxVarintLen16:])
	count := binary.BigEndian.Uint32(p[binary.MaxVarintLen16+binary.MaxVarintLen32:])

	// The declared element count cannot fit in the declared size: we just got
	// unlucky with the payload starting with our magic number.
	if uint64(count)*minTLV > uint64(size) {
		r.passthrough(p)
		return nil
	}

	// Always read the TLV section when a header is present. ReadAll grows its
	// buffer as data arrives, so a bogus size cannot force a huge up-front
	// allocation, and the result is not pooled memory, so the parsed Values
	// may safely reference it.
	body, err := io.ReadAll(io.LimitReader(r.source, int64(size)))
	if err != nil {
		return err
	}
	// ReadAll reports a short stream as success, so detect truncation by
	// length: not a real header after all.
	if uint64(len(body)) < uint64(size) {
		r.passthrough(p, body)
		return nil
	}

	header := make([]TLV, 0, count)
	rest := body
	for i := uint32(0); i < count; i++ {
		if len(rest) < minTLV {
			r.passthrough(p, body)
			return nil
		}
		tag := Tag(binary.BigEndian.Uint16(rest))
		length := binary.BigEndian.Uint32(rest[binary.MaxVarintLen16:])
		rest = rest[minTLV:]
		if uint64(length) > uint64(len(rest)) {
			r.passthrough(p, body)
			return nil
		}
		// Cap the slice so appending to one Value cannot overwrite the next.
		header = append(header, TLV{Tag: tag, Length: length, Value: rest[:length:length]})
		rest = rest[length:]
	}

	r.header = header
	r.discovered = true
	return nil
}

// passthrough marks the stream as carrying no TLV header and stores a private
// copy of the already-consumed chunks, in order, for Read to replay.
func (r *Reader) passthrough(chunks ...[]byte) {
	total := 0
	for _, c := range chunks {
		total += len(c)
	}
	r.buf = make([]byte, 0, total)
	for _, c := range chunks {
		r.buf = append(r.buf, c...)
	}
	r.discovered = true
}
85 changes: 83 additions & 2 deletions pkg/tlv/tlv_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package tlv_test

import (
"bytes"
"io"
"os"
"reflect"
"testing"

"github.com/Azure/adx-mon/pkg/tlv"
Expand Down Expand Up @@ -30,12 +32,91 @@ func TestTLV(t *testing.T) {
require.NoError(t, err)
defer tf.Close()

tlvs, err := tlv.Decode(tf)
r := tlv.NewReader(tf)
tlvs, err := r.Header()
require.NoError(t, err)
require.Equal(t, 1, len(tlvs))
require.Equal(t, v, string(tlvs[0].Value))

data, err := io.ReadAll(tf)
data, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, randomBytes, data)
}

// TestReader round-trips payloads through tlv.Reader, with and without an
// encoded TLV header, and checks that Header reports the expected metadata
// while Read returns exactly the payload.
func TestReader(t *testing.T) {
	type scenario struct {
		Name       string // doubles as the payload and as every TLV value
		HeaderLen  int    // number of TLVs to encode; 0 means no header at all
		SkipHeader bool   // when set, go straight to Read without calling Header
	}

	scenarios := []scenario{
		{Name: "single header entry", HeaderLen: 1},
		{Name: "this payload contains no tlv header"},
		{Name: "Invoke Read without first invoking Header even though TLVs exist", HeaderLen: 2, SkipHeader: true},
		{Name: "Several header entries", HeaderLen: 5},
	}

	for _, sc := range scenarios {
		t.Run(sc.Name, func(t *testing.T) {
			stream := []byte(sc.Name)

			// Prefix the payload with an encoded header when one is requested.
			if sc.HeaderLen != 0 {
				entries := make([]*tlv.TLV, 0, sc.HeaderLen)
				for tag := 0; tag < sc.HeaderLen; tag++ {
					entries = append(entries, tlv.New(tlv.Tag(tag), []byte(sc.Name)))
				}
				stream = append(tlv.Encode(entries...), stream...)
			}

			reader := tlv.NewReader(bytes.NewBuffer(stream))

			if !sc.SkipHeader {
				metadata, err := reader.Header()
				require.NoError(t, err)
				require.Equal(t, sc.HeaderLen, len(metadata))
				for _, entry := range metadata {
					require.Equal(t, sc.Name, string(entry.Value))
				}
			}

			// Whatever remains must be the payload and nothing else.
			payload, err := io.ReadAll(reader)
			require.NoError(t, err)
			require.Equal(t, sc.Name, string(payload))
		})
	}
}

// TestUnluckyMagicNumber feeds the Reader a header-less payload whose first
// byte is 0x1 and checks that every byte comes back out unchanged.
//
// NOTE(review): the Reader compares the first TWO bytes, read as a big-endian
// uint16, against the magic tag. This input starts with 0x01 'I' (0x0149), so
// it appears to take the ordinary "no header" path rather than the
// false-positive path the name suggests — confirm, and consider an input
// beginning 0x00 0x01 once the decoder is known to handle it.
func TestUnluckyMagicNumber(t *testing.T) {
	var input bytes.Buffer

	_, err := input.Write([]byte{0x1})
	require.NoError(t, err)
	_, err = input.WriteString("I kind of look like a TLV")
	require.NoError(t, err)

	want := input.Bytes()
	reader := tlv.NewReader(bytes.NewBuffer(want))

	got, err := io.ReadAll(reader)
	require.NoError(t, err)
	require.True(t, reflect.DeepEqual(got, want))
}

// BenchmarkReader measures decoding a one-TLV header and draining the payload
// through a tlv.Reader.
func BenchmarkReader(b *testing.B) {
	t := tlv.New(tlv.Tag(0x2), []byte("some tag payload"))
	stream := append(tlv.Encode(t), []byte("body payload")...)
	src := bytes.NewReader(stream)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the source and build a fresh Reader every iteration. Reusing
		// one Reader would leave it exhausted after the first pass, so the
		// remaining iterations would only measure reads that hit EOF.
		src.Reset(stream)
		if _, err := io.ReadAll(tlv.NewReader(src)); err != nil {
			b.Fatal(err)
		}
	}
}