From e315ad94138f892d44b889b906edc5dd859ef6c4 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Wed, 3 Jan 2024 17:17:26 +0100 Subject: [PATCH 1/2] rtp: lock more rtcp_sess fields and code --- src/rtp/sess.c | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/rtp/sess.c b/src/rtp/sess.c index 9aa832f33..1b9e84c6f 100644 --- a/src/rtp/sess.c +++ b/src/rtp/sess.c @@ -229,6 +229,7 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg) if (!sess || !msg) return; + mtx_lock(sess->lock); switch (msg->hdr.pt) { case RTCP_SR: @@ -246,6 +247,8 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg) default: break; } + + mtx_unlock(sess->lock); } @@ -357,8 +360,10 @@ int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname) if (!sess) return EINVAL; + mtx_lock(sess->lock); sess->cname = mem_deref(sess->cname); err = str_dup(&sess->cname, cname); + mtx_unlock(sess->lock); if (err) return err; @@ -583,12 +588,15 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, if (!sess) return; + mtx_lock(sess->lock); mbr = get_member(sess, hdr->ssrc); + mtx_unlock(sess->lock); if (!mbr) { DEBUG_NOTICE("could not add member: 0x%08x\n", hdr->ssrc); return; } + mtx_lock(sess->lock); if (!mbr->s) { int err; @@ -598,6 +606,7 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, else err = mutex_alloc(&mbr->s->lock); + mtx_unlock(sess->lock); if (err) { DEBUG_NOTICE("could not add sender: 0x%08x\n", hdr->ssrc); @@ -611,28 +620,42 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, /* probation not used */ sa_cpy(&mbr->s->rtp_peer, peer); source_unlock(mbr->s); + mtx_lock(sess->lock); ++sess->senderc; + mtx_unlock(sess->lock); + } + else { + mtx_unlock(sess->lock); } source_lock(mbr->s); if (!source_update_seq(mbr->s, hdr->seq)) { DEBUG_WARNING("rtp_update_seq() returned 0\n"); } + source_unlock(mbr->s); + mtx_lock(sess->lock); if (sess->srate_rx) { /* Convert from wall-clock time to timestamp units */ hdr->ts_arrive = tmr_jiffies() * sess->srate_rx / 1000; + mtx_unlock(sess->lock); /* * Calculate jitter only when the timestamp is different than * last packet (see RTP FAQ * https://www.cs.columbia.edu/~hgs/rtp/faq.html#jitter). */ + source_lock(mbr->s); if (hdr->ts != mbr->s->last_rtp_ts) source_calc_jitter(mbr->s, hdr->ts, (uint32_t)hdr->ts_arrive); + source_unlock(mbr->s); + } + else { + mtx_unlock(sess->lock); } + source_lock(mbr->s); mbr->s->last_rtp_ts = hdr->ts; mbr->s->rtp_rx_bytes += payload_size; source_unlock(mbr->s); @@ -738,6 +761,7 @@ int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs) return 0; err |= re_hprintf(pf, "----- RTCP Session: -----\n"); + mtx_lock(sess->lock); err |= re_hprintf(pf, " cname=%s SSRC=0x%08x/%u rx=%uHz\n", sess->cname, rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs), @@ -745,7 +769,6 @@ int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs) hash_apply(sess->members, debug_handler, pf); - mtx_lock(sess->lock); err |= re_hprintf(pf, " TX: packets=%u, octets=%u\n", sess->txstat.psent, sess->txstat.osent); mtx_unlock(sess->lock); From c2e4edd10197a9bf47cd0810839018de5734505d Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Thu, 4 Jan 2024 10:45:57 +0100 Subject: [PATCH 2/2] rtp: replace rtp_source lock by extended rtp_sess lock --- src/rtp/fb.c | 1 - src/rtp/member.c | 1 - src/rtp/pkt.c | 1 - src/rtp/rr.c | 1 - src/rtp/rtcp.c | 1 - src/rtp/rtcp.h | 4 -- src/rtp/rtp.c | 1 - src/rtp/sdes.c | 1 - src/rtp/sess.c | 119 ++++++++++++++++------------------------------- src/rtp/source.c | 13 ------ 10 files changed, 39 insertions(+), 104 deletions(-) diff --git a/src/rtp/fb.c b/src/rtp/fb.c index e541034fd..9dd376af9 100644 --- a/src/rtp/fb.c +++ b/src/rtp/fb.c @@ -12,7 +12,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/member.c b/src/rtp/member.c index 0d3e6c7ad..1a3bd78e5 100644 --- a/src/rtp/member.c +++ b/src/rtp/member.c @@ -12,7 +12,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/pkt.c b/src/rtp/pkt.c index b73abe255..dffb054d1 100644 --- a/src/rtp/pkt.c +++ b/src/rtp/pkt.c @@ -12,7 +12,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/rr.c b/src/rtp/rr.c index 3b2bc4ede..9d39de878 100644 --- a/src/rtp/rr.c +++ b/src/rtp/rr.c @@ -13,7 +13,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/rtcp.c b/src/rtp/rtcp.c index 620e3ab5a..17c3d20fb 100644 --- a/src/rtp/rtcp.c +++ b/src/rtp/rtcp.c @@ -11,7 +11,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/rtcp.h b/src/rtp/rtcp.h index 17861ff85..d85404866 100644 --- a/src/rtp/rtcp.h +++ b/src/rtp/rtcp.h @@ -47,8 +47,6 @@ struct rtp_source { uint32_t last_rtp_ts; /**< Last RTP timestamp */ uint32_t psent; /**< RTP packets sent */ uint32_t osent; /**< RTP octets sent */ - - mtx_t *lock; /**< Lock for this struct */ }; /** RTP Member */ @@ -73,8 +71,6 @@ void source_calc_jitter(struct rtp_source *s, uint32_t rtp_ts, uint32_t arrival); int source_calc_lost(const struct rtp_source *s); uint8_t source_calc_fraction_lost(struct rtp_source *s); -int source_lock(struct rtp_source *s); -int source_unlock(struct rtp_source *s); /* RR (Reception report) */ int rtcp_rr_alloc(struct rtcp_rr **rrp, size_t count); diff --git a/src/rtp/rtp.c b/src/rtp/rtp.c index 16b87ff11..76d73cf5f 100644 --- a/src/rtp/rtp.c +++ b/src/rtp/rtp.c @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include "rtcp.h" diff --git a/src/rtp/sdes.c b/src/rtp/sdes.c index b7bb7eb88..711d8a908 100644 --- a/src/rtp/sdes.c +++ b/src/rtp/sdes.c @@ -11,7 +11,6 @@ #include #include #include -#include #include "rtcp.h" diff --git a/src/rtp/sess.c b/src/rtp/sess.c index 1b9e84c6f..4d03ae0a8 100644 --- a/src/rtp/sess.c +++ b/src/rtp/sess.c @@ -56,9 +56,9 @@ struct rtcp_sess { uint32_t srate_tx; /**< Transmit sampling rate */ uint32_t srate_rx; /**< Receive sampling rate */ uint32_t interval; /**< RTCP interval in [ms] */ + mtx_t *lock; /**< Lock for rtcp_sess */ /* stats */ - mtx_t *lock; /**< Lock for txstat */ struct txstat txstat; /**< Local transmit statistics */ }; @@ -84,14 +84,6 @@ static void sess_destructor(void *data) } -static void source_destructor(void *data) -{ - struct rtp_source *s = data; - - mem_deref(s->lock); -} - - static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src) { struct rtp_member *mbr; @@ -186,7 +178,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess, if (mbr->s) { /* Save time when SR was received */ - source_lock(mbr->s); mbr->s->sr_recv = tmr_jiffies(); /* Save NTP timestamp from SR */ @@ -195,7 +186,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess, mbr->s->rtp_ts = msg->r.sr.rtp_ts; mbr->s->psent = msg->r.sr.psent; mbr->s->osent = msg->r.sr.osent; - source_unlock(mbr->s); } for (i=0; ihdr.count; i++) @@ -407,14 +397,12 @@ static bool sender_apply_handler(struct le *le, void *arg) /* Initialise the members */ rr.ssrc = mbr->src; - source_lock(s); rr.fraction = source_calc_fraction_lost(s); rr.lost = source_calc_lost(s); rr.last_seq = s->cycles | s->max_seq; rr.jitter = s->jitter >> 4; rr.lsr = calc_lsr(&s->last_sr); rr.dlsr = calc_dlsr(s->sr_recv); - source_unlock(s); return 0 != rtcp_rr_encode(mb, &rr); } @@ -590,75 +578,48 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr, mtx_lock(sess->lock); mbr = get_member(sess, hdr->ssrc); - mtx_unlock(sess->lock); if (!mbr) { DEBUG_NOTICE("could not add member: 0x%08x\n", hdr->ssrc); - return; + goto out; } - mtx_lock(sess->lock); if (!mbr->s) { - int err; - - mbr->s = mem_zalloc(sizeof(*mbr->s), source_destructor); - if (!mbr->s) - err = ENOMEM; - else - err = mutex_alloc(&mbr->s->lock); - - mtx_unlock(sess->lock); - if (err) { + mbr->s = mem_zalloc(sizeof(*mbr->s), NULL); + if (!mbr->s) { DEBUG_NOTICE("could not add sender: 0x%08x\n", hdr->ssrc); - mbr->s = mem_deref(mbr->s); - return; + goto out; } /* first packet - init sequence number */ - source_lock(mbr->s); source_init_seq(mbr->s, hdr->seq); /* probation not used */ sa_cpy(&mbr->s->rtp_peer, peer); - source_unlock(mbr->s); - mtx_lock(sess->lock); ++sess->senderc; - mtx_unlock(sess->lock); - } - else { - mtx_unlock(sess->lock); } - source_lock(mbr->s); if (!source_update_seq(mbr->s, hdr->seq)) { DEBUG_WARNING("rtp_update_seq() returned 0\n"); } - source_unlock(mbr->s); - mtx_lock(sess->lock); if (sess->srate_rx) { /* Convert from wall-clock time to timestamp units */ hdr->ts_arrive = tmr_jiffies() * sess->srate_rx / 1000; - mtx_unlock(sess->lock); /* * Calculate jitter only when the timestamp is different than * last packet (see RTP FAQ * https://www.cs.columbia.edu/~hgs/rtp/faq.html#jitter). */ - source_lock(mbr->s); if (hdr->ts != mbr->s->last_rtp_ts) source_calc_jitter(mbr->s, hdr->ts, (uint32_t)hdr->ts_arrive); - source_unlock(mbr->s); - } - else { - mtx_unlock(sess->lock); } - source_lock(mbr->s); mbr->s->last_rtp_ts = hdr->ts; mbr->s->rtp_rx_bytes += payload_size; - source_unlock(mbr->s); +out: + mtx_unlock(sess->lock); } @@ -675,17 +636,19 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats) { const struct rtcp_sess *sess = rtp_rtcp_sess(rs); struct rtp_member *mbr; + int err = 0; if (!sess || !stats) return EINVAL; + mtx_lock(sess->lock); mbr = member_find(sess->members, ssrc); - if (!mbr) - return ENOENT; + if (!mbr) { + err = ENOENT; + goto out; + } - mtx_lock(sess->lock); stats->tx.sent = sess->txstat.psent; - mtx_unlock(sess->lock); stats->tx.lost = mbr->cum_lost; stats->tx.jit = mbr->jit; @@ -694,52 +657,36 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats) if (!mbr->s) { memset(&stats->rx, 0, sizeof(stats->rx)); - return 0; + goto out; } - source_lock(mbr->s); stats->rx.sent = mbr->s->received; stats->rx.lost = source_calc_lost(mbr->s); stats->rx.jit = sess->srate_rx ? 1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0; - source_unlock(mbr->s); - return 0; +out: + mtx_unlock(sess->lock); + return err; } static bool debug_handler(struct le *le, void *arg) { const struct rtp_member *mbr = le->data; - struct re_printf *pf = arg; - struct mbuf *mb = NULL; + struct mbuf *mb = arg; int err; - err = re_hprintf(pf, " member 0x%08x: lost=%d Jitter=%.1fms" + err = mbuf_printf(mb, " member 0x%08x: lost=%d Jitter=%.1fms" " RTT=%.1fms\n", mbr->src, mbr->cum_lost, (double)mbr->jit/1000, (double)mbr->rtt/1000); - if (err) - return true; - if (mbr->s) { - mb = mbuf_alloc(64); - if (!mb) - return true; - - source_lock(mbr->s); - err = mbuf_printf(mb, - " IP=%J psent=%u rcvd=%u\n", - &mbr->s->rtp_peer, mbr->s->psent, - mbr->s->received); - source_unlock(mbr->s); - if (err) - goto out; - - re_hprintf(pf, "%b", mb->buf, mb->pos); + err |= mbuf_printf(mb, + " IP=%J psent=%u rcvd=%u\n", + &mbr->s->rtp_peer, mbr->s->psent, + mbr->s->received); } -out: - mem_deref(mb); return err != 0; } @@ -755,23 +702,35 @@ static bool debug_handler(struct le *le, void *arg) int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs) { const struct rtcp_sess *sess = rtp_rtcp_sess(rs); + struct mbuf *mb; int err = 0; if (!sess) return 0; - err |= re_hprintf(pf, "----- RTCP Session: -----\n"); + mb = mbuf_alloc(64); + if (!mb) + return ENOMEM; + + err |= mbuf_printf(mb, "----- RTCP Session: -----\n"); mtx_lock(sess->lock); - err |= re_hprintf(pf, " cname=%s SSRC=0x%08x/%u rx=%uHz\n", + err |= mbuf_printf(mb, " cname=%s SSRC=0x%08x/%u rx=%uHz\n", sess->cname, rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs), sess->srate_rx); - hash_apply(sess->members, debug_handler, pf); + hash_apply(sess->members, debug_handler, mb); - err |= re_hprintf(pf, " TX: packets=%u, octets=%u\n", + err |= mbuf_printf(mb, " TX: packets=%u, octets=%u\n", sess->txstat.psent, sess->txstat.osent); mtx_unlock(sess->lock); + if (err) + goto out; + + err = re_hprintf(pf, "%b", mb->buf, mb->pos); + +out: + mem_deref(mb); return err; } diff --git a/src/rtp/source.c b/src/rtp/source.c index 996623242..c6a324229 100644 --- a/src/rtp/source.c +++ b/src/rtp/source.c @@ -11,7 +11,6 @@ #include #include #include -#include #include #include "rtcp.h" @@ -176,15 +175,3 @@ uint8_t source_calc_fraction_lost(struct rtp_source *s) return fraction; } - - -int source_lock(struct rtp_source *s) -{ - return mtx_lock(s->lock); -} - - -int source_unlock(struct rtp_source *s) -{ - return mtx_unlock(s->lock); -}