diff --git a/api/cluster.go b/api/cluster.go index 5cb8bf90a4..d4db99ca2e 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -375,11 +375,20 @@ func (s *Server) getData(ctx *middleware.Context, request models.GetData) { if err != nil { // the only errors returned are from us catching panics, so we should treat them // all as internalServerErrors + + for _, s := range series { + pointSlicePool.PutMaybeNil(s.Datapoints) + } + log.Errorf("HTTP getData() %s", err.Error()) response.Write(ctx, response.WrapError(err)) return } response.Write(ctx, response.NewMsgp(200, &models.GetDataRespV1{Stats: ss, Series: series})) + + for _, s := range series { + pointSlicePool.PutMaybeNil(s.Datapoints) + } } func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) { diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 94ba236f0a..9dec989d5b 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -148,6 +148,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point { } // getTargets retrieves the series for the given requests by querying local and/or remote nodes as needed +// if an error occurs, it returns all intermediate series to the pool and returns only the error func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) { // split reqs into local and remote. localReqs := make([]models.Req, 0) @@ -198,17 +199,25 @@ func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs [ }() out := make([]models.Series, 0) + var firstErr error for resp := range responses { - if resp.err != nil { - return nil, resp.err + // even when we run into an error, still collect any series so we can feed them back to the pool + if resp.err != nil && firstErr == nil { + firstErr = resp.err } out = append(out, resp.series...) } + if firstErr != nil { + for _, s := range out { + pointSlicePool.PutMaybeNil(s.Datapoints) + } + return nil, firstErr + } log.Debugf("DP getTargets: %d series found on cluster", len(out)) return out, nil } -// getTargetsRemote issues the requests - keyed by node name - on other nodes +// getTargetsRemote issues the requests - keyed by node name - on other nodes and returns corresponding series, along with the first error encountered func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, remoteReqs map[string][]models.Req) ([]models.Series, error) { allPeers, err := cluster.MembersForSpeculativeQuery() @@ -269,7 +278,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, return out, err } -// error is the error of the first failing target request +// getTargetsLocal returns the series corresponding to the given requests, along with the first error encountered func (s *Server) getTargetsLocal(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) { log.Debugf("DP getTargetsLocal: handling %d reqs locally", len(reqs)) rCtx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal") @@ -310,14 +319,17 @@ LOOP: close(responses) }() out := make([]models.Series, 0, len(reqs)) + var firstErr error for resp := range responses { - if resp.err != nil { + if resp.err != nil && firstErr == nil { tags.Error.Set(span, true) - return nil, resp.err + firstErr = resp.err } out = append(out, resp.series) } - + if firstErr != nil { + return out, firstErr + } ss.Trace(span) log.Debugf("DP getTargetsLocal: %d series found locally", len(out)) return out, nil diff --git a/api/init.go b/api/init.go index 71a6109903..41cde44ff2 100644 --- a/api/init.go +++ b/api/init.go @@ -1,6 +1,7 @@ package api import ( + "github.com/grafana/metrictank/api/models" "github.com/grafana/metrictank/expr" "github.com/grafana/metrictank/pointslicepool" ) @@ -10,4 +11,5 @@ var pointSlicePool *pointslicepool.PointSlicePool func init() { pointSlicePool = pointslicepool.New(pointslicepool.DefaultPointSliceSize) expr.Pool(pointSlicePool) + models.Pool(pointSlicePool) } diff --git a/api/models/init_test.go b/api/models/init_test.go new file mode 100644 index 0000000000..95ab0c0a99 --- /dev/null +++ b/api/models/init_test.go @@ -0,0 +1,7 @@ +package models + +import "github.com/grafana/metrictank/pointslicepool" + +func init() { + pointSlicePool = pointslicepool.New(100) +} diff --git a/api/models/pointslicepool.go b/api/models/pointslicepool.go new file mode 100644 index 0000000000..e79ee38c9f --- /dev/null +++ b/api/models/pointslicepool.go @@ -0,0 +1,12 @@ +package models + +import ( + "github.com/grafana/metrictank/pointslicepool" +) + +var pointSlicePool *pointslicepool.PointSlicePool + +// Pool tells the models package library which pool to use for temporary []schema.Point +func Pool(p *pointslicepool.PointSlicePool) { + pointSlicePool = p +} diff --git a/api/models/series.go b/api/models/series.go index 8d1a158f84..c8018e76ea 100644 --- a/api/models/series.go +++ b/api/models/series.go @@ -16,7 +16,7 @@ import ( ) //go:generate msgp -//msgp:ignore SeriesMetaPropertiesExport +//msgp:ignore SeriesMetaPropertiesExport Series type Series struct { Target string // for fetched data, set from models.Req.Target, i.e. the metric graphite key. for function output, whatever should be shown as target string (legend) diff --git a/api/models/series_gen.go b/api/models/series_gen.go index 25215a9163..de138ce709 100644 --- a/api/models/series_gen.go +++ b/api/models/series_gen.go @@ -3,558 +3,9 @@ package models // Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( - "github.com/grafana/metrictank/schema" "github.com/tinylib/msgp/msgp" ) -// DecodeMsg implements msgp.Decodable -func (z *Series) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Target": - z.Target, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Target") - return - } - case "Tags": - var zb0002 uint32 - zb0002, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - if z.Tags == nil { - z.Tags = make(map[string]string, zb0002) - } else if len(z.Tags) > 0 { - for key := range z.Tags { - delete(z.Tags, key) - } - } - for zb0002 > 0 { - zb0002-- - var za0001 string - var za0002 string - za0001, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - za0002, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Tags", za0001) - return - } - z.Tags[za0001] = za0002 - } - case "Interval": - z.Interval, err = dc.ReadUint32() - if err != nil { - err = msgp.WrapError(err, "Interval") - return - } - case "QueryPatt": - z.QueryPatt, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "QueryPatt") - return - } - case "QueryFrom": - z.QueryFrom, err = dc.ReadUint32() - if err != nil { - err = msgp.WrapError(err, "QueryFrom") - return - } - case "QueryTo": - z.QueryTo, err = dc.ReadUint32() - if err != nil { - err = msgp.WrapError(err, "QueryTo") - return - } - case "QueryCons": - err = z.QueryCons.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "QueryCons") - return - } - case "Consolidator": - err = z.Consolidator.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Consolidator") - return - } - case "QueryMDP": - z.QueryMDP, err = dc.ReadUint32() - if err != nil { - err = msgp.WrapError(err, "QueryMDP") - return - } - case "QueryPNGroup": - err = z.QueryPNGroup.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "QueryPNGroup") - return - } - case "Meta": - var zb0003 uint32 - zb0003, err = dc.ReadArrayHeader() - if err != nil { - err = msgp.WrapError(err, "Meta") - return - } - if cap(z.Meta) >= int(zb0003) { - z.Meta = (z.Meta)[:zb0003] - } else { - z.Meta = make(SeriesMeta, zb0003) - } - for za0003 := range z.Meta { - err = z.Meta[za0003].DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Meta", za0003) - return - } - } - case "Datapoints": - var zb0004 uint32 - zb0004, err = dc.ReadArrayHeader() - if err != nil { - err = msgp.WrapError(err, "Datapoints") - return - } - if cap(z.Datapoints) >= int(zb0004) { - z.Datapoints = (z.Datapoints)[:zb0004] - } else { - z.Datapoints = make([]schema.Point, zb0004) - } - for za0004 := range z.Datapoints { - err = z.Datapoints[za0004].DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Datapoints", za0004) - return - } - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *Series) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 12 - // write "Target" - err = en.Append(0x8c, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74) - if err != nil { - return - } - err = en.WriteString(z.Target) - if err != nil { - err = msgp.WrapError(err, "Target") - return - } - // write "Tags" - err = en.Append(0xa4, 0x54, 0x61, 0x67, 0x73) - if err != nil { - return - } - err = en.WriteMapHeader(uint32(len(z.Tags))) - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - for za0001, za0002 := range z.Tags { - err = en.WriteString(za0001) - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - err = en.WriteString(za0002) - if err != nil { - err = msgp.WrapError(err, "Tags", za0001) - return - } - } - // write "Interval" - err = en.Append(0xa8, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c) - if err != nil { - return - } - err = en.WriteUint32(z.Interval) - if err != nil { - err = msgp.WrapError(err, "Interval") - return - } - // write "QueryPatt" - err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x61, 0x74, 0x74) - if err != nil { - return - } - err = en.WriteString(z.QueryPatt) - if err != nil { - err = msgp.WrapError(err, "QueryPatt") - return - } - // write "QueryFrom" - err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x46, 0x72, 0x6f, 0x6d) - if err != nil { - return - } - err = en.WriteUint32(z.QueryFrom) - if err != nil { - err = msgp.WrapError(err, "QueryFrom") - return - } - // write "QueryTo" - err = en.Append(0xa7, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x6f) - if err != nil { - return - } - err = en.WriteUint32(z.QueryTo) - if err != nil { - err = msgp.WrapError(err, "QueryTo") - return - } - // write "QueryCons" - err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73) - if err != nil { - return - } - err = z.QueryCons.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "QueryCons") - return - } - // write "Consolidator" - err = en.Append(0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72) - if err != nil { - return - } - err = z.Consolidator.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Consolidator") - return - } - // write "QueryMDP" - err = en.Append(0xa8, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x44, 0x50) - if err != nil { - return - } - err = en.WriteUint32(z.QueryMDP) - if err != nil { - err = msgp.WrapError(err, "QueryMDP") - return - } - // write "QueryPNGroup" - err = en.Append(0xac, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x4e, 0x47, 0x72, 0x6f, 0x75, 0x70) - if err != nil { - return - } - err = z.QueryPNGroup.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "QueryPNGroup") - return - } - // write "Meta" - err = en.Append(0xa4, 0x4d, 0x65, 0x74, 0x61) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.Meta))) - if err != nil { - err = msgp.WrapError(err, "Meta") - return - } - for za0003 := range z.Meta { - err = z.Meta[za0003].EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Meta", za0003) - return - } - } - // write "Datapoints" - err = en.Append(0xaa, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.Datapoints))) - if err != nil { - err = msgp.WrapError(err, "Datapoints") - return - } - for za0004 := range z.Datapoints { - err = z.Datapoints[za0004].EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Datapoints", za0004) - return - } - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *Series) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 12 - // string "Target" - o = append(o, 0x8c, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74) - o = msgp.AppendString(o, z.Target) - // string "Tags" - o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) - o = msgp.AppendMapHeader(o, uint32(len(z.Tags))) - for za0001, za0002 := range z.Tags { - o = msgp.AppendString(o, za0001) - o = msgp.AppendString(o, za0002) - } - // string "Interval" - o = append(o, 0xa8, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c) - o = msgp.AppendUint32(o, z.Interval) - // string "QueryPatt" - o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x61, 0x74, 0x74) - o = msgp.AppendString(o, z.QueryPatt) - // string "QueryFrom" - o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x46, 0x72, 0x6f, 0x6d) - o = msgp.AppendUint32(o, z.QueryFrom) - // string "QueryTo" - o = append(o, 0xa7, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x6f) - o = msgp.AppendUint32(o, z.QueryTo) - // string "QueryCons" - o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73) - o, err = z.QueryCons.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "QueryCons") - return - } - // string "Consolidator" - o = append(o, 0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72) - o, err = z.Consolidator.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Consolidator") - return - } - // string "QueryMDP" - o = append(o, 0xa8, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x44, 0x50) - o = msgp.AppendUint32(o, z.QueryMDP) - // string "QueryPNGroup" - o = append(o, 0xac, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x4e, 0x47, 0x72, 0x6f, 0x75, 0x70) - o, err = z.QueryPNGroup.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "QueryPNGroup") - return - } - // string "Meta" - o = append(o, 0xa4, 0x4d, 0x65, 0x74, 0x61) - o = msgp.AppendArrayHeader(o, uint32(len(z.Meta))) - for za0003 := range z.Meta { - o, err = z.Meta[za0003].MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Meta", za0003) - return - } - } - // string "Datapoints" - o = append(o, 0xaa, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.Datapoints))) - for za0004 := range z.Datapoints { - o, err = z.Datapoints[za0004].MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Datapoints", za0004) - return - } - } - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *Series) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Target": - z.Target, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Target") - return - } - case "Tags": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - if z.Tags == nil { - z.Tags = make(map[string]string, zb0002) - } else if len(z.Tags) > 0 { - for key := range z.Tags { - delete(z.Tags, key) - } - } - for zb0002 > 0 { - var za0001 string - var za0002 string - zb0002-- - za0001, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Tags") - return - } - za0002, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Tags", za0001) - return - } - z.Tags[za0001] = za0002 - } - case "Interval": - z.Interval, bts, err = msgp.ReadUint32Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Interval") - return - } - case "QueryPatt": - z.QueryPatt, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "QueryPatt") - return - } - case "QueryFrom": - z.QueryFrom, bts, err = msgp.ReadUint32Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "QueryFrom") - return - } - case "QueryTo": - z.QueryTo, bts, err = msgp.ReadUint32Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "QueryTo") - return - } - case "QueryCons": - bts, err = z.QueryCons.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "QueryCons") - return - } - case "Consolidator": - bts, err = z.Consolidator.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Consolidator") - return - } - case "QueryMDP": - z.QueryMDP, bts, err = msgp.ReadUint32Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "QueryMDP") - return - } - case "QueryPNGroup": - bts, err = z.QueryPNGroup.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "QueryPNGroup") - return - } - case "Meta": - var zb0003 uint32 - zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Meta") - return - } - if cap(z.Meta) >= int(zb0003) { - z.Meta = (z.Meta)[:zb0003] - } else { - z.Meta = make(SeriesMeta, zb0003) - } - for za0003 := range z.Meta { - bts, err = z.Meta[za0003].UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Meta", za0003) - return - } - } - case "Datapoints": - var zb0004 uint32 - zb0004, bts, err = msgp.ReadArrayHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Datapoints") - return - } - if cap(z.Datapoints) >= int(zb0004) { - z.Datapoints = (z.Datapoints)[:zb0004] - } else { - z.Datapoints = make([]schema.Point, zb0004) - } - for za0004 := range z.Datapoints { - bts, err = z.Datapoints[za0004].UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Datapoints", za0004) - return - } - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *Series) Msgsize() (s int) { - s = 1 + 7 + msgp.StringPrefixSize + len(z.Target) + 5 + msgp.MapHeaderSize - if z.Tags != nil { - for za0001, za0002 := range z.Tags { - _ = za0002 - s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) - } - } - s += 9 + msgp.Uint32Size + 10 + msgp.StringPrefixSize + len(z.QueryPatt) + 10 + msgp.Uint32Size + 8 + msgp.Uint32Size + 10 + z.QueryCons.Msgsize() + 13 + z.Consolidator.Msgsize() + 9 + msgp.Uint32Size + 13 + z.QueryPNGroup.Msgsize() + 5 + msgp.ArrayHeaderSize - for za0003 := range z.Meta { - s += z.Meta[za0003].Msgsize() - } - s += 11 + msgp.ArrayHeaderSize - for za0004 := range z.Datapoints { - s += z.Datapoints[za0004].Msgsize() - } - return -} - // DecodeMsg implements msgp.Decodable func (z *SeriesByTarget) DecodeMsg(dc *msgp.Reader) (err error) { var zb0002 uint32 diff --git a/api/models/series_gen_test.go b/api/models/series_gen_test.go index 5524599879..57b3d72dfc 100644 --- a/api/models/series_gen_test.go +++ b/api/models/series_gen_test.go @@ -9,119 +9,6 @@ import ( "github.com/tinylib/msgp/msgp" ) -func TestMarshalUnmarshalSeries(t *testing.T) { - v := Series{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgSeries(b *testing.B) { - v := Series{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgSeries(b *testing.B) { - v := Series{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshalSeries(b *testing.B) { - v := Series{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodeSeries(t *testing.T) { - v := Series{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) - } - - vn := Series{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodeSeries(b *testing.B) { - v := Series{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodeSeries(b *testing.B) { - v := Series{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} - func TestMarshalUnmarshalSeriesByTarget(t *testing.T) { v := SeriesByTarget{} bts, err := v.MarshalMsg(nil) diff --git a/api/models/series_msgp_manual.go b/api/models/series_msgp_manual.go new file mode 100644 index 0000000000..5fc346e1d5 --- /dev/null +++ b/api/models/series_msgp_manual.go @@ -0,0 +1,553 @@ +package models + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *Series) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Target": + z.Target, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Target") + return + } + case "Tags": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if z.Tags == nil { + z.Tags = make(map[string]string, zb0002) + } else if len(z.Tags) > 0 { + for key := range z.Tags { + delete(z.Tags, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + z.Tags[za0001] = za0002 + } + case "Interval": + z.Interval, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "Interval") + return + } + case "QueryPatt": + z.QueryPatt, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "QueryPatt") + return + } + case "QueryFrom": + z.QueryFrom, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "QueryFrom") + return + } + case "QueryTo": + z.QueryTo, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "QueryTo") + return + } + case "QueryCons": + err = z.QueryCons.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "QueryCons") + return + } + case "Consolidator": + err = z.Consolidator.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Consolidator") + return + } + case "QueryMDP": + z.QueryMDP, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "QueryMDP") + return + } + case "QueryPNGroup": + err = z.QueryPNGroup.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "QueryPNGroup") + return + } + case "Meta": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + if cap(z.Meta) >= int(zb0003) { + z.Meta = (z.Meta)[:zb0003] + } else { + z.Meta = make(SeriesMeta, zb0003) + } + for za0003 := range z.Meta { + err = z.Meta[za0003].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Meta", za0003) + return + } + } + case "Datapoints": + var zb0004 uint32 + zb0004, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Datapoints") + return + } + if cap(z.Datapoints) >= int(zb0004) { + z.Datapoints = (z.Datapoints)[:zb0004] + } else { + z.Datapoints = pointSlicePool.GetMin(int(zb0004))[:zb0004] + } + for za0004 := range z.Datapoints { + err = z.Datapoints[za0004].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Datapoints", za0004) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Series) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 12 + // write "Target" + err = en.Append(0x8c, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Target) + if err != nil { + err = msgp.WrapError(err, "Target") + return + } + // write "Tags" + err = en.Append(0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Tags))) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + for za0001, za0002 := range z.Tags { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // write "Interval" + err = en.Append(0xa8, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteUint32(z.Interval) + if err != nil { + err = msgp.WrapError(err, "Interval") + return + } + // write "QueryPatt" + err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x61, 0x74, 0x74) + if err != nil { + return + } + err = en.WriteString(z.QueryPatt) + if err != nil { + err = msgp.WrapError(err, "QueryPatt") + return + } + // write "QueryFrom" + err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x46, 0x72, 0x6f, 0x6d) + if err != nil { + return + } + err = en.WriteUint32(z.QueryFrom) + if err != nil { + err = msgp.WrapError(err, "QueryFrom") + return + } + // write "QueryTo" + err = en.Append(0xa7, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x6f) + if err != nil { + return + } + err = en.WriteUint32(z.QueryTo) + if err != nil { + err = msgp.WrapError(err, "QueryTo") + return + } + // write "QueryCons" + err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = z.QueryCons.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "QueryCons") + return + } + // write "Consolidator" + err = en.Append(0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72) + if err != nil { + return + } + err = z.Consolidator.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Consolidator") + return + } + // write "QueryMDP" + err = en.Append(0xa8, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x44, 0x50) + if err != nil { + return + } + err = en.WriteUint32(z.QueryMDP) + if err != nil { + err = msgp.WrapError(err, "QueryMDP") + return + } + // write "QueryPNGroup" + err = en.Append(0xac, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x4e, 0x47, 0x72, 0x6f, 0x75, 0x70) + if err != nil { + return + } + err = z.QueryPNGroup.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "QueryPNGroup") + return + } + // write "Meta" + err = en.Append(0xa4, 0x4d, 0x65, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Meta))) + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + for za0003 := range z.Meta { + err = z.Meta[za0003].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Meta", za0003) + return + } + } + // write "Datapoints" + err = en.Append(0xaa, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Datapoints))) + if err != nil { + err = msgp.WrapError(err, "Datapoints") + return + } + for za0004 := range z.Datapoints { + err = z.Datapoints[za0004].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Datapoints", za0004) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *Series) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 12 + // string "Target" + o = append(o, 0x8c, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74) + o = msgp.AppendString(o, z.Target) + // string "Tags" + o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.Tags))) + for za0001, za0002 := range z.Tags { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) + } + // string "Interval" + o = append(o, 0xa8, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c) + o = msgp.AppendUint32(o, z.Interval) + // string "QueryPatt" + o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x61, 0x74, 0x74) + o = msgp.AppendString(o, z.QueryPatt) + // string "QueryFrom" + o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x46, 0x72, 0x6f, 0x6d) + o = msgp.AppendUint32(o, z.QueryFrom) + // string "QueryTo" + o = append(o, 0xa7, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x6f) + o = msgp.AppendUint32(o, z.QueryTo) + // string "QueryCons" + o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73) + o, err = z.QueryCons.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "QueryCons") + return + } + // string "Consolidator" + o = append(o, 0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72) + o, err = z.Consolidator.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Consolidator") + return + } + // string "QueryMDP" + o = append(o, 0xa8, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x44, 0x50) + o = msgp.AppendUint32(o, z.QueryMDP) + // string "QueryPNGroup" + o = append(o, 0xac, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x4e, 0x47, 0x72, 0x6f, 0x75, 0x70) + o, err = z.QueryPNGroup.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "QueryPNGroup") + return + } + // string "Meta" + o = append(o, 0xa4, 0x4d, 0x65, 0x74, 0x61) + o = msgp.AppendArrayHeader(o, uint32(len(z.Meta))) + for za0003 := range z.Meta { + o, err = z.Meta[za0003].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Meta", za0003) + return + } + } + // string "Datapoints" + o = append(o, 0xaa, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Datapoints))) + for za0004 := range z.Datapoints { + o, err = z.Datapoints[za0004].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Datapoints", za0004) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Series) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Target": + z.Target, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Target") + return + } + case "Tags": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if z.Tags == nil { + z.Tags = make(map[string]string, zb0002) + } else if len(z.Tags) > 0 { + for key := range z.Tags { + delete(z.Tags, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + z.Tags[za0001] = za0002 + } + case "Interval": + z.Interval, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Interval") + return + } + case "QueryPatt": + z.QueryPatt, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueryPatt") + return + } + case "QueryFrom": + z.QueryFrom, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueryFrom") + return + } + case "QueryTo": + z.QueryTo, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueryTo") + return + } + case "QueryCons": + bts, err = z.QueryCons.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "QueryCons") + return + } + case "Consolidator": + bts, err = z.Consolidator.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Consolidator") + return + } + case "QueryMDP": + z.QueryMDP, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueryMDP") + return + } + case "QueryPNGroup": + bts, err = z.QueryPNGroup.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "QueryPNGroup") + return + } + case "Meta": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Meta") + return + } + if cap(z.Meta) >= int(zb0003) { + z.Meta = (z.Meta)[:zb0003] + } else { + z.Meta = make(SeriesMeta, zb0003) + } + for za0003 := range z.Meta { + bts, err = z.Meta[za0003].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Meta", za0003) + return + } + } + case "Datapoints": + var zb0004 uint32 + zb0004, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Datapoints") + return + } + if cap(z.Datapoints) >= int(zb0004) { + z.Datapoints = (z.Datapoints)[:zb0004] + } else { + z.Datapoints = pointSlicePool.GetMin(int(zb0004))[:zb0004] + } + for za0004 := range z.Datapoints { + bts, err = z.Datapoints[za0004].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Datapoints", za0004) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Series) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Target) + 5 + msgp.MapHeaderSize + if z.Tags != nil { + for za0001, za0002 := range z.Tags { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } + s += 9 + msgp.Uint32Size + 10 + msgp.StringPrefixSize + len(z.QueryPatt) + 10 + msgp.Uint32Size + 8 + msgp.Uint32Size + 10 + z.QueryCons.Msgsize() + 13 + z.Consolidator.Msgsize() + 9 + msgp.Uint32Size + 13 + z.QueryPNGroup.Msgsize() + 5 + msgp.ArrayHeaderSize + for za0003 := range z.Meta { + s += z.Meta[za0003].Msgsize() + } + s += 11 + msgp.ArrayHeaderSize + for za0004 := range z.Datapoints { + s += z.Datapoints[za0004].Msgsize() + } + return +} diff --git a/api/models/series_msgp_manual_test.go b/api/models/series_msgp_manual_test.go new file mode 100644 index 0000000000..2bfb065617 --- /dev/null +++ b/api/models/series_msgp_manual_test.go @@ -0,0 +1,121 @@ +package models + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalSeries(t *testing.T) { + v := Series{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgSeries(b *testing.B) { + v := Series{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgSeries(b *testing.B) { + v := Series{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalSeries(b *testing.B) { + v := Series{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeSeries(t *testing.T) { + v := Series{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + } + + vn := Series{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeSeries(b *testing.B) { + v := Series{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeSeries(b *testing.B) { + v := Series{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/pointslicepool/pointslicepool.go b/pointslicepool/pointslicepool.go index 1f8b2e50dd..574ee097e3 100644 --- a/pointslicepool/pointslicepool.go +++ b/pointslicepool/pointslicepool.go @@ -44,6 +44,12 @@ func New(defaultSize int) *PointSlicePool { } } +func (p *PointSlicePool) PutMaybeNil(s []schema.Point) { + if s != nil { + p.Put(s) + } +} + func (p *PointSlicePool) Put(s []schema.Point) { if cap(s) >= p.defaultSize { p.putLarge.Inc()