Skip to content

Commit

Permalink
libbpf: Add implementation to consume overwritable BPF ring buffer.
Browse files Browse the repository at this point in the history
If the BPF ring buffer is overwritable, ringbuf_process_overwritable_ring() will
be called to handle the data consumption.
All the available data will be consumed but some checks will be performed:
* check we do not read data we already read, if there is no new data, nothing
happens.
* check we do not read more than the buffer size.
* check we do not read invalid data by checking they fit the buffer size.

Signed-off-by: Francis Laniel <flaniel@linux.microsoft.com>
  • Loading branch information
eiffel-fl committed Aug 29, 2022
1 parent 66bceec commit 3a436c0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tools/include/uapi/linux/bpf.h
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,9 @@ enum {

/* Create a map that is suitable to be an inner map with dynamic max entries */
BPF_F_INNER_MAP = (1U << 12),

/* Create an over writable BPF_RINGBUF */
BFP_F_RB_OVERWRITABLE = (1U << 13),
};

/* Flags for BPF_PROG_QUERY. */
Expand Down
106 changes: 106 additions & 0 deletions tools/lib/bpf/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

struct ring {
ring_buffer_sample_fn sample_cb;
__u8 overwritable: 1,
__reserved: 7;
void *ctx;
void *data;
unsigned long *consumer_pos;
Expand Down Expand Up @@ -51,6 +53,11 @@ static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
}
}

static inline bool is_overwritable(struct ring *r)
{
return !!r->overwritable;
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
ring_buffer_sample_fn sample_cb, void *ctx)
Expand Down Expand Up @@ -95,6 +102,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
r->sample_cb = sample_cb;
r->ctx = ctx;
r->mask = info.max_entries - 1;
r->overwritable = !!(info.map_flags & BFP_F_RB_OVERWRITABLE);

/* Map writable consumer page */
tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
Expand Down Expand Up @@ -202,6 +210,101 @@ static inline int roundup_len(__u32 len)
return (len + 7) / 8 * 8;
}


static int64_t ringbuf_process_overwritable_ring(struct ring *r)
{
/* 64-bit to avoid overflow in case of extreme application behavior */
int64_t cnt = 0;
unsigned long read_pos, prod_pos, previous_prod_pos;

prod_pos = smp_load_acquire(r->producer_pos);
previous_prod_pos = smp_load_acquire(r->consumer_pos);

/*
* For overwritable ring buffer, we use consumer_pos as the previous
* producer_pos.
* So, if between two calls to this function, the prod_pos did not move,
* it means there is no new data, so we can return right now rather than
* dealing with data we already proceeded.
* NOTE the kernel space does not care about consumer_pos to reserve()
* in overwritable ring buffers, hence we can hijack this field.
*/
if (previous_prod_pos == prod_pos)
return 0;

/*
* BPF ring buffer is over writable, we start reading from
* producer position.
*/
read_pos = prod_pos;
while (read_pos - prod_pos < r->mask) {
int *len_ptr, len;

len_ptr = r->data + (read_pos & r->mask);
len = smp_load_acquire(len_ptr);

/* sample not committed yet, bail out for now */
if (len & BPF_RINGBUF_BUSY_BIT)
break;

/*
* If len is 0, it means we read all the data
* available in the buffer and jump on 0 data:
*
* prod_pos read_pos
* | |
* V V
* +---+------+----------+-------+------+
* | |D....D|C........C|B.....B|A....A|
* +---+------+----------+-------+------+
*/
if (!len)
break;

/*
* If adding the event len to the current
* consumer position makes us wrap the buffer,
* it means we already did "one loop" around the
* buffer.
* So, the pointed data would not be usable:
*
* prod_pos
* read_pos----+ |
* | |
* V V
* +---+------+----------+-------+---+--+
* |..E|D....D|C........C|B.....B|A..|E.|
* +---+------+----------+-------+---+--+
*/
if (read_pos - prod_pos + len > r->mask)
break;

read_pos += roundup_len(len);

if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
void *sample;
int err;

sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
err = r->sample_cb(r->ctx, sample, len);
if (err < 0) {
/* update consumer pos and bail out */
smp_store_release(r->consumer_pos,
prod_pos);
return err;
}
cnt++;
}

/* This prevents reading data we already processed. */
if (previous_prod_pos && read_pos >= previous_prod_pos)
break;
}

smp_store_release(r->consumer_pos, prod_pos);
return cnt;
}

static int64_t ringbuf_process_ring(struct ring* r)
{
int *len_ptr, len, err;
Expand All @@ -211,6 +314,9 @@ static int64_t ringbuf_process_ring(struct ring* r)
bool got_new_data;
void *sample;

if (is_overwritable(r))
return ringbuf_process_overwritable_ring(r);

cons_pos = smp_load_acquire(r->consumer_pos);
do {
got_new_data = false;
Expand Down

0 comments on commit 3a436c0

Please sign in to comment.