Skip to content

Commit

Permalink
fix: memory leaks (influxdata#2489)
Browse files Browse the repository at this point in the history
There were a number of memory leaks of the form of `a = a[i:]`.
These were replaced with circular queue buffers, to prevent the leaks.
  • Loading branch information
docmerlin authored and alespour committed Mar 24, 2021
1 parent 5603974 commit 189d300
Show file tree
Hide file tree
Showing 12 changed files with 1,022 additions and 78 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Bugfixes
- [#2498](https://github.com/influxdata/kapacitor/pull/2498): avoid infinite hang when closing Kakfa writer, this also prevents the timeout error on an http update to Kafka config.
## unreleased

### BugFixes
- [#2489](https://github.com/influxdata/kapacitor/pull/2489): Fix memory leaks in JoinNode and UnionNode.

## v1.5.8 [2021-01-11]

Expand Down
115 changes: 115 additions & 0 deletions circularqueue.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//lint:file-ignore U1000 this is generated code
package kapacitor

{{with $types := .}}{{range $k := $types}}

// {{ $k }}CircularQueue defines a circular queue, always use the contructor to create one.
type {{ $k }}CircularQueue struct {
data []{{ $k }}
head int
tail int
l int
}


// {{ if eq (substr 0 1 $k ) (substr 0 1 $k | upper) -}} New {{- else -}} new {{- end -}}
{{- substr 0 1 $k | upper -}}{{- substr 1 (len $k) $k -}} constructs a Circular Queue
// with a given buffer buf. It is ok for buf to be nil.
func {{ if eq (substr 0 1 $k ) (substr 0 1 $k | upper) -}} New {{- else -}} new {{- end -}}
{{- substr 0 1 $k | upper -}}{{- substr 1 (len $k) $k -}} CircularQueue(buf ...{{ $k }}) *{{ $k }}CircularQueue {
// if we have a useless buffer, make one that is at least useful
if cap(buf) < 4{
buf = append(make([]{{ $k }}, 0, 4), buf...)
}
return &{{ $k }}CircularQueue{
data: buf[:cap(buf)],
tail: len(buf), // tail is here we insert
l: len(buf),
}
}

// Enqueue adds an item to the queue.
func (q * {{- $k -}} CircularQueue) Enqueue(v {{ $k }}) {
// if full we must grow and insert together. This is an expensive op
if cap(q.data) > q.l {// no need to grow
if q.tail == len(q.data){
q.tail = 0
}
q.data[q.tail] = v
}else{ // we need to grow
buf := make([]{{ $k }}, cap(q.data)*2)
if q.head < q.tail{
copy(buf, q.data[q.head:q.tail])
} else {
partialWriteLen := copy(buf, q.data[q.head:])
copy(buf[partialWriteLen:], q.data[:q.tail])
}
q.head = 0
q.tail = cap(q.data)
buf[q.tail] = v
q.data = buf
}
q.l++
q.tail++
return
}

// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out.
func (q *{{ $k }}CircularQueue) Dequeue(n int) {
if n<=0{
return
}
if q.l <= n{
n = q.l
}
ni:=n
var fill {{ $k }}
if q.head>q.tail{
for i:=q.head;i<len(q.data)&&ni>0;i++{
q.data[i] = fill
ni--
}
for i:=0;i<q.tail&&ni>0;i++{
q.data[i] = fill
ni--
}
} else {
for i:=q.head;i<q.tail&&ni>0;i++{
q.data[i] = fill
ni--
}
}
q.head+=n
if q.head>len(q.data){
q.head -= len(q.data)
}
q.l-=n
if q.l==0{
q.head = 0
q.tail = 0
}
return
}

// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic.
func (q *{{ $k }}CircularQueue) Peek(i int) {{ $k }} {
if i<0 || i>= q.l{
panic("peek index is out of bounds")
}
p := q.head + i

if p >= len(q.data) {
p-=len(q.data)
}
return q.data[p]
}


// Len returns the current number of items in the queue.
func (q *{{ $k }}CircularQueue) Len() int {
return q.l
}

{{end}}
{{end}}

214 changes: 214 additions & 0 deletions circularqueue.gen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: circularqueue.gen.go.tmpl

//lint:file-ignore U1000 this is generated code
package kapacitor

// circularIntCircularQueue defines a circular queue, always use the contructor to create one.
type circularIntCircularQueue struct {
data []circularInt
head int
tail int
l int
}

// newCircularIntconstructs a Circular Queue
// with a given buffer buf. It is ok for buf to be nil.
func newCircularIntCircularQueue(buf ...circularInt) *circularIntCircularQueue {
// if we have a useless buffer, make one that is at least useful
if cap(buf) < 4 {
buf = append(make([]circularInt, 0, 4), buf...)
}
return &circularIntCircularQueue{
data: buf[:cap(buf)],
tail: len(buf), // tail is here we insert
l: len(buf),
}
}

// Enqueue adds an item to the queue.
func (q *circularIntCircularQueue) Enqueue(v circularInt) {
// if full we must grow and insert together. This is an expensive op
if cap(q.data) > q.l { // no need to grow
if q.tail == len(q.data) {
q.tail = 0
}
q.data[q.tail] = v
} else { // we need to grow
buf := make([]circularInt, cap(q.data)*2)
if q.head < q.tail {
copy(buf, q.data[q.head:q.tail])
} else {
partialWriteLen := copy(buf, q.data[q.head:])
copy(buf[partialWriteLen:], q.data[:q.tail])
}
q.head = 0
q.tail = cap(q.data)
buf[q.tail] = v
q.data = buf
}
q.l++
q.tail++
return
}

// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out.
func (q *circularIntCircularQueue) Dequeue(n int) {
if n <= 0 {
return
}
if q.l <= n {
n = q.l
}
ni := n
var fill circularInt
if q.head > q.tail {
for i := q.head; i < len(q.data) && ni > 0; i++ {
q.data[i] = fill
ni--
}
for i := 0; i < q.tail && ni > 0; i++ {
q.data[i] = fill
ni--
}
} else {
for i := q.head; i < q.tail && ni > 0; i++ {
q.data[i] = fill
ni--
}
}
q.head += n
if q.head > len(q.data) {
q.head -= len(q.data)
}
q.l -= n
if q.l == 0 {
q.head = 0
q.tail = 0
}
return
}

// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic.
func (q *circularIntCircularQueue) Peek(i int) circularInt {
if i < 0 || i >= q.l {
panic("peek index is out of bounds")
}
p := q.head + i

if p >= len(q.data) {
p -= len(q.data)
}
return q.data[p]
}

// Len returns the current number of items in the queue.
func (q *circularIntCircularQueue) Len() int {
return q.l
}

// circularIntPtrCircularQueue defines a circular queue, always use the contructor to create one.
type circularIntPtrCircularQueue struct {
data []circularIntPtr
head int
tail int
l int
}

// newCircularIntPtrconstructs a Circular Queue
// with a given buffer buf. It is ok for buf to be nil.
func newCircularIntPtrCircularQueue(buf ...circularIntPtr) *circularIntPtrCircularQueue {
// if we have a useless buffer, make one that is at least useful
if cap(buf) < 4 {
buf = append(make([]circularIntPtr, 0, 4), buf...)
}
return &circularIntPtrCircularQueue{
data: buf[:cap(buf)],
tail: len(buf), // tail is here we insert
l: len(buf),
}
}

// Enqueue adds an item to the queue.
func (q *circularIntPtrCircularQueue) Enqueue(v circularIntPtr) {
// if full we must grow and insert together. This is an expensive op
if cap(q.data) > q.l { // no need to grow
if q.tail == len(q.data) {
q.tail = 0
}
q.data[q.tail] = v
} else { // we need to grow
buf := make([]circularIntPtr, cap(q.data)*2)
if q.head < q.tail {
copy(buf, q.data[q.head:q.tail])
} else {
partialWriteLen := copy(buf, q.data[q.head:])
copy(buf[partialWriteLen:], q.data[:q.tail])
}
q.head = 0
q.tail = cap(q.data)
buf[q.tail] = v
q.data = buf
}
q.l++
q.tail++
return
}

// Dequeue removes n items from the queue. If n is longer than the number of the items in the queue it will clear them all out.
func (q *circularIntPtrCircularQueue) Dequeue(n int) {
if n <= 0 {
return
}
if q.l <= n {
n = q.l
}
ni := n
var fill circularIntPtr
if q.head > q.tail {
for i := q.head; i < len(q.data) && ni > 0; i++ {
q.data[i] = fill
ni--
}
for i := 0; i < q.tail && ni > 0; i++ {
q.data[i] = fill
ni--
}
} else {
for i := q.head; i < q.tail && ni > 0; i++ {
q.data[i] = fill
ni--
}
}
q.head += n
if q.head > len(q.data) {
q.head -= len(q.data)
}
q.l -= n
if q.l == 0 {
q.head = 0
q.tail = 0
}
return
}

// Peek peeks i ahead of the current head of queue. It should be used in conjunction with .Len() to prevent a panic.
func (q *circularIntPtrCircularQueue) Peek(i int) circularIntPtr {
if i < 0 || i >= q.l {
panic("peek index is out of bounds")
}
p := q.head + i

if p >= len(q.data) {
p -= len(q.data)
}
return q.data[p]
}

// Len returns the current number of items in the queue.
func (q *circularIntPtrCircularQueue) Len() int {
return q.l
}
Loading

0 comments on commit 189d300

Please sign in to comment.