@@ -199,6 +199,33 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
 	return false;
 }
 
+static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
+					    struct xdp_desc *descs,
+					    struct xsk_buff_pool *pool, u32 max)
+{
+	u32 cached_cons = q->cached_cons, nb_entries = 0;
+
+	while (cached_cons != q->cached_prod && nb_entries < max) {
+		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+		u32 idx = cached_cons & q->ring_mask;
+
+		descs[nb_entries] = ring->desc[idx];
+		if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
+			if (nb_entries) {
+				/* Invalid entry detected. Return what we have. */
+				return nb_entries;
+			}
+			/* Use non-batch version to progress beyond invalid entry/entries */
+			return xskq_cons_read_desc(q, descs, pool) ? 1 : 0;
+		}
+
+		nb_entries++;
+		cached_cons++;
+	}
+
+	return nb_entries;
+}
+
 /* Functions for consumers */
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
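
xskq_cons_read_desc_batch() above walks the Tx ring using only the cached consumer and producer indices, masks the index with ring_mask (the ring size is a power of two), and copies up to max descriptors before any shared ring state is touched; on the first invalid descriptor it either returns what it already has or falls back to the single-descriptor path. The following standalone C sketch models that masked batch-copy pattern outside the kernel; the demo_* types and the validity check are simplified stand-ins for the kernel structures, not part of the patch.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-ins for the kernel ring structures (illustrative only). */
struct demo_desc {
	uint64_t addr;
	uint32_t len;
};

struct demo_ring {
	uint32_t cached_prod;		/* producer index as last observed */
	uint32_t cached_cons;		/* local consumer index */
	uint32_t ring_mask;		/* nentries - 1, nentries a power of two */
	struct demo_desc desc[8];
};

static bool demo_desc_is_valid(const struct demo_desc *d)
{
	return d->len != 0;		/* placeholder for xskq_cons_is_valid_desc() */
}

/* Copy up to @max descriptors; stop early at the first invalid one. */
static uint32_t demo_read_desc_batch(struct demo_ring *q, struct demo_desc *descs,
				     uint32_t max)
{
	uint32_t cached_cons = q->cached_cons, nb_entries = 0;

	while (cached_cons != q->cached_prod && nb_entries < max) {
		uint32_t idx = cached_cons & q->ring_mask;

		descs[nb_entries] = q->desc[idx];
		if (!demo_desc_is_valid(&descs[nb_entries]))
			break;	/* the kernel version falls back to the non-batch path here */

		nb_entries++;
		cached_cons++;
	}

	return nb_entries;
}

int main(void)
{
	struct demo_ring q = {
		.cached_prod = 3, .ring_mask = 7,
		.desc = { { 0x1000, 64 }, { 0x2000, 64 }, { 0x3000, 64 } },
	};
	struct demo_desc out[4];

	printf("consumed %u descriptors\n", (unsigned)demo_read_desc_batch(&q, out, 4));
	return 0;
}
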
@@ -220,17 +247,22 @@ static inline void xskq_cons_get_entries(struct xsk_queue *q)
 	__xskq_cons_peek(q);
 }
 
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
 {
 	u32 entries = q->cached_prod - q->cached_cons;
 
-	if (entries >= cnt)
-		return true;
+	if (entries >= max)
+		return max;
 
 	__xskq_cons_peek(q);
 	entries = q->cached_prod - q->cached_cons;
 
-	return entries >= cnt;
+	return entries >= max ? max : entries;
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+	return xskq_cons_nb_entries(q, cnt) >= cnt ? true : false;
 }
 
 static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
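
xskq_cons_nb_entries() generalizes the old xskq_cons_has_entries(): it first answers from the cached producer index, re-reads the shared producer pointer via __xskq_cons_peek() only when the cached view cannot satisfy the request, and clamps the result to max, so xskq_cons_has_entries() becomes a one-line wrapper. A minimal standalone model of that "clamp, and refresh only when short" idea is below; the atomic acquire load stands in for what __xskq_cons_peek() does with the shared producer pointer and its barrier, and the demo_q type is illustrative only.

#include <stdatomic.h>
#include <stdint.h>

/* Illustrative stand-in for the queue state, not the kernel struct. */
struct demo_q {
	_Atomic uint32_t shared_prod;	/* written by the producer side */
	uint32_t cached_prod;		/* consumer's cached copy */
	uint32_t cached_cons;
};

static uint32_t demo_cons_nb_entries(struct demo_q *q, uint32_t max)
{
	uint32_t entries = q->cached_prod - q->cached_cons;

	if (entries >= max)
		return max;	/* cached view suffices: no shared-memory access */

	/* Refresh the cached producer index, as __xskq_cons_peek() does. */
	q->cached_prod = atomic_load_explicit(&q->shared_prod, memory_order_acquire);
	entries = q->cached_prod - q->cached_cons;

	return entries >= max ? max : entries;
}

/* The boolean check is then just a wrapper, mirroring xskq_cons_has_entries(). */
static inline _Bool demo_cons_has_entries(struct demo_q *q, uint32_t cnt)
{
	return demo_cons_nb_entries(q, cnt) >= cnt;
}
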
@@ -249,16 +281,28 @@ static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
 	return xskq_cons_read_desc(q, desc, pool);
 }
 
+static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs,
+					    struct xsk_buff_pool *pool, u32 max)
+{
+	u32 entries = xskq_cons_nb_entries(q, max);
+
+	return xskq_cons_read_desc_batch(q, descs, pool, entries);
+}
+
+/* To improve performance in the xskq_cons_release functions, only update local state here.
+ * Reflect this to global state when we get new entries from the ring in
+ * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop.
+ */
 static inline void xskq_cons_release(struct xsk_queue *q)
 {
-	/* To improve performance, only update local state here.
-	 * Reflect this to global state when we get new entries
-	 * from the ring in xskq_cons_get_entries() and whenever
-	 * Rx or Tx processing are completed in the NAPI loop.
-	 */
 	q->cached_cons++;
 }
 
+static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
+{
+	q->cached_cons += cnt;
+}
+
 static inline bool xskq_cons_is_full(struct xsk_queue *q)
 {
 	/* No barriers needed since data is not accessed */
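
xskq_cons_peek_desc_batch() and xskq_cons_release_n() are intended to be used as a pair by a batched Tx path: peek up to max validated descriptors, process them, then advance only the cached consumer index by the number actually consumed; the shared consumer pointer is still published later through __xskq_cons_release(), as the comment above xskq_cons_release() explains. A hedged caller sketch follows; it assumes kernel context, and the function name and the per-descriptor processing placeholder are illustrative rather than part of this patch.

/* Illustrative batched Tx consumer; assumes kernel context, not part of the patch. */
static u32 xsk_tx_consume_batch(struct xsk_queue *tx, struct xsk_buff_pool *pool,
				struct xdp_desc *descs, u32 max)
{
	u32 nb_pkts, i;

	nb_pkts = xskq_cons_peek_desc_batch(tx, descs, pool, max);
	if (!nb_pkts)
		return 0;

	for (i = 0; i < nb_pkts; i++) {
		/* Hand descs[i] to the driver's Tx routine here. */
	}

	/* Only local state is advanced; the shared consumer pointer is
	 * published later via __xskq_cons_release().
	 */
	xskq_cons_release_n(tx, nb_pkts);

	return nb_pkts;
}
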
@@ -268,18 +312,23 @@ static inline bool xskq_cons_is_full(struct xsk_queue *q)
 
 /* Functions for producers */
 
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
+static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
 {
 	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
 
-	if (free_entries)
-		return false;
+	if (free_entries >= max)
+		return max;
 
 	/* Refresh the local tail pointer */
 	q->cached_cons = READ_ONCE(q->ring->consumer);
 	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
 
-	return !free_entries;
+	return free_entries >= max ? max : free_entries;
+}
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+	return xskq_prod_nb_free(q, 1) ? false : true;
 }
 
 static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -304,6 +353,23 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
 	return 0;
 }
 
+static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
+					       u32 max)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+	u32 nb_entries, i, cached_prod;
+
+	nb_entries = xskq_prod_nb_free(q, max);
+
+	/* A, matches D */
+	cached_prod = q->cached_prod;
+	for (i = 0; i < nb_entries; i++)
+		ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
+	q->cached_prod = cached_prod;
+
+	return nb_entries;
+}
+
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 					 u64 addr, u32 len)
 {
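
On the producer side, xskq_prod_reserve_addr_batch() first asks xskq_prod_nb_free() how many slots it may use, then writes up to that many addresses into a umem ring and advances only the local cached_prod; the "A, matches D" note refers to the load/store pairing scheme documented at the top of xsk_queue.h, with the matching release of the producer pointer happening in a separate submit step. A hedged caller sketch is shown below; it assumes kernel context, the function name is illustrative, and the final publish step assumes a submit helper along the lines of xskq_prod_submit_n(), which is not shown in these hunks.

/* Illustrative completion-ring producer; assumes kernel context, not part of the patch. */
static u32 xsk_cq_produce_batch(struct xsk_queue *cq, struct xdp_desc *descs, u32 max)
{
	u32 nb_entries;

	/* Reserve slots and copy the addresses; only cached_prod moves here. */
	nb_entries = xskq_prod_reserve_addr_batch(cq, descs, max);
	if (!nb_entries)
		return 0;

	/* Publish the new producer pointer so user space can see the entries
	 * (assumed submit helper, e.g. xskq_prod_submit_n()).
	 */
	xskq_prod_submit_n(cq, nb_entries);

	return nb_entries;
}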