
Commit 6e375b2

Author: Martin KaFai Lau (committed)
Merge branch 'bpf-tcp-exactly-once-socket-iteration'
Jordan Rife says:

====================
bpf: tcp: Exactly-once socket iteration

TCP socket iterators use iter->offset to track progress through a bucket, which is a measure of the number of matching sockets from the current bucket that have been seen or processed by the iterator. On subsequent iterations, if the current bucket has unprocessed items, we skip at least iter->offset matching items in the bucket before adding any remaining items to the next batch. However, iter->offset isn't always an accurate measure of "things already seen" when the underlying bucket changes between reads, which can lead to repeated or skipped sockets. Instead, this series remembers the cookies of the sockets we haven't seen yet in the current bucket and resumes from the first cookie in that list that we can find on the next iteration.

This is a continuation of the work started in [1]. This series largely replicates the patterns applied to UDP socket iterators, applying them instead to TCP socket iterators.

CHANGES
=======
v5 -> v6:
* In patch ten ("selftests/bpf: Create established sockets in socket iterator tests"), use poll() to choose a socket that has a connection ready to be accept()ed. Before, connect_to_server would set the O_NONBLOCK flag on all listening sockets so that accept_from_one could loop through them all and find the one that connect_to_addr_str connected to. However, this is subtly buggy and could potentially lead to test flakes, since the 3-way handshake isn't necessarily done when connect returns, so it's possible none of the accept() calls succeed. Use poll() instead to guarantee that the socket we accept() from is ready and eliminate the need for the O_NONBLOCK flag (Martin).

v4 -> v5:
* Move WARN_ON_ONCE before the `done` label in patch two ("bpf: tcp: Make sure iter->batch always contains a full bucket snapshot") (Martin).
* Remove unnecessary kfunc declaration in patch eleven ("selftests/bpf: Create iter_tcp_destroy test program") (Martin).
* Make sure to close the socket fd at the end of `destroy` in patch twelve ("selftests/bpf: Add tests for bucket resume logic in established sockets") (Martin).

v3 -> v4:
* Drop braces around sk_nulls_for_each_from in patch five ("bpf: tcp: Avoid socket skips and repeats during iteration") (Stanislav).
* Add a break after the TCP_SEQ_STATE_ESTABLISHED case in patch five (Stanislav).
* Add an `if (sock_type == SOCK_STREAM)` check before assigning TCP_LISTEN to skel->rodata->ss in patch eight ("selftests/bpf: Allow for iteration over multiple states") to more clearly express the intent that the option is only consumed for SOCK_STREAM tests (Stanislav).
* Move the `i = 0` assignment into the for loop in patch ten ("selftests/bpf: Create established sockets in socket iterator tests") (Stanislav).

v2 -> v3:
* Unroll the loop inside bpf_iter_tcp_batch to make the logic easier to follow in patch two ("bpf: tcp: Make sure iter->batch always contains a full bucket snapshot"). This gets rid of the `resizes` variable from v2 and eliminates the extra conditional that checks how many batch resize attempts have occurred so far (Stanislav).
  Note: This changes the behavior slightly. Before, in the case that the second call to tcp_seek_last_pos (and later bpf_iter_tcp_resume) advances to a new bucket, which may happen if the current bucket is emptied after releasing its lock, the `resizes` "budget" would be reset, the net effect being that we would try a batch resize with GFP_USER at most once per bucket. Now, we try to resize the batch with GFP_USER at most once per call, so it is slightly more likely that we hit the GFP_NOWAIT scenario. However, this edge case should be rare in practice anyway, and the new behavior is more or less consistent with the original retry logic, so avoid the loop and prefer code clarity.
* Move the call to bpf_iter_tcp_put_batch out of bpf_iter_tcp_realloc_batch and call it directly before invoking bpf_iter_tcp_realloc_batch with GFP_USER inside bpf_iter_tcp_batch. /Don't/ call it before invoking bpf_iter_tcp_realloc_batch the second time while we hold the lock with GFP_NOWAIT. This avoids a conditional inside bpf_iter_tcp_realloc_batch from v2 that only calls bpf_iter_tcp_put_batch if flags != GFP_NOWAIT and is a bit more explicit (Stanislav).
* Adjust patch five ("bpf: tcp: Avoid socket skips and repeats during iteration") to fit with the new logic in patch two.

v1 -> v2:
* In patch five ("bpf: tcp: Avoid socket skips and repeats during iteration"), remove unnecessary bucket bounds checks in bpf_iter_tcp_resume. In either case, if st->bucket is outside the current table's range then bpf_iter_tcp_resume_* calls *_get_first, which immediately returns NULL anyway and the logic will fall through. (Martin)
* Add a check at the top of bpf_iter_tcp_resume_listening and bpf_iter_tcp_resume_established to see if we're done with the current bucket and advance it immediately instead of wasting time finding the first matching socket in that bucket with (listening|established)_get_first. In v1, we originally discussed adding logic to advance the bucket in bpf_iter_tcp_seq_next and bpf_iter_tcp_seq_stop, but after trying this the logic seemed harder to track. Overall, keeping everything inside bpf_iter_tcp_resume_* seemed a bit clearer. (Martin)
* Instead of using a timeout in the last patch ("selftests/bpf: Add tests for bucket resume logic in established sockets") to wait for sockets to leave the ehash table after calling close(), use bpf_sock_destroy to deterministically destroy and remove them. This introduces one more patch ("selftests/bpf: Create iter_tcp_destroy test program") to create the iterator program that destroys a selected socket. Drive this through a destroy() function in the last patch which, just like close(), accepts a socket file descriptor. (Martin)
* Introduce one more patch ("selftests/bpf: Allow for iteration over multiple states") to fix a latent bug in iter_tcp_soreuse where the sk->sk_state != TCP_LISTEN check was ignored. Add the "ss" variable to allow test code to configure which socket states to allow.

[1]: https://lore.kernel.org/bpf/20250502161528.264630-1-jordan@jrife.io/
====================

Link: https://patch.msgid.link/20250714180919.127192-1-jordan@jrife.io
Signed-off-by: Martin KaFai Lau <martin.lau@kernel.org>
2 parents: 8efa26f + f126f0c
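
To make the resume-by-cookie idea concrete before reading the diff, here is a minimal userspace C sketch. It is illustrative only: the types and names are hypothetical, not kernel code from this series. The idea it models: remember the cookies of the sockets the previous batch did not show, then on the next read scan the (possibly changed) bucket for the first remembered cookie and resume there, falling back to the next bucket if none of them survived.

/* Illustrative userspace model of the resume-by-cookie pattern; all
 * names here are hypothetical and only loosely mirror the kernel logic.
 */
#include <stdint.h>
#include <stdio.h>

struct item {
	uint64_t cookie;	/* stable identity, like a socket cookie */
	struct item *next;	/* bucket modeled as a singly linked list */
};

/* Return the first remembered cookie that still exists in the bucket,
 * trying the saved cookies in order; NULL means none of them survived.
 */
static struct item *resume_bucket(struct item *first, const uint64_t *cookies,
				  int n_cookies)
{
	for (int i = 0; i < n_cookies; i++)
		for (struct item *it = first; it; it = it->next)
			if (it->cookie == cookies[i])
				return it;
	return NULL;
}

int main(void)
{
	/* Bucket as it looks on the next read: cookie 2 was removed. */
	struct item tail = { .cookie = 3, .next = NULL };
	struct item head = { .cookie = 1, .next = &tail };
	/* Cookies of the sockets the previous batch had not yet shown. */
	uint64_t unseen[] = { 2, 3 };
	struct item *resume = resume_bucket(&head, unseen, 2);

	if (resume)
		printf("resume at cookie %llu\n",
		       (unsigned long long)resume->cookie);
	else
		printf("bucket exhausted, advance to the next bucket\n");
	return 0;
}

The kernel-side counterpart of this scan is bpf_iter_tcp_resume_bucket() in the diff below, which walks the bucket's nulls list and compares each sk->sk_cookie against the saved cookies in order.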

3 files changed: +679 additions, -84 deletions


net/ipv4/tcp_ipv4.c

Lines changed: 200 additions & 69 deletions
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -58,6 +58,7 @@
 #include <linux/times.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/sock_diag.h>

 #include <net/net_namespace.h>
 #include <net/icmp.h>
@@ -3014,13 +3015,17 @@ static int tcp4_seq_show(struct seq_file *seq, void *v)
 }

 #ifdef CONFIG_BPF_SYSCALL
+union bpf_tcp_iter_batch_item {
+	struct sock *sk;
+	__u64 cookie;
+};
+
 struct bpf_tcp_iter_state {
 	struct tcp_iter_state state;
 	unsigned int cur_sk;
 	unsigned int end_sk;
 	unsigned int max_sk;
-	struct sock **batch;
-	bool st_bucket_done;
+	union bpf_tcp_iter_batch_item *batch;
 };

 struct bpf_iter__tcp {
@@ -3043,134 +3048,267 @@ static int tcp_prog_seq_show(struct bpf_prog *prog, struct bpf_iter_meta *meta,

 static void bpf_iter_tcp_put_batch(struct bpf_tcp_iter_state *iter)
 {
-	while (iter->cur_sk < iter->end_sk)
-		sock_gen_put(iter->batch[iter->cur_sk++]);
+	union bpf_tcp_iter_batch_item *item;
+	unsigned int cur_sk = iter->cur_sk;
+	__u64 cookie;
+
+	/* Remember the cookies of the sockets we haven't seen yet, so we can
+	 * pick up where we left off next time around.
+	 */
+	while (cur_sk < iter->end_sk) {
+		item = &iter->batch[cur_sk++];
+		cookie = sock_gen_cookie(item->sk);
+		sock_gen_put(item->sk);
+		item->cookie = cookie;
+	}
 }

 static int bpf_iter_tcp_realloc_batch(struct bpf_tcp_iter_state *iter,
-				      unsigned int new_batch_sz)
+				      unsigned int new_batch_sz, gfp_t flags)
 {
-	struct sock **new_batch;
+	union bpf_tcp_iter_batch_item *new_batch;

 	new_batch = kvmalloc(sizeof(*new_batch) * new_batch_sz,
-			     GFP_USER | __GFP_NOWARN);
+			     flags | __GFP_NOWARN);
 	if (!new_batch)
 		return -ENOMEM;

-	bpf_iter_tcp_put_batch(iter);
+	memcpy(new_batch, iter->batch, sizeof(*iter->batch) * iter->end_sk);
 	kvfree(iter->batch);
 	iter->batch = new_batch;
 	iter->max_sk = new_batch_sz;

 	return 0;
 }

-static unsigned int bpf_iter_tcp_listening_batch(struct seq_file *seq,
-						 struct sock *start_sk)
+static struct sock *bpf_iter_tcp_resume_bucket(struct sock *first_sk,
+					       union bpf_tcp_iter_batch_item *cookies,
+					       int n_cookies)
+{
+	struct hlist_nulls_node *node;
+	struct sock *sk;
+	int i;
+
+	for (i = 0; i < n_cookies; i++) {
+		sk = first_sk;
+		sk_nulls_for_each_from(sk, node)
+			if (cookies[i].cookie == atomic64_read(&sk->sk_cookie))
+				return sk;
+	}
+
+	return NULL;
+}
+
+static struct sock *bpf_iter_tcp_resume_listening(struct seq_file *seq)
 {
 	struct inet_hashinfo *hinfo = seq_file_net(seq)->ipv4.tcp_death_row.hashinfo;
 	struct bpf_tcp_iter_state *iter = seq->private;
 	struct tcp_iter_state *st = &iter->state;
+	unsigned int find_cookie = iter->cur_sk;
+	unsigned int end_cookie = iter->end_sk;
+	int resume_bucket = st->bucket;
+	struct sock *sk;
+
+	if (end_cookie && find_cookie == end_cookie)
+		++st->bucket;
+
+	sk = listening_get_first(seq);
+	iter->cur_sk = 0;
+	iter->end_sk = 0;
+
+	if (sk && st->bucket == resume_bucket && end_cookie) {
+		sk = bpf_iter_tcp_resume_bucket(sk, &iter->batch[find_cookie],
+						end_cookie - find_cookie);
+		if (!sk) {
+			spin_unlock(&hinfo->lhash2[st->bucket].lock);
+			++st->bucket;
+			sk = listening_get_first(seq);
+		}
+	}
+
+	return sk;
+}
+
+static struct sock *bpf_iter_tcp_resume_established(struct seq_file *seq)
+{
+	struct inet_hashinfo *hinfo = seq_file_net(seq)->ipv4.tcp_death_row.hashinfo;
+	struct bpf_tcp_iter_state *iter = seq->private;
+	struct tcp_iter_state *st = &iter->state;
+	unsigned int find_cookie = iter->cur_sk;
+	unsigned int end_cookie = iter->end_sk;
+	int resume_bucket = st->bucket;
+	struct sock *sk;
+
+	if (end_cookie && find_cookie == end_cookie)
+		++st->bucket;
+
+	sk = established_get_first(seq);
+	iter->cur_sk = 0;
+	iter->end_sk = 0;
+
+	if (sk && st->bucket == resume_bucket && end_cookie) {
+		sk = bpf_iter_tcp_resume_bucket(sk, &iter->batch[find_cookie],
+						end_cookie - find_cookie);
+		if (!sk) {
+			spin_unlock_bh(inet_ehash_lockp(hinfo, st->bucket));
+			++st->bucket;
+			sk = established_get_first(seq);
+		}
+	}
+
+	return sk;
+}
+
+static struct sock *bpf_iter_tcp_resume(struct seq_file *seq)
+{
+	struct bpf_tcp_iter_state *iter = seq->private;
+	struct tcp_iter_state *st = &iter->state;
+	struct sock *sk = NULL;
+
+	switch (st->state) {
+	case TCP_SEQ_STATE_LISTENING:
+		sk = bpf_iter_tcp_resume_listening(seq);
+		if (sk)
+			break;
+		st->bucket = 0;
+		st->state = TCP_SEQ_STATE_ESTABLISHED;
+		fallthrough;
+	case TCP_SEQ_STATE_ESTABLISHED:
+		sk = bpf_iter_tcp_resume_established(seq);
+		break;
+	}
+
+	return sk;
+}
+
+static unsigned int bpf_iter_tcp_listening_batch(struct seq_file *seq,
+						 struct sock **start_sk)
+{
+	struct bpf_tcp_iter_state *iter = seq->private;
 	struct hlist_nulls_node *node;
 	unsigned int expected = 1;
 	struct sock *sk;

-	sock_hold(start_sk);
-	iter->batch[iter->end_sk++] = start_sk;
+	sock_hold(*start_sk);
+	iter->batch[iter->end_sk++].sk = *start_sk;

-	sk = sk_nulls_next(start_sk);
+	sk = sk_nulls_next(*start_sk);
+	*start_sk = NULL;
 	sk_nulls_for_each_from(sk, node) {
 		if (seq_sk_match(seq, sk)) {
 			if (iter->end_sk < iter->max_sk) {
 				sock_hold(sk);
-				iter->batch[iter->end_sk++] = sk;
+				iter->batch[iter->end_sk++].sk = sk;
+			} else if (!*start_sk) {
+				/* Remember where we left off. */
+				*start_sk = sk;
 			}
 			expected++;
 		}
 	}
-	spin_unlock(&hinfo->lhash2[st->bucket].lock);

 	return expected;
 }

 static unsigned int bpf_iter_tcp_established_batch(struct seq_file *seq,
-						   struct sock *start_sk)
+						   struct sock **start_sk)
 {
-	struct inet_hashinfo *hinfo = seq_file_net(seq)->ipv4.tcp_death_row.hashinfo;
 	struct bpf_tcp_iter_state *iter = seq->private;
-	struct tcp_iter_state *st = &iter->state;
 	struct hlist_nulls_node *node;
 	unsigned int expected = 1;
 	struct sock *sk;

-	sock_hold(start_sk);
-	iter->batch[iter->end_sk++] = start_sk;
+	sock_hold(*start_sk);
+	iter->batch[iter->end_sk++].sk = *start_sk;

-	sk = sk_nulls_next(start_sk);
+	sk = sk_nulls_next(*start_sk);
+	*start_sk = NULL;
 	sk_nulls_for_each_from(sk, node) {
 		if (seq_sk_match(seq, sk)) {
 			if (iter->end_sk < iter->max_sk) {
 				sock_hold(sk);
-				iter->batch[iter->end_sk++] = sk;
+				iter->batch[iter->end_sk++].sk = sk;
+			} else if (!*start_sk) {
+				/* Remember where we left off. */
+				*start_sk = sk;
 			}
 			expected++;
 		}
 	}
-	spin_unlock_bh(inet_ehash_lockp(hinfo, st->bucket));

 	return expected;
 }

-static struct sock *bpf_iter_tcp_batch(struct seq_file *seq)
+static unsigned int bpf_iter_fill_batch(struct seq_file *seq,
+					struct sock **start_sk)
+{
+	struct bpf_tcp_iter_state *iter = seq->private;
+	struct tcp_iter_state *st = &iter->state;
+
+	if (st->state == TCP_SEQ_STATE_LISTENING)
+		return bpf_iter_tcp_listening_batch(seq, start_sk);
+	else
+		return bpf_iter_tcp_established_batch(seq, start_sk);
+}
+
+static void bpf_iter_tcp_unlock_bucket(struct seq_file *seq)
 {
 	struct inet_hashinfo *hinfo = seq_file_net(seq)->ipv4.tcp_death_row.hashinfo;
 	struct bpf_tcp_iter_state *iter = seq->private;
 	struct tcp_iter_state *st = &iter->state;
+
+	if (st->state == TCP_SEQ_STATE_LISTENING)
+		spin_unlock(&hinfo->lhash2[st->bucket].lock);
+	else
+		spin_unlock_bh(inet_ehash_lockp(hinfo, st->bucket));
+}
+
+static struct sock *bpf_iter_tcp_batch(struct seq_file *seq)
+{
+	struct bpf_tcp_iter_state *iter = seq->private;
 	unsigned int expected;
-	bool resized = false;
 	struct sock *sk;
+	int err;

-	/* The st->bucket is done. Directly advance to the next
-	 * bucket instead of having the tcp_seek_last_pos() to skip
-	 * one by one in the current bucket and eventually find out
-	 * it has to advance to the next bucket.
-	 */
-	if (iter->st_bucket_done) {
-		st->offset = 0;
-		st->bucket++;
-		if (st->state == TCP_SEQ_STATE_LISTENING &&
-		    st->bucket > hinfo->lhash2_mask) {
-			st->state = TCP_SEQ_STATE_ESTABLISHED;
-			st->bucket = 0;
-		}
-	}
+	sk = bpf_iter_tcp_resume(seq);
+	if (!sk)
+		return NULL; /* Done */

-again:
-	/* Get a new batch */
-	iter->cur_sk = 0;
-	iter->end_sk = 0;
-	iter->st_bucket_done = false;
+	expected = bpf_iter_fill_batch(seq, &sk);
+	if (likely(iter->end_sk == expected))
+		goto done;

-	sk = tcp_seek_last_pos(seq);
+	/* Batch size was too small. */
+	bpf_iter_tcp_unlock_bucket(seq);
+	bpf_iter_tcp_put_batch(iter);
+	err = bpf_iter_tcp_realloc_batch(iter, expected * 3 / 2,
+					 GFP_USER);
+	if (err)
+		return ERR_PTR(err);
+
+	sk = bpf_iter_tcp_resume(seq);
 	if (!sk)
 		return NULL; /* Done */

-	if (st->state == TCP_SEQ_STATE_LISTENING)
-		expected = bpf_iter_tcp_listening_batch(seq, sk);
-	else
-		expected = bpf_iter_tcp_established_batch(seq, sk);
-
-	if (iter->end_sk == expected) {
-		iter->st_bucket_done = true;
-		return sk;
-	}
+	expected = bpf_iter_fill_batch(seq, &sk);
+	if (likely(iter->end_sk == expected))
+		goto done;

-	if (!resized && !bpf_iter_tcp_realloc_batch(iter, expected * 3 / 2)) {
-		resized = true;
-		goto again;
+	/* Batch size was still too small. Hold onto the lock while we try
+	 * again with a larger batch to make sure the current bucket's size
+	 * does not change in the meantime.
+	 */
+	err = bpf_iter_tcp_realloc_batch(iter, expected, GFP_NOWAIT);
+	if (err) {
+		bpf_iter_tcp_unlock_bucket(seq);
+		return ERR_PTR(err);
 	}

-	return sk;
+	expected = bpf_iter_fill_batch(seq, &sk);
+	WARN_ON_ONCE(iter->end_sk != expected);
+done:
+	bpf_iter_tcp_unlock_bucket(seq);
+	return iter->batch[0].sk;
 }

 static void *bpf_iter_tcp_seq_start(struct seq_file *seq, loff_t *pos)
@@ -3200,16 +3338,11 @@ static void *bpf_iter_tcp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
 		 * meta.seq_num is used instead.
 		 */
 		st->num++;
-		/* Move st->offset to the next sk in the bucket such that
-		 * the future start() will resume at st->offset in
-		 * st->bucket. See tcp_seek_last_pos().
-		 */
-		st->offset++;
-		sock_gen_put(iter->batch[iter->cur_sk++]);
+		sock_gen_put(iter->batch[iter->cur_sk++].sk);
 	}

 	if (iter->cur_sk < iter->end_sk)
-		sk = iter->batch[iter->cur_sk];
+		sk = iter->batch[iter->cur_sk].sk;
 	else
 		sk = bpf_iter_tcp_batch(seq);

@@ -3275,10 +3408,8 @@ static void bpf_iter_tcp_seq_stop(struct seq_file *seq, void *v)
 		(void)tcp_prog_seq_show(prog, &meta, v, 0);
 	}

-	if (iter->cur_sk < iter->end_sk) {
+	if (iter->cur_sk < iter->end_sk)
 		bpf_iter_tcp_put_batch(iter);
-		iter->st_bucket_done = false;
-	}
 }

 static const struct seq_operations bpf_iter_tcp_seq_ops = {
@@ -3596,7 +3727,7 @@ static int bpf_iter_init_tcp(void *priv_data, struct bpf_iter_aux_info *aux)
 	if (err)
 		return err;

-	err = bpf_iter_tcp_realloc_batch(iter, INIT_BATCH_SZ);
+	err = bpf_iter_tcp_realloc_batch(iter, INIT_BATCH_SZ, GFP_USER);
 	if (err) {
 		bpf_iter_fini_seq_net(priv_data);
 		return err;

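For orientation, everything above sits behind the read() path of a pinned TCP socket iterator. The sketch below is not part of this commit or its selftests; it is a hedged example of the kind of BPF iterator program that drives bpf_iter_tcp_batch(), assuming the usual vmlinux.h and libbpf helper headers, with an illustrative program name and counter.

// SPDX-License-Identifier: GPL-2.0
/* Hedged sketch of a TCP socket iterator program (illustrative only).
 * Each read() on the iterator fd lands in bpf_iter_tcp_batch(), which
 * snapshots one bucket at a time and now resumes by socket cookie.
 */
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>

char _license[] SEC("license") = "GPL";

long sockets_seen = 0;	/* read by userspace via the skeleton's global data */

SEC("iter/tcp")
int count_tcp_sockets(struct bpf_iter__tcp *ctx)
{
	struct sock_common *sk_common = ctx->sk_common;

	/* ctx->sk_common is NULL on the final call that ends the dump. */
	if (!sk_common)
		return 0;

	__sync_fetch_and_add(&sockets_seen, 1);
	return 0;
}

Userspace would typically attach such a program with bpf_program__attach_iter(), create a seq_file-backed fd with bpf_iter_create(), and read() it until EOF. The point of this series is that sockets which stay in place across those reads are neither repeated nor skipped, even when the underlying bucket changes between reads.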