Skip to content

Commit 9349eb3

Browse files
magnus-karlssonborkmann
authored andcommitted
xsk: Introduce batched Tx descriptor interfaces
Introduce batched descriptor interfaces in the xsk core code for the Tx path to be used in the driver to write a code path with higher performance. This interface will be used by the i40e driver in the next patch. Though other drivers would likely benefit from this new interface too. Note that batching is only implemented for the common case when there is only one socket bound to the same device and queue id. When this is not the case, we fall back to the old non-batched version of the function. Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com> Signed-off-by: Daniel Borkmann <daniel@iogearbox.net> Acked-by: John Fastabend <john.fastabend@gmail.com> Link: https://lore.kernel.org/bpf/1605525167-14450-5-git-send-email-magnus.karlsson@gmail.com
1 parent b8c7aec commit 9349eb3

File tree

3 files changed

+140
-13
lines changed

3 files changed

+140
-13
lines changed

include/net/xdp_sock_drv.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries);
1515
bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc);
16+
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, u32 max);
1617
void xsk_tx_release(struct xsk_buff_pool *pool);
1718
struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev,
1819
u16 queue_id);
@@ -128,6 +129,12 @@ static inline bool xsk_tx_peek_desc(struct xsk_buff_pool *pool,
128129
return false;
129130
}
130131

132+
static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc,
133+
u32 max)
134+
{
135+
return 0;
136+
}
137+
131138
static inline void xsk_tx_release(struct xsk_buff_pool *pool)
132139
{
133140
}

net/xdp/xsk.c

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,63 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
332332
}
333333
EXPORT_SYMBOL(xsk_tx_peek_desc);
334334

335+
static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs,
336+
u32 max_entries)
337+
{
338+
u32 nb_pkts = 0;
339+
340+
while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
341+
nb_pkts++;
342+
343+
xsk_tx_release(pool);
344+
return nb_pkts;
345+
}
346+
347+
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs,
348+
u32 max_entries)
349+
{
350+
struct xdp_sock *xs;
351+
u32 nb_pkts;
352+
353+
rcu_read_lock();
354+
if (!list_is_singular(&pool->xsk_tx_list)) {
355+
/* Fallback to the non-batched version */
356+
rcu_read_unlock();
357+
return xsk_tx_peek_release_fallback(pool, descs, max_entries);
358+
}
359+
360+
xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
361+
if (!xs) {
362+
nb_pkts = 0;
363+
goto out;
364+
}
365+
366+
nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries);
367+
if (!nb_pkts) {
368+
xs->tx->queue_empty_descs++;
369+
goto out;
370+
}
371+
372+
/* This is the backpressure mechanism for the Tx path. Try to
373+
* reserve space in the completion queue for all packets, but
374+
* if there are fewer slots available, just process that many
375+
* packets. This avoids having to implement any buffering in
376+
* the Tx path.
377+
*/
378+
nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts);
379+
if (!nb_pkts)
380+
goto out;
381+
382+
xskq_cons_release_n(xs->tx, nb_pkts);
383+
__xskq_cons_release(xs->tx);
384+
xs->sk.sk_write_space(&xs->sk);
385+
386+
out:
387+
rcu_read_unlock();
388+
return nb_pkts;
389+
}
390+
EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch);
391+
335392
static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
336393
{
337394
struct net_device *dev = xs->dev;

net/xdp/xsk_queue.h

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,30 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
199199
return false;
200200
}
201201

202+
static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
203+
struct xdp_desc *descs,
204+
struct xsk_buff_pool *pool, u32 max)
205+
{
206+
u32 cached_cons = q->cached_cons, nb_entries = 0;
207+
208+
while (cached_cons != q->cached_prod && nb_entries < max) {
209+
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
210+
u32 idx = cached_cons & q->ring_mask;
211+
212+
descs[nb_entries] = ring->desc[idx];
213+
if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
214+
/* Skip the entry */
215+
cached_cons++;
216+
continue;
217+
}
218+
219+
nb_entries++;
220+
cached_cons++;
221+
}
222+
223+
return nb_entries;
224+
}
225+
202226
/* Functions for consumers */
203227

204228
static inline void __xskq_cons_release(struct xsk_queue *q)
@@ -220,17 +244,22 @@ static inline void xskq_cons_get_entries(struct xsk_queue *q)
220244
__xskq_cons_peek(q);
221245
}
222246

223-
static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
247+
static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
224248
{
225249
u32 entries = q->cached_prod - q->cached_cons;
226250

227-
if (entries >= cnt)
228-
return true;
251+
if (entries >= max)
252+
return max;
229253

230254
__xskq_cons_peek(q);
231255
entries = q->cached_prod - q->cached_cons;
232256

233-
return entries >= cnt;
257+
return entries >= max ? max : entries;
258+
}
259+
260+
static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
261+
{
262+
return xskq_cons_nb_entries(q, cnt) >= cnt ? true : false;
234263
}
235264

236265
static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
@@ -249,16 +278,28 @@ static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
249278
return xskq_cons_read_desc(q, desc, pool);
250279
}
251280

281+
static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs,
282+
struct xsk_buff_pool *pool, u32 max)
283+
{
284+
u32 entries = xskq_cons_nb_entries(q, max);
285+
286+
return xskq_cons_read_desc_batch(q, descs, pool, entries);
287+
}
288+
289+
/* To improve performance in the xskq_cons_release functions, only update local state here.
290+
* Reflect this to global state when we get new entries from the ring in
291+
* xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop.
292+
*/
252293
static inline void xskq_cons_release(struct xsk_queue *q)
253294
{
254-
/* To improve performance, only update local state here.
255-
* Reflect this to global state when we get new entries
256-
* from the ring in xskq_cons_get_entries() and whenever
257-
* Rx or Tx processing are completed in the NAPI loop.
258-
*/
259295
q->cached_cons++;
260296
}
261297

298+
static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
299+
{
300+
q->cached_cons += cnt;
301+
}
302+
262303
static inline bool xskq_cons_is_full(struct xsk_queue *q)
263304
{
264305
/* No barriers needed since data is not accessed */
@@ -268,18 +309,23 @@ static inline bool xskq_cons_is_full(struct xsk_queue *q)
268309

269310
/* Functions for producers */
270311

271-
static inline bool xskq_prod_is_full(struct xsk_queue *q)
312+
static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
272313
{
273314
u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
274315

275-
if (free_entries)
276-
return false;
316+
if (free_entries >= max)
317+
return max;
277318

278319
/* Refresh the local tail pointer */
279320
q->cached_cons = READ_ONCE(q->ring->consumer);
280321
free_entries = q->nentries - (q->cached_prod - q->cached_cons);
281322

282-
return !free_entries;
323+
return free_entries >= max ? max : free_entries;
324+
}
325+
326+
static inline bool xskq_prod_is_full(struct xsk_queue *q)
327+
{
328+
return xskq_prod_nb_free(q, 1) ? false : true;
283329
}
284330

285331
static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -304,6 +350,23 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
304350
return 0;
305351
}
306352

353+
static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
354+
u32 max)
355+
{
356+
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
357+
u32 nb_entries, i, cached_prod;
358+
359+
nb_entries = xskq_prod_nb_free(q, max);
360+
361+
/* A, matches D */
362+
cached_prod = q->cached_prod;
363+
for (i = 0; i < nb_entries; i++)
364+
ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
365+
q->cached_prod = cached_prod;
366+
367+
return nb_entries;
368+
}
369+
307370
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
308371
u64 addr, u32 len)
309372
{

0 commit comments

Comments
 (0)