Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtp: lock more fields from rtcp_sess #1039

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/rtp/fb.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_sys.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/member.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_hash.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/pkt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_sys.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/rr.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <re_sys.h>
#include <re_net.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/rtcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
4 changes: 0 additions & 4 deletions src/rtp/rtcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ struct rtp_source {
uint32_t last_rtp_ts; /**< Last RTP timestamp */
uint32_t psent; /**< RTP packets sent */
uint32_t osent; /**< RTP octets sent */

mtx_t *lock; /**< Lock for this struct */
};

/** RTP Member */
Expand All @@ -73,8 +71,6 @@ void source_calc_jitter(struct rtp_source *s, uint32_t rtp_ts,
uint32_t arrival);
int source_calc_lost(const struct rtp_source *s);
uint8_t source_calc_fraction_lost(struct rtp_source *s);
int source_lock(struct rtp_source *s);
int source_unlock(struct rtp_source *s);

/* RR (Reception report) */
int rtcp_rr_alloc(struct rtcp_rr **rrp, size_t count);
Expand Down
1 change: 0 additions & 1 deletion src/rtp/rtp.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <re_sys.h>
#include <re_net.h>
#include <re_udp.h>
#include <re_thread.h>
#include <re_rtp.h>
#include <re_atomic.h>
#include "rtcp.h"
Expand Down
1 change: 0 additions & 1 deletion src/rtp/sdes.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
110 changes: 46 additions & 64 deletions src/rtp/sess.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ struct rtcp_sess {
uint32_t srate_tx; /**< Transmit sampling rate */
uint32_t srate_rx; /**< Receive sampling rate */
uint32_t interval; /**< RTCP interval in [ms] */
mtx_t *lock; /**< Lock for rtcp_sess */

/* stats */
mtx_t *lock; /**< Lock for txstat */
struct txstat txstat; /**< Local transmit statistics */
};

Expand All @@ -84,14 +84,6 @@ static void sess_destructor(void *data)
}


static void source_destructor(void *data)
{
struct rtp_source *s = data;

mem_deref(s->lock);
}


static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src)
{
struct rtp_member *mbr;
Expand Down Expand Up @@ -186,7 +178,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess,

if (mbr->s) {
/* Save time when SR was received */
source_lock(mbr->s);
mbr->s->sr_recv = tmr_jiffies();

/* Save NTP timestamp from SR */
Expand All @@ -195,7 +186,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess,
mbr->s->rtp_ts = msg->r.sr.rtp_ts;
mbr->s->psent = msg->r.sr.psent;
mbr->s->osent = msg->r.sr.osent;
source_unlock(mbr->s);
}

for (i=0; i<msg->hdr.count; i++)
Expand Down Expand Up @@ -229,6 +219,7 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
if (!sess || !msg)
return;

mtx_lock(sess->lock);
switch (msg->hdr.pt) {

case RTCP_SR:
Expand All @@ -246,6 +237,8 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
default:
break;
}

mtx_unlock(sess->lock);
}


Expand Down Expand Up @@ -357,8 +350,10 @@ int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname)
if (!sess)
return EINVAL;

mtx_lock(sess->lock);
sess->cname = mem_deref(sess->cname);
err = str_dup(&sess->cname, cname);
mtx_unlock(sess->lock);
if (err)
return err;

Expand Down Expand Up @@ -402,14 +397,12 @@ static bool sender_apply_handler(struct le *le, void *arg)

/* Initialise the members */
rr.ssrc = mbr->src;
source_lock(s);
rr.fraction = source_calc_fraction_lost(s);
rr.lost = source_calc_lost(s);
rr.last_seq = s->cycles | s->max_seq;
rr.jitter = s->jitter >> 4;
rr.lsr = calc_lsr(&s->last_sr);
rr.dlsr = calc_dlsr(s->sr_recv);
source_unlock(s);

return 0 != rtcp_rr_encode(mb, &rr);
}
Expand Down Expand Up @@ -583,38 +576,28 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr,
if (!sess)
return;

mtx_lock(sess->lock);
mbr = get_member(sess, hdr->ssrc);
if (!mbr) {
DEBUG_NOTICE("could not add member: 0x%08x\n", hdr->ssrc);
return;
goto out;
}

if (!mbr->s) {
int err;

mbr->s = mem_zalloc(sizeof(*mbr->s), source_destructor);
if (!mbr->s)
err = ENOMEM;
else
err = mutex_alloc(&mbr->s->lock);

if (err) {
mbr->s = mem_zalloc(sizeof(*mbr->s), NULL);
if (!mbr->s) {
DEBUG_NOTICE("could not add sender: 0x%08x\n",
hdr->ssrc);
mbr->s = mem_deref(mbr->s);
return;
goto out;
}

/* first packet - init sequence number */
source_lock(mbr->s);
source_init_seq(mbr->s, hdr->seq);
/* probation not used */
sa_cpy(&mbr->s->rtp_peer, peer);
source_unlock(mbr->s);
++sess->senderc;
}

source_lock(mbr->s);
if (!source_update_seq(mbr->s, hdr->seq)) {
DEBUG_WARNING("rtp_update_seq() returned 0\n");
}
Expand All @@ -635,7 +618,8 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr,

mbr->s->last_rtp_ts = hdr->ts;
mbr->s->rtp_rx_bytes += payload_size;
source_unlock(mbr->s);
out:
mtx_unlock(sess->lock);
}


Expand All @@ -652,17 +636,19 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)
{
const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
struct rtp_member *mbr;
int err = 0;

if (!sess || !stats)
return EINVAL;

mtx_lock(sess->lock);
mbr = member_find(sess->members, ssrc);
if (!mbr)
return ENOENT;
if (!mbr) {
err = ENOENT;
goto out;
}

mtx_lock(sess->lock);
stats->tx.sent = sess->txstat.psent;
mtx_unlock(sess->lock);

stats->tx.lost = mbr->cum_lost;
stats->tx.jit = mbr->jit;
Expand All @@ -671,52 +657,36 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)

if (!mbr->s) {
memset(&stats->rx, 0, sizeof(stats->rx));
return 0;
goto out;
}

source_lock(mbr->s);
stats->rx.sent = mbr->s->received;
stats->rx.lost = source_calc_lost(mbr->s);
stats->rx.jit = sess->srate_rx ?
1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0;
source_unlock(mbr->s);

return 0;
out:
mtx_unlock(sess->lock);
return err;
}


static bool debug_handler(struct le *le, void *arg)
{
const struct rtp_member *mbr = le->data;
struct re_printf *pf = arg;
struct mbuf *mb = NULL;
struct mbuf *mb = arg;
int err;

err = re_hprintf(pf, " member 0x%08x: lost=%d Jitter=%.1fms"
err = mbuf_printf(mb, " member 0x%08x: lost=%d Jitter=%.1fms"
" RTT=%.1fms\n", mbr->src, mbr->cum_lost,
(double)mbr->jit/1000, (double)mbr->rtt/1000);
if (err)
return true;

if (mbr->s) {
mb = mbuf_alloc(64);
if (!mb)
return true;

source_lock(mbr->s);
err = mbuf_printf(mb,
" IP=%J psent=%u rcvd=%u\n",
&mbr->s->rtp_peer, mbr->s->psent,
mbr->s->received);
source_unlock(mbr->s);
if (err)
goto out;

re_hprintf(pf, "%b", mb->buf, mb->pos);
err |= mbuf_printf(mb,
" IP=%J psent=%u rcvd=%u\n",
&mbr->s->rtp_peer, mbr->s->psent,
mbr->s->received);
}

out:
mem_deref(mb);
return err != 0;
}

Expand All @@ -732,23 +702,35 @@ static bool debug_handler(struct le *le, void *arg)
int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs)
{
const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
struct mbuf *mb;
int err = 0;

if (!sess)
return 0;

err |= re_hprintf(pf, "----- RTCP Session: -----\n");
err |= re_hprintf(pf, " cname=%s SSRC=0x%08x/%u rx=%uHz\n",
mb = mbuf_alloc(64);
if (!mb)
return ENOMEM;

err |= mbuf_printf(mb, "----- RTCP Session: -----\n");
mtx_lock(sess->lock);
err |= mbuf_printf(mb, " cname=%s SSRC=0x%08x/%u rx=%uHz\n",
sess->cname,
rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs),
sess->srate_rx);

hash_apply(sess->members, debug_handler, pf);
hash_apply(sess->members, debug_handler, mb);

mtx_lock(sess->lock);
err |= re_hprintf(pf, " TX: packets=%u, octets=%u\n",
err |= mbuf_printf(mb, " TX: packets=%u, octets=%u\n",
sess->txstat.psent, sess->txstat.osent);
mtx_unlock(sess->lock);

if (err)
goto out;

err = re_hprintf(pf, "%b", mb->buf, mb->pos);

out:
mem_deref(mb);
return err;
}
13 changes: 0 additions & 13 deletions src/rtp/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_hash.h>
#include <re_sa.h>
#include <re_thread.h>
#include <re_rtp.h>
#include "rtcp.h"

Expand Down Expand Up @@ -176,15 +175,3 @@ uint8_t source_calc_fraction_lost(struct rtp_source *s)

return fraction;
}


int source_lock(struct rtp_source *s)
{
return mtx_lock(s->lock);
}


int source_unlock(struct rtp_source *s)
{
return mtx_unlock(s->lock);
}
Loading