Skip to content

Commit

Permalink
Typeconv package (#14144)
Browse files Browse the repository at this point in the history
Move type conversion support to parse arbitrary go types into common.MapStr (and other types) from the memlog registry to libbeat for reuse.
  • Loading branch information
Steffen Siering committed Nov 21, 2019
1 parent ef6e515 commit 40bed88
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,29 @@
// specific language governing permissions and limitations
// under the License.

package memlog
package typeconv

import (
"errors"
"fmt"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
structform "github.com/elastic/go-structform"
"github.com/elastic/go-structform/gotype"
)

// typeConv can convert structured data between arbitrary typed (serializable)
// Converter converts structured data between arbitrary typed (serializable)
// go structures and maps/slices/arrays. It uses go-structform/gotype for input
// and output values each, such that any arbitrary structures can be used.
//
// Internally typeConv is used by the ValueDecoder for (de-)serializing an
// users value into common.MapStr, which is used to store objects in the memory
// store.
type typeConv struct {
// The converter computes and caches mapping operations for go structures it
// has visited.
type Converter struct {
fold *gotype.Iterator
unfold *gotype.Unfolder
}

type valueDecoder struct {
tx memTxParent
value common.MapStr
}

type timeUnfolder struct {
gotype.BaseUnfoldState
to *time.Time
Expand All @@ -62,26 +55,20 @@ const (
timeUnfoldWaitDone
)

var typeConvPool = sync.Pool{
var convPool = sync.Pool{
New: func() interface{} {
t := &typeConv{}
t.init()
return t
return &Converter{}
},
}

func newTypeConv() *typeConv {
tc := typeConvPool.Get().(*typeConv)
return tc
}

func (t *typeConv) release() {
if t != nil {
typeConvPool.Put(t)
}
// NewConverter creates a new converter with local state for tracking known
// type conversations.
func NewConverter() *Converter {
c := &Converter{}
return c
}

func (t *typeConv) init() {
func (c *Converter) init() {
unfold, _ := gotype.NewUnfolder(nil, gotype.Unfolders(
unfoldTimestamp,
))
Expand All @@ -92,32 +79,44 @@ func (t *typeConv) init() {
panic(err)
}

t.unfold = unfold
t.fold = fold
c.unfold = unfold
c.fold = fold
}

func (t *typeConv) Convert(to, from interface{}) error {
err := t.unfold.SetTarget(to)
if err != nil {
return err
// Convert transforms the value of from into to, by translating the structure
// from into a set of events (go-structform.Visitor) that can applied to the
// value given by to.
// The operation fails if the values are not compatible (for example trying to
// convert an object into an int), or `to` is no pointer.
func (c *Converter) Convert(to, from interface{}) (err error) {
if c.unfold == nil || c.fold == nil {
c.init()
}

defer t.unfold.Reset()
return t.fold.Fold(from)
}
defer func() {
if err != nil {
c.fold = nil
c.unfold = nil
}
}()

func newValueDecoder(tx memTxParent, value common.MapStr) *valueDecoder {
return &valueDecoder{
tx: tx,
value: value,
if err = c.unfold.SetTarget(to); err != nil {
return err
}

defer c.unfold.Reset()
return c.fold.Fold(from)
}

func (d *valueDecoder) Decode(to interface{}) (err error) {
if err = d.tx.checkRead(); err == nil {
err = d.tx.getTypeConv().Convert(to, d.value)
}
return
// Convert transforms the value of from into to, by translating the structure
// from into a set of events (go-structform.Visitor) that can applied to the
// value given by to.
// The operation fails if the values are not compatible (for example trying to
// convert an object into an int), or `to` is no pointer.
func Convert(to, from interface{}) (err error) {
c := convPool.Get().(*Converter)
defer convPool.Put(c)
return c.Convert(to, from)
}

func foldTimestamp(in *time.Time, v structform.ExtVisitor) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package memlog
package typeconv

import (
"testing"
Expand All @@ -27,75 +27,63 @@ import (
"github.com/elastic/beats/libbeat/common"
)

func TestTypeConv(t *testing.T) {
t.Run("init", withTypeConv(func(t *testing.T, tc *typeConv) {
}))

t.Run("from MapStr", withTypeConv(func(t *testing.T, tc *typeConv) {
func TestConverter(t *testing.T) {
t.Run("from MapStr", func(t *testing.T) {
type testStruct struct {
A int
B int
}

var v testStruct
tc.Convert(&v, &common.MapStr{"a": 1})
Convert(&v, &common.MapStr{"a": 1})
assert.Equal(t, testStruct{1, 0}, v)
}))
})

t.Run("to MapStr", withTypeConv(func(t *testing.T, tc *typeConv) {
t.Run("to MapStr", func(t *testing.T) {
var m common.MapStr
err := tc.Convert(&m, struct{ A string }{"test"})
err := Convert(&m, struct{ A string }{"test"})
require.NoError(t, err)
assert.Equal(t, common.MapStr{"a": "test"}, m)
}))
})

t.Run("timestamp to MapStr", withTypeConv(func(t *testing.T, tc *typeConv) {
t.Run("timestamp to MapStr", func(t *testing.T) {
var m common.MapStr
ts := time.Unix(1234, 5678).UTC()

off := int16(-1)
expected := []uint64{uint64(5678) | uint64(uint16(off))<<32, 1234}

err := tc.Convert(&m, struct{ Timestamp time.Time }{ts})
err := Convert(&m, struct{ Timestamp time.Time }{ts})
require.NoError(t, err)
assert.Equal(t, common.MapStr{"timestamp": expected}, m)
}))
})

t.Run("timestamp from encoded MapStr", withTypeConv(func(t *testing.T, tc *typeConv) {
t.Run("timestamp from encoded MapStr", func(t *testing.T) {
type testStruct struct {
Timestamp time.Time
}

var v testStruct
off := int16(-1)
err := tc.Convert(&v, common.MapStr{
err := Convert(&v, common.MapStr{
"timestamp": []uint64{5678 | (uint64(uint16(off)))<<32, 1234},
})
require.NoError(t, err)
expected := time.Unix(1234, 5678).UTC()
assert.Equal(t, testStruct{expected}, v)
}))
})

t.Run("timestamp from string", withTypeConv(func(t *testing.T, tc *typeConv) {
t.Run("timestamp from string", func(t *testing.T) {
type testStruct struct {
Timestamp time.Time
}

var v testStruct
ts := time.Now()
err := tc.Convert(&v, common.MapStr{
err := Convert(&v, common.MapStr{
"timestamp": ts.Format(time.RFC3339Nano),
})
require.NoError(t, err)
assert.Equal(t, v.Timestamp.Format(time.RFC3339Nano), ts.Format(time.RFC3339Nano))
}))
}

func withTypeConv(fn func(t *testing.T, tc *typeConv)) func(*testing.T) {
return func(t *testing.T) {
tc := newTypeConv()
defer tc.release()
require.NotNil(t, tc)
fn(t, tc)
}
})
}
13 changes: 7 additions & 6 deletions libbeat/registry/backend/memlog/diskstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cleanup"
"github.com/elastic/beats/libbeat/common/transform/typeconv"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/registry/backend"
)
Expand All @@ -51,7 +52,7 @@ type diskStore struct {
}

type diskLoader struct {
typeConv *typeConv
typeConv *typeconv.Converter
}

type logAction struct {
Expand Down Expand Up @@ -418,14 +419,14 @@ func (s *diskStore) commitOps(st *txState) error {

func newDiskLoader() *diskLoader {
return &diskLoader{
typeConv: newTypeConv(),
typeConv: typeconv.NewConverter(),
}
}

func (l *diskLoader) IsReadonly() bool { return false }
func (l *diskLoader) checkRead() error { return nil }
func (l *diskLoader) checkWrite() error { return nil }
func (l *diskLoader) getTypeConv() *typeConv { return l.typeConv }
func (l *diskLoader) IsReadonly() bool { return false }
func (l *diskLoader) checkRead() error { return nil }
func (l *diskLoader) checkWrite() error { return nil }
func (l *diskLoader) getTypeConv() *typeconv.Converter { return l.typeConv }

// loadDataFile create a new hashtable with all key/value pairs found.
func loadDataFile(path string) (hashtable, error) {
Expand Down
19 changes: 18 additions & 1 deletion libbeat/registry/backend/memlog/memtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package memlog

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transform/typeconv"
"github.com/elastic/beats/libbeat/registry/backend"
)

Expand All @@ -34,7 +35,12 @@ type memTxParent interface {
IsReadonly() bool
checkRead() error
checkWrite() error
getTypeConv() *typeConv
getTypeConv() *typeconv.Converter
}

type valueDecoder struct {
tx memTxParent
value common.MapStr
}

func (tx *memTx) init(parent memTxParent, store *memStore, state *txState) {
Expand Down Expand Up @@ -253,3 +259,14 @@ func (tx *memTx) decodeOpValue(in interface{}) (common.MapStr, error) {
tc := tx.parent.getTypeConv()
return common.MapStr(opVal), tc.Convert(&opVal, in)
}

func newValueDecoder(tx memTxParent, value common.MapStr) *valueDecoder {
return &valueDecoder{tx: tx, value: value}
}

func (d *valueDecoder) Decode(to interface{}) (err error) {
if err = d.tx.checkRead(); err == nil {
err = d.tx.getTypeConv().Convert(to, d.value)
}
return
}
8 changes: 4 additions & 4 deletions libbeat/registry/backend/memlog/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package memlog

import (
"github.com/elastic/beats/libbeat/common/transform/typeconv"
"github.com/elastic/beats/libbeat/registry/backend"
)

Expand All @@ -26,7 +27,7 @@ type transaction struct {
state txState
mem memTx

typeConv *typeConv
typeConv *typeconv.Converter

active bool
readonly bool
Expand All @@ -51,7 +52,6 @@ func (tx *transaction) close() {
tx.active = false

if tx.typeConv != nil {
tx.typeConv.release()
tx.typeConv = nil
}
}
Expand Down Expand Up @@ -408,10 +408,10 @@ func (tx *transaction) checkRead() error {
return tx.checkActive()
}

func (tx *transaction) getTypeConv() *typeConv {
func (tx *transaction) getTypeConv() *typeconv.Converter {
tc := tx.typeConv
if tc == nil {
tc = newTypeConv()
tc = typeconv.NewConverter()
tx.typeConv = tc
}
return tc
Expand Down

0 comments on commit 40bed88

Please sign in to comment.