Skip to content

Commit

Permalink
Create copies of net.IP and net.HardwareAddr when decoding IE values (#…
Browse files Browse the repository at this point in the history
…330)

When using the provided slice (provided by the collector) directly, the
garbage collector will not be able to release the underlying array while
references to the net objects exist. The underlying array is allocated
when reading the message from the TCP connection and can be 100s of
bytes in size. The collector will typically run aggregation and keep
references to the data record and its IEs for some time. During that
time, we cannot release the packet buffer, while new packets keep coming
in. To avoid the issue, we create a new slice, with a new underlying
array, and the packet buffer can be releases as soon as all IEs are
instantiated.

Signed-off-by: Antonin Bas <abas@vmware.com>
  • Loading branch information
antoninbas authored and heanlan committed Dec 7, 2023
1 parent 27b72b2 commit f67aac2
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
60 changes: 60 additions & 0 deletions pkg/collector/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"crypto/tls"
"crypto/x509"
"net"
"runtime"
"sync"
"testing"
"time"

"github.com/pion/dtls/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/vmware/go-ipfix/pkg/entities"
Expand Down Expand Up @@ -144,6 +146,64 @@ func TestTCPCollectingProcess_ReceiveDataRecord(t *testing.T) {
assert.Equal(t, int64(1), cp.GetNumRecordsReceived())
}

// This test was added to easily measure memory usage when collecting and storing data records.
func TestTCPCollectingProcess_ReceiveDataRecordsMemoryUsage(t *testing.T) {
input := getCollectorInput(tcpTransport, false, false)
cp, err := InitCollectingProcess(input)
require.NoError(t, err)
// Add the templates before sending data record
cp.addTemplate(uint32(1), uint16(256), elementsWithValueIPv4)

go cp.Start()

// wait until collector is ready
waitForCollectorReady(t, cp)

var conn net.Conn
collectorAddr := cp.GetAddress()
conn, err = net.Dial(collectorAddr.Network(), collectorAddr.String())
require.NoError(t, err)

const numRecords = 1000

// We collect the IEs containing the source IPv4 address from all the records received by
// the collector. We need to make sure that we access the values from this slice *after*
// running the garbage collector and collecting memory stats, otherwise everything may be
// garbage collected too early and the results of the test will be inaccurate.
ies := make([]entities.InfoElementWithValue, 0, numRecords)

t.Logf("Data packet length: %d", len(validDataPacket))

for i := 0; i < numRecords; i++ {
conn.Write(validDataPacket)
message := <-cp.GetMsgChan()
set := message.GetSet()
require.NotNil(t, set)
records := set.GetRecords()
require.NotEmpty(t, records)
ie, _, exist := records[0].GetInfoElementWithValue("sourceIPv4Address")
require.True(t, exist)
ies = append(ies, ie)
}

conn.Close()
cp.Stop()

// Force the GC to run before collecting memory stats
runtime.GC()
var m runtime.MemStats
runtime.ReadMemStats(&m)

ipAddress := net.IP([]byte{1, 2, 3, 4})
for _, ie := range ies {
ip := ie.GetIPAddressValue()
assert.Equal(t, ipAddress, ip)
}

t.Logf("Live objects: %d\n", m.Mallocs-m.Frees)
t.Logf("Bytes of allocated heap objects: %d\n", m.HeapAlloc)
}

func TestUDPCollectingProcess_ReceiveDataRecord(t *testing.T) {
input := getCollectorInput(udpTransport, false, false)
cp, err := InitCollectingProcess(input)
Expand Down
20 changes: 18 additions & 2 deletions pkg/entities/ie.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,25 @@ func DecodeAndCreateInfoElementWithValue(element *InfoElement, value []byte) (In
case DateTimeMicroseconds, DateTimeNanoseconds:
return nil, fmt.Errorf("API does not support micro and nano seconds types yet")
case MacAddress:
return NewMacAddressInfoElement(element, value), nil
if value == nil {
return NewMacAddressInfoElement(element, nil), nil
} else {
// make sure that we make a copy of the slice, instead of using it as is
// otherwise the underlying array for value may not be GC'd until the IE is GC'd
// the underlying array may be much larger than the value slice itself
addr := append([]byte{}, value...)
return NewMacAddressInfoElement(element, addr), nil
}
case Ipv4Address, Ipv6Address:
return NewIPAddressInfoElement(element, value), nil
if value == nil {
return NewIPAddressInfoElement(element, nil), nil
} else {
// make sure that we make a copy of the slice, instead of using it as is
// otherwise the underlying array for value may not be GC'd until the IE is GC'd
// the underlying array may be much larger than the value slice itself
addr := append([]byte{}, value...)
return NewIPAddressInfoElement(element, addr), nil
}
case String:
var val string
if value == nil {
Expand Down

0 comments on commit f67aac2

Please sign in to comment.