mptcp: allow picking different xmit subflows
Update the scheduler to a less trivial heuristic: cache the last
used subflow, and try to send a reasonably long burst of data on
it. When the burst or the subflow's send space is exhausted, move
to the next subflow with available send space.

Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Paolo Abeni authored and jenkins-tessares committed Sep 9, 2020
1 parent 6855e98 commit a1975d0
Showing 2 changed files with 103 additions and 18 deletions.
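
To make the new heuristic concrete before reading the diff, here is a minimal, self-contained userspace sketch of the burst-based picker. Everything in it (struct subflow, pick_subflow(), SEND_BURST_SIZE, the sample wspace numbers) is invented for the example; it is not the kernel code, and it ignores locking, TCP state and the JOIN checks:

/* burst_sched.c - illustrative userspace model of the heuristic above.
 * All names and sizes here are made up; "send space" is a plain counter.
 */
#include <stdbool.h>
#include <stdio.h>

#define SEND_BURST_SIZE	(1 << 15)	/* mirrors MPTCP_SEND_BURST_SIZE */

struct subflow {
	const char *name;
	int wspace;		/* remaining send space, in bytes */
	bool backup;
};

static struct subflow flows[] = {
	{ "subflow A", 65536, false },
	{ "subflow B", 65536, false },
	{ "backup C",  65536, true  },
};
static const int nr_flows = sizeof(flows) / sizeof(flows[0]);

static int last_snd = -1;	/* index of the cached subflow, -1 if none */
static int snd_burst;		/* bytes left in the current burst */

static struct subflow *pick_subflow(void)
{
	int next = -1, next_backup = -1;
	int start = last_snd >= 0 ? last_snd : nr_flows - 1;

	/* scan the other subflows with wrap-around, preferring non-backups */
	for (int off = 1; off <= nr_flows; off++) {
		int i = (start + off) % nr_flows;

		if (flows[i].wspace <= 0)
			continue;
		if (!flows[i].backup && next < 0)
			next = i;
		if (flows[i].backup && next_backup < 0)
			next_backup = i;
	}
	if (next < 0)
		next = next_backup;

	/* stick to the cached subflow while its burst and send space last */
	if (last_snd >= 0 && snd_burst > 0 && flows[last_snd].wspace > 0)
		return &flows[last_snd];

	if (next < 0)
		return NULL;	/* no subflow has send space left */
	last_snd = next;
	snd_burst = flows[next].wspace < SEND_BURST_SIZE ?
		    flows[next].wspace : SEND_BURST_SIZE;
	return &flows[next];
}

int main(void)
{
	for (int i = 0; i < 12; i++) {
		struct subflow *sf = pick_subflow();

		if (!sf)
			break;
		printf("chunk %2d -> %s\n", i, sf->name);
		sf->wspace -= 16384;	/* send a 16 KiB chunk */
		snd_burst -= 16384;	/* burst accounting, as in sendmsg */
	}
	return 0;
}

The output alternates in two-chunk bursts between A and B (the 32 KiB burst cap divided by 16 KiB chunks) and only falls back to the backup subflow C once both regular subflows run out of send space, which is the behavior the kernel code below implements over msk->conn_list with msk->last_snd and msk->snd_burst.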
115 changes: 99 additions & 16 deletions net/mptcp/protocol.c
@@ -1031,41 +1031,109 @@ static void mptcp_nospace(struct mptcp_sock *msk)
 	}
 }
 
+static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow)
+{
+	struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
+
+	/* can't send if JOIN hasn't completed yet (i.e. not usable for mptcp) */
+	if (subflow->request_join && !subflow->fully_established)
+		return false;
+
+	/* only send if our side has not closed yet */
+	return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT));
+}
+
+#define MPTCP_SEND_BURST_SIZE	(1 << 15)
+
+static struct list_head *mptcp_next_subflow(struct mptcp_sock *msk,
+					    struct list_head *pos,
+					    bool wrap_around)
+{
+	if (list_is_last(pos, &msk->conn_list) && wrap_around)
+		return msk->conn_list.next;
+	return pos->next;
+}
+
 static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
 					   u32 *sndbuf)
 {
 	struct mptcp_subflow_context *subflow;
-	struct sock *sk = (struct sock *)msk;
-	struct sock *backup = NULL;
+	struct sock *next_backup = NULL;
+	struct list_head *pos, *start;
+	struct sock *next_ssk = NULL;
+	bool wrap_around;
 	bool free;
 
-	sock_owned_by_me(sk);
+	sock_owned_by_me((struct sock *)msk);
 
 	*sndbuf = 0;
 	if (!mptcp_ext_cache_refill(msk))
 		return NULL;
 
-	mptcp_for_each_subflow(msk, subflow) {
-		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
-
-		free = sk_stream_is_writeable(subflow->tcp_sock);
-		if (!free) {
-			mptcp_nospace(msk);
-			return NULL;
-		}
+	if (__mptcp_check_fallback(msk)) {
+		*sndbuf = msk->first ? msk->first->sk_sndbuf : 0;
+		return msk->first;
+	}
 
-		*sndbuf = max(tcp_sk(ssk)->snd_wnd, *sndbuf);
-		if (subflow->backup) {
-			if (!backup)
-				backup = ssk;
-
-			continue;
-		}
-
-		return ssk;
+	/* look up the first writable subflow and the first writable backup
+	 * subflow, starting from the last used one, with wrap-around
+	 */
+	if (msk->last_snd) {
+		start = &mptcp_subflow_ctx(msk->last_snd)->node;
+		wrap_around = true;
+	} else {
+		start = &msk->conn_list;
+		wrap_around = false;
+	}
+	pr_debug("msk=%p start=%p pos=%p wrap_around=%d last=%p", msk, start,
+		 mptcp_next_subflow(msk, start, wrap_around), wrap_around,
+		 msk->last_snd);
+	for (pos = mptcp_next_subflow(msk, start, wrap_around); pos != start;
+	     pos = mptcp_next_subflow(msk, pos, wrap_around)) {
+		struct sock *ssk;
+
+		subflow = list_entry(pos, struct mptcp_subflow_context, node);
+		ssk = mptcp_subflow_tcp_sock(subflow);
+		if (!mptcp_subflow_active(subflow))
+			continue;
+
+		if (next_backup || next_ssk)
+			continue;
+
+		free = sk_stream_is_writeable(subflow->tcp_sock);
+		if (!free)
+			continue;
+
+		if (!subflow->backup && !next_ssk)
+			next_ssk = ssk;
+
+		if (subflow->backup && !next_backup)
+			next_backup = ssk;
 	}
+	if (!next_ssk)
+		next_ssk = next_backup;
 
-	return backup;
+	if (msk->last_snd) {
+		*sndbuf = max(tcp_sk(msk->last_snd)->snd_wnd, *sndbuf);
+		free = sk_stream_memory_free(msk->last_snd);
+	} else {
+		free = false;
+	}
+	pr_debug("msk=%p ssk=%p:%d last=%p last free=%d burst=%d", msk,
+		 next_ssk, next_ssk ? sk_stream_wspace(next_ssk) : 0,
+		 msk->last_snd, free, msk->snd_burst);
+
+	/* use the looked-up subflow if the previously used one has exhausted
+	 * the burst or is not writable
+	 */
+	if (next_ssk && (!free || msk->snd_burst <= 0)) {
+		msk->last_snd = next_ssk;
+		msk->snd_burst = min_t(int, MPTCP_SEND_BURST_SIZE,
+				       sk_stream_wspace(next_ssk));
+		free = true;
+	}
+	return free ? msk->last_snd : NULL;
 }
 
 static void ssk_check_wmem(struct mptcp_sock *msk)
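
A note on the loop shape in mptcp_subflow_get_send() above: because conn_list is a circular list whose head is not a real subflow, mptcp_next_subflow() must skip the head when wrapping. The standalone sketch below (list_head and list_is_last() are tiny userspace stand-ins for the <linux/list.h> originals, and the entry names are invented) shows that starting from an arbitrary entry visits every other entry exactly once and never the head:

/* wrap_iter.c - standalone model of the wrap-around walk used by
 * mptcp_next_subflow() above; not the kernel list implementation.
 */
#include <stdbool.h>
#include <stdio.h>

struct list_head {
	struct list_head *next, *prev;
};

static bool list_is_last(const struct list_head *pos,
			 const struct list_head *head)
{
	return pos->next == head;
}

static struct list_head *next_subflow(struct list_head *head,
				      struct list_head *pos,
				      bool wrap_around)
{
	/* when wrapping, jump over the list head to the first real entry */
	if (list_is_last(pos, head) && wrap_around)
		return head->next;
	return pos->next;
}

int main(void)
{
	struct list_head head, a, b, c;

	/* circular list: head <-> a <-> b <-> c <-> head */
	head.next = &a; a.next = &b; b.next = &c; c.next = &head;
	head.prev = &c; c.prev = &b; b.prev = &a; a.prev = &head;

	/* start from the middle entry, as if b were msk->last_snd */
	struct list_head *start = &b;

	for (struct list_head *pos = next_subflow(&head, start, true);
	     pos != start; pos = next_subflow(&head, pos, true))
		printf("visited %s\n", pos == &a ? "a" : "c");
	return 0;
}

Starting from b, the walk prints c then a and stops when it would return to b; in the scheduler this means the scan covers every subflow except the cached one, whose reuse is decided separately by the free/snd_burst check.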
@@ -1160,6 +1228,10 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			break;
 		}
 
+		/* burst can be negative, we will try to move to the next
+		 * subflow at selection time, if possible.
+		 */
+		msk->snd_burst -= ret;
 		copied += ret;
 
 		tx_ok = msg_data_left(msg);
@@ -1375,6 +1447,11 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk)
 	unsigned int moved = 0;
 	bool done;
 
+	/* avoid looping forever below on racing close */
+	if (((struct sock *)msk)->sk_state == TCP_CLOSE)
+		return false;
+
+	__mptcp_flush_join_list(msk);
 	do {
 		struct sock *ssk = mptcp_subflow_recv_lookup(msk);
 
@@ -1539,9 +1616,15 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk)
 
 	sock_owned_by_me((const struct sock *)msk);
 
+	if (__mptcp_check_fallback(msk))
+		return msk->first;
+
 	mptcp_for_each_subflow(msk, subflow) {
 		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
 
+		if (!mptcp_subflow_active(subflow))
+			continue;
+
 		/* still data outstanding at TCP level? Don't retransmit. */
 		if (!tcp_write_queue_empty(ssk))
 			return NULL;
6 changes: 4 additions & 2 deletions net/mptcp/protocol.h
@@ -200,6 +200,8 @@ struct mptcp_sock {
 	u64		write_seq;
 	u64		ack_seq;
 	u64		rcv_data_fin_seq;
+	struct sock	*last_snd;
+	int		snd_burst;
 	atomic64_t	snd_una;
 	unsigned long	timer_ival;
 	u32		token;
@@ -491,12 +493,12 @@ static inline bool before64(__u64 seq1, __u64 seq2)
 
 void mptcp_diag_subflow_init(struct tcp_ulp_ops *ops);
 
-static inline bool __mptcp_check_fallback(struct mptcp_sock *msk)
+static inline bool __mptcp_check_fallback(const struct mptcp_sock *msk)
 {
 	return test_bit(MPTCP_FALLBACK_DONE, &msk->flags);
 }
 
-static inline bool mptcp_check_fallback(struct sock *sk)
+static inline bool mptcp_check_fallback(const struct sock *sk)
 {
 	struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
 	struct mptcp_sock *msk = mptcp_sk(subflow->conn);
