Skip to content

Commit

Permalink
Add Remaining and BufferSize to perf/ring buffers
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Kahle <bryce.kahle@datadoghq.com>
Signed-off-by: Lorenz Bauer <lmb@isovalent.com>
  • Loading branch information
brycekahle committed Oct 20, 2023
1 parent ed9bf3b commit 5a0992d
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 5 deletions.
15 changes: 15 additions & 0 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Record struct {
// The number of samples which could not be output, since
// the ring buffer was full.
LostSamples uint64

// The minimum number of bytes remaining in the per-CPU buffer after this Record has been read.
// Negative for overwritable buffers.
Remaining int
}

// Read a record from a reader and tag it as being from the given CPU.
Expand Down Expand Up @@ -158,6 +162,8 @@ type Reader struct {

paused bool
overwritable bool

bufferSize int
}

// ReaderOptions control the behaviour of the user
Expand Down Expand Up @@ -216,6 +222,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
// bpf_perf_event_output checks which CPU an event is enabled on,
// but doesn't allow using a wildcard like -1 to specify "all CPUs".
// Hence we have to create a ring for each CPU.
bufferSize := 0
for i := 0; i < nCPU; i++ {
ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable)
if errors.Is(err, unix.ENODEV) {
Expand All @@ -224,6 +231,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
pauseFds = append(pauseFds, -1)
continue
}
bufferSize = ring.size()

if err != nil {
return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
Expand Down Expand Up @@ -251,6 +259,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
eventHeader: make([]byte, perfEventHeaderSize),
pauseFds: pauseFds,
overwritable: opts.Overwritable,
bufferSize: bufferSize,
}
if err = pr.Resume(); err != nil {
return nil, err
Expand Down Expand Up @@ -430,6 +439,11 @@ func (pr *Reader) Resume() error {
return nil
}

// BufferSize is the size in bytes of each per-CPU buffer
func (pr *Reader) BufferSize() int {
return pr.bufferSize
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
defer ring.writeTail()
Expand All @@ -439,6 +453,7 @@ func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
return errEOR
}
rec.Remaining = ring.remaining()
return err
}

Expand Down
17 changes: 12 additions & 5 deletions perf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ func TestPerfReader(t *testing.T) {
}
defer rd.Close()

outputSamples(t, events, 5)
qt.Assert(t, rd.BufferSize(), qt.Equals, 4096)

checkRecord(t, rd)
outputSamples(t, events, 5, 5)

_, rem := checkRecord(t, rd)
qt.Assert(t, rem >= 5, qt.IsTrue, qt.Commentf("expected at least 5 Remaining"))

_, rem = checkRecord(t, rd)
qt.Assert(t, rem, qt.Equals, 0, qt.Commentf("expected zero Remaining"))

rd.SetDeadline(time.Now().Add(4 * time.Millisecond))
_, err = rd.Read()
Expand Down Expand Up @@ -155,7 +161,7 @@ func outputSamplesProg(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) *eb
return prog
}

func checkRecord(tb testing.TB, rd *Reader) (id int) {
func checkRecord(tb testing.TB, rd *Reader) (id int, remaining int) {
tb.Helper()

rec, err := rd.Read()
Expand All @@ -172,7 +178,7 @@ func checkRecord(tb testing.TB, rd *Reader) (id int) {

// padding is ignored since it's value is undefined.

return int(rec.RawSample[1])
return int(rec.RawSample[1]), rec.Remaining
}

func TestPerfReaderLostSample(t *testing.T) {
Expand Down Expand Up @@ -305,8 +311,9 @@ func TestPerfReaderOverwritable(t *testing.T) {

nextID := maxEvents
for i := 0; i < maxEvents; i++ {
id := checkRecord(t, rd)
id, rem := checkRecord(t, rd)
qt.Assert(t, id, qt.Equals, nextID)
qt.Assert(t, rem, qt.Equals, -1)
nextID--
}
}
Expand Down
11 changes: 11 additions & 0 deletions perf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) {
type ringReader interface {
loadHead()
size() int
remaining() int
writeTail()
Read(p []byte) (int, error)
}
Expand Down Expand Up @@ -157,6 +158,10 @@ func (rr *forwardReader) size() int {
return len(rr.ring)
}

func (rr *forwardReader) remaining() int {
return int((rr.head - rr.tail) & rr.mask)
}

func (rr *forwardReader) writeTail() {
// Commit the new tail. This lets the kernel know that
// the ring buffer has been consumed.
Expand Down Expand Up @@ -244,6 +249,12 @@ func (rr *reverseReader) size() int {
return len(rr.ring)
}

func (rr *reverseReader) remaining() int {
// remaining data is inaccurate for overwritable buffers
// once an overwrite happens, so return -1 here.
return -1
}

func (rr *reverseReader) writeTail() {
// We do not care about tail for over writable perf buffer.
// So, this function is noop.
Expand Down
9 changes: 9 additions & 0 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (rh *ringbufHeader) dataLen() int {

type Record struct {
RawSample []byte

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int
}

// Read a record from an event ring.
Expand Down Expand Up @@ -98,6 +101,7 @@ func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {

rd.storeConsumer()
rec.RawSample = rec.RawSample[:header.dataLen()]
rec.Remaining = rd.remaining()
return nil
}

Expand Down Expand Up @@ -231,3 +235,8 @@ func (r *Reader) ReadInto(rec *Record) error {
}
}
}

// BufferSize returns the size in bytes of the ring buffer
func (r *Reader) BufferSize() int {
return r.ring.size()
}
13 changes: 13 additions & 0 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func TestRingbufReader(t *testing.T) {
}
defer rd.Close()

if uint32(rd.BufferSize()) != 2*events.MaxEntries() {
t.Errorf("expected %d BufferSize, got %d", events.MaxEntries(), rd.BufferSize())
}

ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil {
Expand All @@ -77,6 +81,15 @@ func TestRingbufReader(t *testing.T) {
t.Fatal("Can't read samples:", err)
}
raw[len(record.RawSample)] = record.RawSample
if len(raw) == len(tt.want) {
if record.Remaining != 0 {
t.Errorf("expected 0 Remaining, got %d", record.Remaining)
}
} else {
if record.Remaining == 0 {
t.Error("expected non-zero Remaining, got 0")
}
}
}

if diff := cmp.Diff(tt.want, raw); diff != "" {
Expand Down
11 changes: 11 additions & 0 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func (rr *ringReader) isEmpty() bool {
return prod == cons
}

func (rr *ringReader) size() int {
return cap(rr.ring)
}

func (rr *ringReader) remaining() int {
cons := atomic.LoadUint64(rr.cons_pos)
prod := atomic.LoadUint64(rr.prod_pos)

return int((prod - cons) & rr.mask)
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)

Expand Down

0 comments on commit 5a0992d

Please sign in to comment.