From 189d3000b9850998fde972b63c9f9d5371781809 Mon Sep 17 00:00:00 2001 From: "J. Emrys Landivar" Date: Wed, 17 Mar 2021 16:12:06 -0500 Subject: [PATCH] fix: memory leaks (#2489) There were a number of memory leaks of the form of `a = a[i:]`. These were replaced with circular queue buffers, to prevent the leaks. --- CHANGELOG.md | 4 + circularqueue.gen.go.tmpl | 115 +++++++++ circularqueue.gen_test.go | 214 +++++++++++++++++ circularqueue_test.go | 245 ++++++++++++++++++++ integrations/streamer_test.go | 2 - join.go | 145 +++++++----- join_circularqueues.gen.go | 214 +++++++++++++++++ task_master.go | 2 +- udf/agent/examples/moving_avg/moving_avg.go | 2 +- udf/agent/udf.pb.go | 2 +- union.go | 44 ++-- union_circularqueues.gen.go | 111 +++++++++ 12 files changed, 1022 insertions(+), 78 deletions(-) create mode 100644 circularqueue.gen.go.tmpl create mode 100644 circularqueue.gen_test.go create mode 100644 circularqueue_test.go create mode 100644 join_circularqueues.gen.go create mode 100644 union_circularqueues.gen.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cb8c82c8a..43bf11c7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ ### Bugfixes - [#2498](https://github.com/influxdata/kapacitor/pull/2498): avoid infinite hang when closing Kakfa writer, this also prevents the timeout error on an http update to Kafka config. +## unreleased + +### BugFixes +- [#2489](https://github.com/influxdata/kapacitor/pull/2489): Fix memory leaks in JoinNode and UnionNode. ## v1.5.8 [2021-01-11] diff --git a/circularqueue.gen.go.tmpl b/circularqueue.gen.go.tmpl new file mode 100644 index 000000000..71db5746a --- /dev/null +++ b/circularqueue.gen.go.tmpl @@ -0,0 +1,115 @@ +//lint:file-ignore U1000 this is generated code +package kapacitor + +{{with $types := .}}{{range $k := $types}} + +// {{ $k }}CircularQueue defines a circular queue, always use the contructor to create one. +type {{ $k }}CircularQueue struct { + data []{{ $k }} + head int + tail int + l int +} + + +// {{ if eq (substr 0 1 $k ) (substr 0 1 $k | upper) -}} New {{- else -}} new {{- end -}} +{{- substr 0 1 $k | upper -}}{{- substr 1 (len $k) $k -}} constructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func {{ if eq (substr 0 1 $k ) (substr 0 1 $k | upper) -}} New {{- else -}} new {{- end -}} +{{- substr 0 1 $k | upper -}}{{- substr 1 (len $k) $k -}} CircularQueue(buf ...{{ $k }}) *{{ $k }}CircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4{ + buf = append(make([]{{ $k }}, 0, 4), buf...) + } + return &{{ $k }}CircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q * {{- $k -}} CircularQueue) Enqueue(v {{ $k }}) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l {// no need to grow + if q.tail == len(q.data){ + q.tail = 0 + } + q.data[q.tail] = v + }else{ // we need to grow + buf := make([]{{ $k }}, cap(q.data)*2) + if q.head < q.tail{ + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *{{ $k }}CircularQueue) Dequeue(n int) { + if n<=0{ + return + } + if q.l <= n{ + n = q.l + } + ni:=n + var fill {{ $k }} + if q.head>q.tail{ + for i:=q.head;i0;i++{ + q.data[i] = fill + ni-- + } + for i:=0;i0;i++{ + q.data[i] = fill + ni-- + } + } else { + for i:=q.head;i0;i++{ + q.data[i] = fill + ni-- + } + } + q.head+=n + if q.head>len(q.data){ + q.head -= len(q.data) + } + q.l-=n + if q.l==0{ + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *{{ $k }}CircularQueue) Peek(i int) {{ $k }} { + if i<0 || i>= q.l{ + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p-=len(q.data) + } + return q.data[p] +} + + +// Len returns the current number of items in the queue. +func (q *{{ $k }}CircularQueue) Len() int { + return q.l +} + +{{end}} +{{end}} + diff --git a/circularqueue.gen_test.go b/circularqueue.gen_test.go new file mode 100644 index 000000000..f067acec5 --- /dev/null +++ b/circularqueue.gen_test.go @@ -0,0 +1,214 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: circularqueue.gen.go.tmpl + +//lint:file-ignore U1000 this is generated code +package kapacitor + +// circularIntCircularQueue defines a circular queue, always use the contructor to create one. +type circularIntCircularQueue struct { + data []circularInt + head int + tail int + l int +} + +// newCircularIntconstructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func newCircularIntCircularQueue(buf ...circularInt) *circularIntCircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4 { + buf = append(make([]circularInt, 0, 4), buf...) + } + return &circularIntCircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q *circularIntCircularQueue) Enqueue(v circularInt) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l { // no need to grow + if q.tail == len(q.data) { + q.tail = 0 + } + q.data[q.tail] = v + } else { // we need to grow + buf := make([]circularInt, cap(q.data)*2) + if q.head < q.tail { + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *circularIntCircularQueue) Dequeue(n int) { + if n <= 0 { + return + } + if q.l <= n { + n = q.l + } + ni := n + var fill circularInt + if q.head > q.tail { + for i := q.head; i < len(q.data) && ni > 0; i++ { + q.data[i] = fill + ni-- + } + for i := 0; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } else { + for i := q.head; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } + q.head += n + if q.head > len(q.data) { + q.head -= len(q.data) + } + q.l -= n + if q.l == 0 { + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *circularIntCircularQueue) Peek(i int) circularInt { + if i < 0 || i >= q.l { + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p -= len(q.data) + } + return q.data[p] +} + +// Len returns the current number of items in the queue. +func (q *circularIntCircularQueue) Len() int { + return q.l +} + +// circularIntPtrCircularQueue defines a circular queue, always use the contructor to create one. +type circularIntPtrCircularQueue struct { + data []circularIntPtr + head int + tail int + l int +} + +// newCircularIntPtrconstructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func newCircularIntPtrCircularQueue(buf ...circularIntPtr) *circularIntPtrCircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4 { + buf = append(make([]circularIntPtr, 0, 4), buf...) + } + return &circularIntPtrCircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q *circularIntPtrCircularQueue) Enqueue(v circularIntPtr) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l { // no need to grow + if q.tail == len(q.data) { + q.tail = 0 + } + q.data[q.tail] = v + } else { // we need to grow + buf := make([]circularIntPtr, cap(q.data)*2) + if q.head < q.tail { + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *circularIntPtrCircularQueue) Dequeue(n int) { + if n <= 0 { + return + } + if q.l <= n { + n = q.l + } + ni := n + var fill circularIntPtr + if q.head > q.tail { + for i := q.head; i < len(q.data) && ni > 0; i++ { + q.data[i] = fill + ni-- + } + for i := 0; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } else { + for i := q.head; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } + q.head += n + if q.head > len(q.data) { + q.head -= len(q.data) + } + q.l -= n + if q.l == 0 { + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *circularIntPtrCircularQueue) Peek(i int) circularIntPtr { + if i < 0 || i >= q.l { + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p -= len(q.data) + } + return q.data[p] +} + +// Len returns the current number of items in the queue. +func (q *circularIntPtrCircularQueue) Len() int { + return q.l +} diff --git a/circularqueue_test.go b/circularqueue_test.go new file mode 100644 index 000000000..130c45c8e --- /dev/null +++ b/circularqueue_test.go @@ -0,0 +1,245 @@ +package kapacitor + +import ( + "math" + "reflect" + "runtime" + "sort" + "sync" + "testing" + "time" +) + +type circularInt int +type circularIntPtr *int + +//go:generate tmpl --o=circularqueue.gen_test.go -data "[ \"circularInt\" , \"circularIntPtr\" ]" circularqueue.gen.go.tmpl + +func Test_intCircularBufPeek(t *testing.T) { + exp := []circularInt{1, 2, 3, 4, 5, 6, 7} + peekRes := []circularInt{} + q := newCircularIntCircularQueue([]circularInt{1, 2, 3, 4, 5, 6, 7}...) + for i := 0; i < q.Len(); i++ { + peekRes = append(peekRes, q.Peek(i)) + } + if !reflect.DeepEqual(peekRes, exp) { + t.Errorf("expected peeking, we would see %v, got %v", exp, peekRes) + + } + peekRes = []circularInt{} + for i := 0; i < q.Len(); i++ { + peekRes = append(peekRes, q.Peek(i)) + + } + + if !reflect.DeepEqual(peekRes, exp) { + t.Errorf("expected peeking after next, we would see %v, got %v", exp, peekRes) + + } + +} + +func Test_intCircularBuf(t *testing.T) { + cases := []struct { + starting []circularInt + name string + expected []circularInt + expectedPrePeek [][]circularInt + expectedPeek [][]circularInt + add [][]circularInt + dequeueTimes []int + }{ + + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove everything but one then fill it up again", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7}, + expectedPeek: [][]circularInt{{6, 7}, {}, {8, 9, 10, 11}}, + add: [][]circularInt{{}, {}, {8, 9, 10, 11}}, + dequeueTimes: []int{5, 3, 0}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "regular", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + expectedPeek: [][]circularInt{{11}}, + add: [][]circularInt{{8, 9, 10, 11}}, + dequeueTimes: []int{10}, + }, + { + starting: nil, + name: "empty", + expected: []circularInt{}, + expectedPeek: [][]circularInt{{}}, + add: [][]circularInt{nil}, + dequeueTimes: []int{1}, + }, + { + starting: nil, + name: "empty way past zero", + expected: []circularInt{}, + expectedPeek: [][]circularInt{{}, {}, {}}, + add: [][]circularInt{nil, nil, nil}, + dequeueTimes: []int{1, 2, 4}, + }, + { + starting: nil, + name: "add to empty", + expected: []circularInt{}, + expectedPeek: [][]circularInt{{1}}, + add: [][]circularInt{{1}}, + dequeueTimes: []int{0}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove then add", + expected: []circularInt{1, 2, 3, 4, 5, 6}, + expectedPeek: [][]circularInt{{6, 7}, {7, 8, 9, 10}}, + add: [][]circularInt{nil, {8, 9, 10}}, + dequeueTimes: []int{5, 1}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove then add #2", + expected: []circularInt{1, 2, 3, 4, 5, 6}, + expectedPeek: [][]circularInt{{6, 7, 8, 9}, {7, 8, 9, 10, 11, 12}}, + add: [][]circularInt{{8, 9}, {10, 11, 12}}, + dequeueTimes: []int{5, 1}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove then add #3", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + expectedPeek: [][]circularInt{{6, 7, 8}, {}, {13}}, + add: [][]circularInt{{8}, {9, 10, 11}, {12, 13}}, + dequeueTimes: []int{5, 7, 1}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove then add #4", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + expectedPeek: [][]circularInt{{6, 7, 8}, {11}, {12}}, + add: [][]circularInt{{8}, {9, 10, 11}, {12}}, + dequeueTimes: []int{5, 5, 1}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove too many too early then add one in", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + expectedPeek: [][]circularInt{{6, 7, 8}, {}, {12}}, + add: [][]circularInt{{8}, {9, 10, 11}, {12}}, + dequeueTimes: []int{5, 33, 0}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove everyone", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7}, + expectedPeek: [][]circularInt{{6, 7}, {}}, + add: [][]circularInt{{}, {}}, + dequeueTimes: []int{5, 3}, + }, + { + starting: []circularInt{1, 2, 3, 4, 5, 6, 7}, + name: "remove everything but one then fill it up again", + expected: []circularInt{1, 2, 3, 4, 5, 6, 7}, + expectedPeek: [][]circularInt{{6, 7}, {}, {8, 9, 10, 11}}, + add: [][]circularInt{{}, {}, {8, 9, 10, 11}}, + dequeueTimes: []int{5, 3, 0}, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + q := newCircularIntCircularQueue(c.starting...) + res := []circularInt{} + for j := 0; j < len(c.dequeueTimes); j++ { + for i := range c.add[j] { + q.Enqueue(c.add[j][i]) + } + peekRes := []circularInt{} + if len(peekRes) > 0 { + for i := 0; i < q.Len(); i++ { + peekRes = append(peekRes, q.Peek(i)) + } + if !reflect.DeepEqual(peekRes, c.expectedPrePeek[j]) { + t.Errorf("expected peeking before we called next, on step %d we would see %v, got %v", j, c.expectedPeek[j], peekRes) + } + } + for i := 0; i < c.dequeueTimes[j]; i++ { + if q.Len() > 0 { + res = append(res, q.Peek(0)) + q.Dequeue(1) + } + } + peekRes = []circularInt{} + for i := 0; i < q.Len(); i++ { + peekRes = append(peekRes, q.Peek(i)) + } + if !reflect.DeepEqual(peekRes, c.expectedPeek[j]) { + t.Errorf("expected peeking, on step %d we would see %v, got %v", j, c.expectedPeek[j], peekRes) + } + } + if !reflect.DeepEqual(res, c.expected) { + t.Errorf("expected %v, got %v", c.expected, res) + } + }) + + } +} + +func Test_leakCircularBuf(t *testing.T) { + if testing.Short() { + t.Skip("Testing for leaks can be slow, because of the way finalizers work") + } + finalizedItems := []int{} // this can't be pointers because we need the objects to leave memory + finalizedLock := &sync.Mutex{} + expectedFinalizedItems := []int{} + // fill the expectedFinalizedItems + for i := 0; i < 20; i++ { + expectedFinalizedItems = append(expectedFinalizedItems, i) + } + + q := newCircularIntPtrCircularQueue(nil) + for i := 0; i < len(expectedFinalizedItems); i++ { + i := i // make i a local object + item := circularIntPtr(&i) + runtime.SetFinalizer(item, func(q circularIntPtr) { + // if the finalizer is called, that means that the GC believes the objects should be freed + finalizedLock.Lock() + finalizedItems = append(finalizedItems, *q) + finalizedLock.Unlock() + }) + q.Enqueue(item) + } + + // go through the queue till it is empty + q.Dequeue(q.Len()) + + // the items should eventually be collected. + // sometimes they won't be because the GC is optimizing for low latency so we try a bunch + for i := 0; i < 100; i++ { + finalizedLock.Lock() + l := len(finalizedItems) + finalizedLock.Unlock() + if l == len(expectedFinalizedItems) { + break + } + runtime.GC() + // we have to sleep here because finalizers are async + time.Sleep(50 * time.Millisecond) + } + if len(finalizedItems) != len(expectedFinalizedItems) { + t.Errorf("expected %d objects to be freed, but got %d", len(expectedFinalizedItems), len(finalizedItems)) + } + + sort.Ints(finalizedItems) + if !reflect.DeepEqual(finalizedItems, expectedFinalizedItems) { + t.Errorf("The wrong items were finalized expected %v got %v", expectedFinalizedItems, finalizedItems) + } + // we don't want q to be freed above, when we are checking if the elements are freed, + // so what is below is to prevent the calls to runtime.GC from freeing the whole queue early + // the code below isn't important, just that it does something with q.data, and doesn't get + // elided out by the compiler + if len(q.data) == math.MaxInt32 { + panic(q) + } +} diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 3bd893007..4531ef0c5 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -5773,7 +5773,6 @@ cpuT }, }, } - testStreamerWithOutput(t, "TestStream_Union", script, 15*time.Second, er, false, nil) } @@ -13617,7 +13616,6 @@ func testStreamerWithOutput( if err != nil { t.Error(err) } - // Get the result output, err := et.GetOutput(name) if err != nil { diff --git a/join.go b/join.go index e812df184..41d1e2c3a 100644 --- a/join.go +++ b/join.go @@ -13,6 +13,8 @@ import ( "github.com/pkg/errors" ) +//go:generate tmpl -data "[\"srcPoint\",\"joinsetPtr\"]" -o=join_circularqueues.gen.go circularqueue.gen.go.tmpl + type JoinNode struct { node j *pipeline.JoinNode @@ -26,9 +28,9 @@ type JoinNode struct { lowMarks map[srcGroup]time.Time // Buffer for caching points that need to be matched with specific points. - matchGroupsBuffer map[models.GroupID][]srcPoint + matchGroupsBuffer map[models.GroupID]*srcPointCircularQueue // Buffer for caching specific points until their match arrivces. - specificGroupsBuffer map[models.GroupID][]srcPoint + specificGroupsBuffer map[models.GroupID]*srcPointCircularQueue reported map[int]bool allReported bool @@ -40,8 +42,8 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, d NodeDiagnostic) (*Jo j: n, node: node{Node: n, et: et, diag: d}, groups: make(map[models.GroupID]*joinGroup), - matchGroupsBuffer: make(map[models.GroupID][]srcPoint), - specificGroupsBuffer: make(map[models.GroupID][]srcPoint), + matchGroupsBuffer: make(map[models.GroupID]*srcPointCircularQueue), + specificGroupsBuffer: make(map[models.GroupID]*srcPointCircularQueue), lowMarks: make(map[srcGroup]time.Time), reported: make(map[int]bool), } @@ -169,18 +171,21 @@ func (n *JoinNode) matchPoints(p srcPoint) { // Send all cached specific point that won't match anymore. var i int buf := n.specificGroupsBuffer[groupId] - l := len(buf) - for i = 0; i < l; i++ { - st := buf[i].Msg.Time().Round(n.j.Tolerance) - if st.Before(lowMark) { - // Send point by itself since it won't get a match. - n.sendSpecificPoint(buf[i]) - } else { - break + if buf != nil { + l := buf.Len() + for i = 0; i < l; i++ { + pt := buf.Peek(i) + st := pt.Msg.Time().Round(n.j.Tolerance) + if st.Before(lowMark) { + // Send point by itself since it won't get a match. + n.sendSpecificPoint(pt) + } else { + break + } } + // Remove all sent points. + buf.Dequeue(i) } - // Remove all sent points. - n.specificGroupsBuffer[groupId] = buf[i:] } if len(p.Msg.Dimensions().TagNames) > len(n.j.Dimensions) { @@ -193,25 +198,27 @@ func (n *JoinNode) matchPoints(p srcPoint) { // Also purge any old match points. matches := n.matchGroupsBuffer[groupId] matched := false - var i int - l := len(matches) - for i = 0; i < l; i++ { - match := matches[i] - pt := match.Msg.Time().Round(n.j.Tolerance) - if pt.Equal(t) { - // Option 1, send both points - n.sendMatchPoint(p, match) - matched = true + if matches != nil { + var i int + l := matches.Len() + for i = 0; i < l; i++ { + match := matches.Peek(i) + pt := match.Msg.Time().Round(n.j.Tolerance) + if pt.Equal(t) { + // Option 1, send both points + n.sendMatchPoint(p, match) + matched = true + } + if !pt.Before(lowMark) { + break + } } - if !pt.Before(lowMark) { - break + if n.allReported { + // Can't trust lowMark until all parents have reported. + // Remove any unneeded match points. + n.matchGroupsBuffer[groupId].Dequeue(i) } } - if n.allReported { - // Can't trust lowMark until all parents have reported. - // Remove any unneeded match points. - n.matchGroupsBuffer[groupId] = matches[i:] - } // If the point didn't match that leaves us with options 2 and 3. if !matched { @@ -222,27 +229,31 @@ func (n *JoinNode) matchPoints(p srcPoint) { } else { // Option 2 // Cache this point for when its match arrives. - n.specificGroupsBuffer[groupId] = append(n.specificGroupsBuffer[groupId], p) + n.getOrCreateSpecificGroup(groupId).Enqueue(p) } } + } else { // Cache match point. - n.matchGroupsBuffer[groupId] = append(n.matchGroupsBuffer[groupId], p) + n.getOrCreateMatchGroup(groupId).Enqueue(p) // Send all specific points that match, to the group. var i int buf := n.specificGroupsBuffer[groupId] - l := len(buf) - for i = 0; i < l; i++ { - st := buf[i].Msg.Time().Round(n.j.Tolerance) - if st.Equal(t) { - n.sendMatchPoint(buf[i], p) - } else { - break + if buf != nil { + l := buf.Len() + for i = 0; i < l; i++ { + pt := buf.Peek(i) + st := pt.Msg.Time().Round(n.j.Tolerance) + if st.Equal(t) { + n.sendMatchPoint(pt, p) + } else { + break + } } + // Remove all sent points + n.specificGroupsBuffer[groupId].Dequeue(i) } - // Remove all sent points - n.specificGroupsBuffer[groupId] = buf[i:] } } @@ -290,16 +301,37 @@ func (n *JoinNode) getOrCreateGroup(groupID models.GroupID) *joinGroup { func (n *JoinNode) newGroup(count int) *joinGroup { return &joinGroup{ n: n, - sets: make(map[time.Time][]*joinset), + sets: make(map[time.Time]*joinsetPtrCircularQueue), head: make([]time.Time, count), } } +func (n *JoinNode) getOrCreateMatchGroup(id models.GroupID) *srcPointCircularQueue { + buf := n.matchGroupsBuffer[id] + if buf == nil { + buf = newSrcPointCircularQueue() + n.matchGroupsBuffer[id] = buf + } + + return buf +} + +func (n *JoinNode) getOrCreateSpecificGroup(id models.GroupID) *srcPointCircularQueue { + buf := n.specificGroupsBuffer[id] + if buf == nil { + buf = newSrcPointCircularQueue() + n.specificGroupsBuffer[id] = buf + } + return buf +} + +type joinsetPtr *joinset + // handles emitting joined sets once enough data has arrived from parents. type joinGroup struct { n *JoinNode - sets map[time.Time][]*joinset + sets map[time.Time]*joinsetPtrCircularQueue head []time.Time oldestTime time.Time } @@ -318,21 +350,20 @@ func (g *joinGroup) Collect(src int, p timeMessage) error { var set *joinset sets := g.sets[t] - if len(sets) == 0 { - set = g.newJoinset(t) - sets = append(sets, set) + if sets == nil { + sets = newJoinsetPtrCircularQueue(g.newJoinset(t)) g.sets[t] = sets } - for i := 0; i < len(sets); i++ { - if !sets[i].Has(src) { - set = sets[i] + l := sets.Len() + for i := 0; i < l; i++ { + if x := sets.Peek(i); !((*joinset)(x)).Has(src) { + set = x break } } if set == nil { set = g.newJoinset(t) - sets = append(sets, set) - g.sets[t] = sets + g.sets[t].Enqueue(set) } set.Set(src, p) @@ -387,9 +418,11 @@ func (g *joinGroup) emit(onlyReadySets bool) error { } sets := g.sets[g.oldestTime] i := 0 - for ; i < len(sets); i++ { - if sets[i].Ready() || !onlyReadySets { - err := g.emitJoinedSet(sets[i]) + l := sets.Len() + for ; i < l; i++ { + set := (*joinset)(sets.Peek(i)) + if set.Ready() || !onlyReadySets { + err := g.emitJoinedSet(set) if err != nil { return err } @@ -397,10 +430,10 @@ func (g *joinGroup) emit(onlyReadySets bool) error { break } } - if i == len(sets) { + if i == sets.Len() { delete(g.sets, g.oldestTime) } else { - g.sets[g.oldestTime] = sets[i:] + g.sets[g.oldestTime].Dequeue(i) } g.oldestTime = time.Time{} diff --git a/join_circularqueues.gen.go b/join_circularqueues.gen.go new file mode 100644 index 000000000..d29a01de6 --- /dev/null +++ b/join_circularqueues.gen.go @@ -0,0 +1,214 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: circularqueue.gen.go.tmpl + +//lint:file-ignore U1000 this is generated code +package kapacitor + +// srcPointCircularQueue defines a circular queue, always use the contructor to create one. +type srcPointCircularQueue struct { + data []srcPoint + head int + tail int + l int +} + +// newSrcPointconstructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func newSrcPointCircularQueue(buf ...srcPoint) *srcPointCircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4 { + buf = append(make([]srcPoint, 0, 4), buf...) + } + return &srcPointCircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q *srcPointCircularQueue) Enqueue(v srcPoint) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l { // no need to grow + if q.tail == len(q.data) { + q.tail = 0 + } + q.data[q.tail] = v + } else { // we need to grow + buf := make([]srcPoint, cap(q.data)*2) + if q.head < q.tail { + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *srcPointCircularQueue) Dequeue(n int) { + if n <= 0 { + return + } + if q.l <= n { + n = q.l + } + ni := n + var fill srcPoint + if q.head > q.tail { + for i := q.head; i < len(q.data) && ni > 0; i++ { + q.data[i] = fill + ni-- + } + for i := 0; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } else { + for i := q.head; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } + q.head += n + if q.head > len(q.data) { + q.head -= len(q.data) + } + q.l -= n + if q.l == 0 { + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *srcPointCircularQueue) Peek(i int) srcPoint { + if i < 0 || i >= q.l { + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p -= len(q.data) + } + return q.data[p] +} + +// Len returns the current number of items in the queue. +func (q *srcPointCircularQueue) Len() int { + return q.l +} + +// joinsetPtrCircularQueue defines a circular queue, always use the contructor to create one. +type joinsetPtrCircularQueue struct { + data []joinsetPtr + head int + tail int + l int +} + +// newJoinsetPtrconstructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func newJoinsetPtrCircularQueue(buf ...joinsetPtr) *joinsetPtrCircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4 { + buf = append(make([]joinsetPtr, 0, 4), buf...) + } + return &joinsetPtrCircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q *joinsetPtrCircularQueue) Enqueue(v joinsetPtr) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l { // no need to grow + if q.tail == len(q.data) { + q.tail = 0 + } + q.data[q.tail] = v + } else { // we need to grow + buf := make([]joinsetPtr, cap(q.data)*2) + if q.head < q.tail { + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *joinsetPtrCircularQueue) Dequeue(n int) { + if n <= 0 { + return + } + if q.l <= n { + n = q.l + } + ni := n + var fill joinsetPtr + if q.head > q.tail { + for i := q.head; i < len(q.data) && ni > 0; i++ { + q.data[i] = fill + ni-- + } + for i := 0; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } else { + for i := q.head; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } + q.head += n + if q.head > len(q.data) { + q.head -= len(q.data) + } + q.l -= n + if q.l == 0 { + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *joinsetPtrCircularQueue) Peek(i int) joinsetPtr { + if i < 0 || i >= q.l { + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p -= len(q.data) + } + return q.data[p] +} + +// Len returns the current number of items in the queue. +func (q *joinsetPtrCircularQueue) Len() int { + return q.l +} diff --git a/task_master.go b/task_master.go index ec629fc75..694bda9f1 100644 --- a/task_master.go +++ b/task_master.go @@ -384,7 +384,7 @@ func (tm *TaskMaster) Drain() { tm.mu.Lock() defer tm.mu.Unlock() - for id, _ := range tm.taskToForkKeys { + for id := range tm.taskToForkKeys { tm.delFork(id) } } diff --git a/udf/agent/examples/moving_avg/moving_avg.go b/udf/agent/examples/moving_avg/moving_avg.go index a9fc22697..ce398f55a 100644 --- a/udf/agent/examples/moving_avg/moving_avg.go +++ b/udf/agent/examples/moving_avg/moving_avg.go @@ -32,7 +32,7 @@ func (a *avgState) update(value float64) float64 { l := len(a.Window) if a.Size == l { a.Avg += value/float64(l) - a.Window[0]/float64(l) - a.Window = a.Window[1:] + a.Window = append(a.Window[:0], a.Window[1:]...) } else { a.Avg = (value + float64(l)*a.Avg) / float64(l+1) } diff --git a/udf/agent/udf.pb.go b/udf/agent/udf.pb.go index 3b9dc7f1d..a4d947565 100644 --- a/udf/agent/udf.pb.go +++ b/udf/agent/udf.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.23.0 +// protoc-gen-go v1.25.0 // protoc v3.11.4 // source: udf.proto diff --git a/union.go b/union.go index 7ca5e75d9..636aee3ba 100644 --- a/union.go +++ b/union.go @@ -1,6 +1,7 @@ package kapacitor import ( + "sync" "time" "github.com/influxdata/kapacitor/edge" @@ -12,13 +13,15 @@ type UnionNode struct { u *pipeline.UnionNode // Buffer of points/batches from each source. - sources [][]timeMessage + sources []*timeMessageCircularQueue // the low water marks for each source. lowMarks []time.Time - - rename string + lock sync.Mutex + rename string } +//go:generate tmpl -data "[\"timeMessage\"]" -o=union_circularqueues.gen.go circularqueue.gen.go.tmpl + type timeMessage interface { edge.Message edge.TimeGetter @@ -39,7 +42,10 @@ func newUnionNode(et *ExecutingTask, n *pipeline.UnionNode, d NodeDiagnostic) (* func (n *UnionNode) runUnion([]byte) error { // Keep buffer of values from parents so they can be ordered. - n.sources = make([][]timeMessage, len(n.ins)) + n.sources = make([]*timeMessageCircularQueue, len(n.ins)) + for i := range n.ins { + n.sources[i] = newTimeMessageCircularQueue() + } n.lowMarks = make([]time.Time, len(n.ins)) consumer := edge.NewMultiConsumerWithStats(n.ins, n) @@ -57,7 +63,7 @@ func (n *UnionNode) BufferedBatch(src int, batch edge.BufferedBatchMessage) erro } // Add newest point to buffer - n.sources[src] = append(n.sources[src], batch) + n.sources[src].Enqueue(batch) // Emit the next values return n.emitReady(false) @@ -66,13 +72,14 @@ func (n *UnionNode) BufferedBatch(src int, batch edge.BufferedBatchMessage) erro func (n *UnionNode) Point(src int, p edge.PointMessage) error { n.timer.Start() defer n.timer.Stop() + if n.rename != "" { p = p.ShallowCopy() p.SetName(n.rename) } // Add newest point to buffer - n.sources[src] = append(n.sources[src], p) + n.sources[src].Enqueue(p) // Emit the next values return n.emitReady(false) @@ -83,7 +90,7 @@ func (n *UnionNode) Barrier(src int, b edge.BarrierMessage) error { defer n.timer.Stop() // Add newest point to buffer - n.sources[src] = append(n.sources[src], b) + n.sources[src].Enqueue(b) // Emit the next values return n.emitReady(false) @@ -96,6 +103,8 @@ func (n *UnionNode) Finish() error { func (n *UnionNode) emitReady(drain bool) error { emitted := true + var v timeMessage + var i int // Emit all points until nothing changes for emitted { emitted = false @@ -104,8 +113,8 @@ func (n *UnionNode) emitReady(drain bool) error { validSources := 0 for i, values := range n.sources { sourceMark := n.lowMarks[i] - if len(values) > 0 { - t := values[0].Time() + if values.Len() > 0 { + t := values.Peek(0).Time() if mark.IsZero() || t.Before(mark) { mark = t } @@ -126,14 +135,14 @@ func (n *UnionNode) emitReady(drain bool) error { // Unless we are draining the buffer than we can continue. return nil } - // Emit all values that are at or below the mark. - for i, values := range n.sources { - var j int - l := len(values) + for i = range n.sources { + l := n.sources[i].Len() + j := 0 for j = 0; j < l; j++ { - if !values[j].Time().After(mark) { - err := n.emit(values[j]) + v = n.sources[i].Peek(j) + if !v.Time().After(mark) { + err := n.emit(v) if err != nil { return err } @@ -143,13 +152,14 @@ func (n *UnionNode) emitReady(drain bool) error { break } } - // Drop values that were emitted - n.sources[i] = values[j:] + n.sources[i].Dequeue(j) } } return nil } +var q = 0 + func (n *UnionNode) emit(m edge.Message) error { n.timer.Pause() defer n.timer.Resume() diff --git a/union_circularqueues.gen.go b/union_circularqueues.gen.go new file mode 100644 index 000000000..aad7c8927 --- /dev/null +++ b/union_circularqueues.gen.go @@ -0,0 +1,111 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: circularqueue.gen.go.tmpl + +//lint:file-ignore U1000 this is generated code +package kapacitor + +// timeMessageCircularQueue defines a circular queue, always use the contructor to create one. +type timeMessageCircularQueue struct { + data []timeMessage + head int + tail int + l int +} + +// newTimeMessageconstructs a Circular Queue +// with a given buffer buf. It is ok for buf to be nil. +func newTimeMessageCircularQueue(buf ...timeMessage) *timeMessageCircularQueue { + // if we have a useless buffer, make one that is at least useful + if cap(buf) < 4 { + buf = append(make([]timeMessage, 0, 4), buf...) + } + return &timeMessageCircularQueue{ + data: buf[:cap(buf)], + tail: len(buf), // tail is here we insert + l: len(buf), + } +} + +// Enqueue adds an item to the queue. +func (q *timeMessageCircularQueue) Enqueue(v timeMessage) { + // if full we must grow and insert together. This is an expensive op + if cap(q.data) > q.l { // no need to grow + if q.tail == len(q.data) { + q.tail = 0 + } + q.data[q.tail] = v + } else { // we need to grow + buf := make([]timeMessage, cap(q.data)*2) + if q.head < q.tail { + copy(buf, q.data[q.head:q.tail]) + } else { + partialWriteLen := copy(buf, q.data[q.head:]) + copy(buf[partialWriteLen:], q.data[:q.tail]) + } + q.head = 0 + q.tail = cap(q.data) + buf[q.tail] = v + q.data = buf + } + q.l++ + q.tail++ + return +} + +// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out. +func (q *timeMessageCircularQueue) Dequeue(n int) { + if n <= 0 { + return + } + if q.l <= n { + n = q.l + } + ni := n + var fill timeMessage + if q.head > q.tail { + for i := q.head; i < len(q.data) && ni > 0; i++ { + q.data[i] = fill + ni-- + } + for i := 0; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } else { + for i := q.head; i < q.tail && ni > 0; i++ { + q.data[i] = fill + ni-- + } + } + q.head += n + if q.head > len(q.data) { + q.head -= len(q.data) + } + q.l -= n + if q.l == 0 { + q.head = 0 + q.tail = 0 + } + return +} + +// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic. +func (q *timeMessageCircularQueue) Peek(i int) timeMessage { + if i < 0 || i >= q.l { + panic("peek index is out of bounds") + } + p := q.head + i + + if p >= len(q.data) { + p -= len(q.data) + } + return q.data[p] +} + +// Len returns the current number of items in the queue. +func (q *timeMessageCircularQueue) Len() int { + return q.l +}