Skip to content

Commit

Permalink
models: Added AppendString, PointSize, and Round to Point
Browse files Browse the repository at this point in the history
This change also updates the UDP client to take advantage of these
improvements, as well as some code review changes.
  • Loading branch information
joelegasse committed Sep 23, 2016
1 parent ee68167 commit 0d2b339
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 38 deletions.
3 changes: 2 additions & 1 deletion client/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type BatchPointsConfig struct {

// Client is a client interface for writing & querying the database
type Client interface {
// Ping checks that status of cluster
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients
Ping(timeout time.Duration) (time.Duration, string, error)

// Write takes a BatchPoints object and writes all Points to InfluxDB.
Expand Down
59 changes: 24 additions & 35 deletions client/v2/udp.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
package client

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"time"
)

var ErrLargePoint = errors.New("point exceeds allowed size")

const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512

// MaxPayloadSize is a safe maximum limit for a UDP payload over IPv4
MaxUDPPayloadSize = 65467
)

// UDPConfig is the config data needed to create a UDP Client
Expand All @@ -27,7 +20,7 @@ type UDPConfig struct {
Addr string

// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}

Expand Down Expand Up @@ -56,12 +49,6 @@ func NewUDPClient(conf UDPConfig) (Client, error) {
}, nil
}

// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
Expand All @@ -73,45 +60,43 @@ type udpclient struct {
}

func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())

var delayedError error

var checkBuffer = func(s string) {
if b.Len() > 0 && b.Len()+len(s) > uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b.Reset()
b = b[:0]
}
}

for _, p := range bp.Points() {
point := p.pt.RoundedString(d) + "\n"
if len(point) > MaxUDPPayloadSize {
delayedError = ErrLargePoint
continue
}
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"

checkBuffer(point)
checkBuffer(pointSize)

if p.Time().IsZero() || len(point) <= uc.payloadSize {
b.WriteString(point)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}

points := p.pt.Split(uc.payloadSize - 1) // newline will be added
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
point = sp.RoundedString(d) + "\n"
checkBuffer(point)
b.WriteString(point)
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}

if b.Len() > 0 {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
Expand All @@ -121,3 +106,7 @@ func (uc *udpclient) Write(bp BatchPoints) error {
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}

func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
50 changes: 50 additions & 0 deletions models/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ type Point interface {
// string representations are no longer than size. Points with a single field or
// a point without a timestamp may exceed the requested size.
Split(size int) []Point

// Round will round the timestamp of the point to the given duration
Round(d time.Duration)

// StringSize returns the length of the string that would be returned by String()
StringSize() int

// AppendString appends the result of String() to the provided buffer and returns
// the result, potentially reducing string allocations
AppendString(buf []byte) []byte
}

// Points represents a sortable list of points by timestamp.
Expand Down Expand Up @@ -1234,6 +1244,11 @@ func (p *point) SetTime(t time.Time) {
p.time = t
}

// Round implements Point.Round
func (p *point) Round(d time.Duration) {
p.time = p.time.Round(d)
}

// Tags returns the tag set for the point
func (p *point) Tags() Tags {
return parseTags(p.key)
Expand Down Expand Up @@ -1332,6 +1347,41 @@ func (p *point) String() string {
return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10)
}

// AppendString implements Point.AppendString
func (p *point) AppendString(buf []byte) []byte {
buf = append(buf, p.key...)
buf = append(buf, ' ')
buf = append(buf, p.fields...)

if !p.time.IsZero() {
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, p.UnixNano(), 10)
}

return buf
}

func (p *point) StringSize() int {
size := len(p.key) + len(p.fields) + 1

if !p.time.IsZero() {
digits := 1 // even "0" has one digit
t := p.UnixNano()
if t < 0 {
// account for negative sign, then negate
digits++
t = -t
}
for t > 9 { // already accounted for one digit
digits++
t /= 10
}
size += digits + 1 // digits and a space
}

return size
}

func (p *point) MarshalBinary() ([]byte, error) {
tb, err := p.time.MarshalBinary()
if err != nil {
Expand Down
48 changes: 46 additions & 2 deletions models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var (
"uint32": uint32(math.MaxUint32),
"string": "String field that has a decent length, probably some log message or something",
"boolean": false,
"float64-tiny": math.SmallestNonzeroFloat64,
"float64-large": math.MaxFloat64,
"float64-tiny": float64(math.SmallestNonzeroFloat64),
"float64-large": float64(math.MaxFloat64),
}
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
Expand All @@ -43,6 +43,50 @@ func BenchmarkMarshal(b *testing.B) {
}
}

func TestPoint_StringSize(t *testing.T) {
testPoint_cube(t, func(p models.Point) {
l := p.StringSize()
s := p.String()

if l != len(s) {
t.Errorf("Incorrect length for %q. got %v, exp %v", s, l, len(s))
}
})

}

func TestPoint_AppendString(t *testing.T) {
testPoint_cube(t, func(p models.Point) {
got := p.AppendString(nil)
exp := []byte(p.String())

if !reflect.DeepEqual(exp, got) {
t.Errorf("AppendString() didn't match String(): got %v, exp %v", got, exp)
}
})
}

func testPoint_cube(t *testing.T, f func(p models.Point)) {
// heard of a table-driven test? let's make a cube-driven test...
tagList := []models.Tags{nil, {{[]byte("foo"), []byte("bar")}}, tags}
fieldList := []models.Fields{{"a": 42.0}, {"a": 42, "b": "things"}, fields}
timeList := []time.Time{time.Time{}, time.Unix(0, 0), time.Unix(-34526, 0), time.Unix(231845, 0), time.Now()}

for _, tagSet := range tagList {
for _, fieldSet := range fieldList {
for _, pointTime := range timeList {
p, err := models.NewPoint("test", tagSet, fieldSet, pointTime)
if err != nil {
t.Errorf("unexpected error creating point: %v", err)
continue
}

f(p)
}
}
}
}

var p models.Point

func BenchmarkNewPoint(b *testing.B) {
Expand Down

0 comments on commit 0d2b339

Please sign in to comment.