Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Remaining and Size to perf/ring Record #1167

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Record struct {
// The number of samples which could not be output, since
// the ring buffer was full.
LostSamples uint64

// The minimum number of bytes remaining in the per-CPU buffer after this Record has been read.
// Negative for overwritable buffers.
Remaining int
}

// Read a record from a reader and tag it as being from the given CPU.
Expand Down Expand Up @@ -158,6 +162,8 @@ type Reader struct {

paused bool
overwritable bool

bufferSize int
}

// ReaderOptions control the behaviour of the user
Expand Down Expand Up @@ -216,6 +222,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
// bpf_perf_event_output checks which CPU an event is enabled on,
// but doesn't allow using a wildcard like -1 to specify "all CPUs".
// Hence we have to create a ring for each CPU.
bufferSize := 0
for i := 0; i < nCPU; i++ {
ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable)
if errors.Is(err, unix.ENODEV) {
Expand All @@ -224,6 +231,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
pauseFds = append(pauseFds, -1)
continue
}
bufferSize = ring.size()

if err != nil {
return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
Expand Down Expand Up @@ -251,6 +259,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
eventHeader: make([]byte, perfEventHeaderSize),
pauseFds: pauseFds,
overwritable: opts.Overwritable,
bufferSize: bufferSize,
}
if err = pr.Resume(); err != nil {
return nil, err
Expand Down Expand Up @@ -430,6 +439,11 @@ func (pr *Reader) Resume() error {
return nil
}

// BufferSize is the size in bytes of each per-CPU buffer
func (pr *Reader) BufferSize() int {
return pr.bufferSize
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
defer ring.writeTail()
Expand All @@ -439,6 +453,7 @@ func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
return errEOR
}
rec.Remaining = ring.remaining()
return err
}

Expand Down
17 changes: 12 additions & 5 deletions perf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ func TestPerfReader(t *testing.T) {
}
defer rd.Close()

outputSamples(t, events, 5)
qt.Assert(t, rd.BufferSize(), qt.Equals, 4096)

checkRecord(t, rd)
outputSamples(t, events, 5, 5)

_, rem := checkRecord(t, rd)
qt.Assert(t, rem >= 5, qt.IsTrue, qt.Commentf("expected at least 5 Remaining"))

_, rem = checkRecord(t, rd)
qt.Assert(t, rem, qt.Equals, 0, qt.Commentf("expected zero Remaining"))

rd.SetDeadline(time.Now().Add(4 * time.Millisecond))
_, err = rd.Read()
Expand Down Expand Up @@ -155,7 +161,7 @@ func outputSamplesProg(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) *eb
return prog
}

func checkRecord(tb testing.TB, rd *Reader) (id int) {
func checkRecord(tb testing.TB, rd *Reader) (id int, remaining int) {
tb.Helper()

rec, err := rd.Read()
Expand All @@ -172,7 +178,7 @@ func checkRecord(tb testing.TB, rd *Reader) (id int) {

// padding is ignored since it's value is undefined.

return int(rec.RawSample[1])
return int(rec.RawSample[1]), rec.Remaining
}

func TestPerfReaderLostSample(t *testing.T) {
Expand Down Expand Up @@ -305,8 +311,9 @@ func TestPerfReaderOverwritable(t *testing.T) {

nextID := maxEvents
for i := 0; i < maxEvents; i++ {
id := checkRecord(t, rd)
id, rem := checkRecord(t, rd)
qt.Assert(t, id, qt.Equals, nextID)
qt.Assert(t, rem, qt.Equals, -1)
nextID--
}
}
Expand Down
11 changes: 11 additions & 0 deletions perf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) {
type ringReader interface {
loadHead()
size() int
remaining() int
writeTail()
Read(p []byte) (int, error)
}
Expand Down Expand Up @@ -157,6 +158,10 @@ func (rr *forwardReader) size() int {
return len(rr.ring)
}

func (rr *forwardReader) remaining() int {
return int((rr.head - rr.tail) & rr.mask)
}

func (rr *forwardReader) writeTail() {
// Commit the new tail. This lets the kernel know that
// the ring buffer has been consumed.
Expand Down Expand Up @@ -244,6 +249,12 @@ func (rr *reverseReader) size() int {
return len(rr.ring)
}

func (rr *reverseReader) remaining() int {
// remaining data is inaccurate for overwritable buffers
// once an overwrite happens, so return -1 here.
return -1
}

func (rr *reverseReader) writeTail() {
// We do not care about tail for over writable perf buffer.
// So, this function is noop.
Expand Down
9 changes: 9 additions & 0 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (rh *ringbufHeader) dataLen() int {

type Record struct {
RawSample []byte

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int
}

// Read a record from an event ring.
Expand Down Expand Up @@ -98,6 +101,7 @@ func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {

rd.storeConsumer()
rec.RawSample = rec.RawSample[:header.dataLen()]
rec.Remaining = rd.remaining()
return nil
}

Expand Down Expand Up @@ -231,3 +235,8 @@ func (r *Reader) ReadInto(rec *Record) error {
}
}
}

// BufferSize returns the size in bytes of the ring buffer
func (r *Reader) BufferSize() int {
return r.ring.size()
}
13 changes: 13 additions & 0 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func TestRingbufReader(t *testing.T) {
}
defer rd.Close()

if uint32(rd.BufferSize()) != 2*events.MaxEntries() {
t.Errorf("expected %d BufferSize, got %d", events.MaxEntries(), rd.BufferSize())
}

ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil {
Expand All @@ -77,6 +81,15 @@ func TestRingbufReader(t *testing.T) {
t.Fatal("Can't read samples:", err)
}
raw[len(record.RawSample)] = record.RawSample
if len(raw) == len(tt.want) {
if record.Remaining != 0 {
t.Errorf("expected 0 Remaining, got %d", record.Remaining)
}
} else {
if record.Remaining == 0 {
t.Error("expected non-zero Remaining, got 0")
}
}
}

if diff := cmp.Diff(tt.want, raw); diff != "" {
Expand Down
11 changes: 11 additions & 0 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func (rr *ringReader) isEmpty() bool {
return prod == cons
}

func (rr *ringReader) size() int {
return cap(rr.ring)
}

func (rr *ringReader) remaining() int {
cons := atomic.LoadUint64(rr.cons_pos)
prod := atomic.LoadUint64(rr.prod_pos)

return int((prod - cons) & rr.mask)
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)

Expand Down