Skip to content

Commit

Permalink
Clear output buffer when resuming audiobridge participants and clear …
Browse files Browse the repository at this point in the history
…inbound queues in more scenarios.
  • Loading branch information
atoppi committed Nov 15, 2023
1 parent 454d614 commit e274a93
Showing 1 changed file with 52 additions and 106 deletions.
158 changes: 52 additions & 106 deletions plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,35 @@ static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_pac
g_free(pkt);
}

static void janus_audiobridge_participant_clear_jitter_buffer(janus_audiobridge_participant *participant) {
if(participant->jitter) {
jitter_buffer_reset(participant->jitter);
}
}

static void janus_audiobridge_participant_clear_inbuf(janus_audiobridge_participant *participant) {
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
}

static void janus_audiobridge_participant_clear_outbuf(janus_audiobridge_participant *participant) {
while(participant->outbuf && g_async_queue_length(participant->outbuf) > 0) {
janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf);
g_free(pkt->data);
g_free(pkt);
}
}

static void janus_audiobridge_participant_destroy(janus_audiobridge_participant *participant) {
if(!participant)
return;
Expand Down Expand Up @@ -1726,20 +1755,9 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant
opus_decoder_destroy(participant->decoder);
if(participant->jitter)
jitter_buffer_destroy(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
if(pkt)
g_free(pkt->data);
g_free(pkt);
}
janus_audiobridge_participant_clear_inbuf(participant);
if(participant->outbuf != NULL) {
while(g_async_queue_length(participant->outbuf) > 0) {
janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf);
g_free(pkt->data);
g_free(pkt);
}
janus_audiobridge_participant_clear_outbuf(participant);
g_async_queue_unref(participant->outbuf);
}
g_free(participant->mjr_base);
Expand Down Expand Up @@ -3743,19 +3761,10 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
/* Get rid of queued packets */
janus_mutex_lock(&p->qmutex);
g_atomic_int_set(&p->active, 0);
while(p->inbuf) {
GList *first = g_list_first(p->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
p->inbuf = g_list_delete_link(p->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(p);
janus_audiobridge_participant_clear_inbuf(p);
janus_mutex_unlock(&p->qmutex);
janus_audiobridge_participant_clear_outbuf(p);
/* Request a WebRTC hangup */
gateway->close_pc(p->session->handle);
}
Expand Down Expand Up @@ -4202,20 +4211,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
participant->muted ? "true" : "false", participant->room->room_id_str, participant->user_id_str);
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
if(participant->jitter)
jitter_buffer_reset(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}

Expand Down Expand Up @@ -5423,23 +5420,12 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
if(pauseevs && json_is_true(pauseevs))
g_atomic_int_set(&participant->paused_events, 1);
notify_participant = TRUE;
/* Participant is now muted, so clear the queued packets waiting to be handled */
/* Participant is now suspended, so clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
if(participant->jitter)
jitter_buffer_reset(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
janus_audiobridge_participant_clear_outbuf(participant);
/* Should we close the recording? */
json_t *stoprec = json_object_get(root, "stop_record");
if(stoprec && json_is_true(stoprec)) {
Expand Down Expand Up @@ -5916,7 +5902,7 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r
jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_GET_AVALIABLE_COUNT, &count);
if(count > JITTER_BUFFER_MAX_PACKETS) {
JANUS_LOG(LOG_WARN, "Jitter buffer contains too many packets, clearing now (count=%d)\n", count);
jitter_buffer_reset(participant->jitter);
janus_audiobridge_participant_clear_jitter_buffer(participant);
}
/* Schedule next check */
participant->jitter_next_check = now + JITTER_BUFFER_CHECK_USECS;
Expand Down Expand Up @@ -6082,21 +6068,10 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle
g_free(participant->mjr_base);
participant->mjr_base = NULL;
/* Get rid of queued packets */
if(participant->jitter)
jitter_buffer_reset(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
janus_audiobridge_participant_clear_outbuf(participant);
janus_mutex_lock(&participant->suspend_cond_mutex);
if(g_atomic_int_compare_and_exchange(&participant->suspended, 1, 0))
g_cond_signal(&participant->suspend_cond);
Expand Down Expand Up @@ -6883,21 +6858,8 @@ static void *janus_audiobridge_handler(void *data) {
if(participant->muted) {
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
if(participant->jitter)
jitter_buffer_reset(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
if(pkt->data)
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}
}
Expand Down Expand Up @@ -7521,21 +7483,10 @@ static void *janus_audiobridge_handler(void *data) {
/* Get rid of queued packets */
janus_mutex_lock(&participant->qmutex);
g_atomic_int_set(&participant->active, 0);
if(participant->jitter)
jitter_buffer_reset(participant->jitter);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
first = NULL;
if(pkt == NULL)
continue;
g_free(pkt->data);
pkt->data = NULL;
g_free(pkt);
pkt = NULL;
}
janus_audiobridge_participant_clear_jitter_buffer(participant);
janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
janus_audiobridge_participant_clear_outbuf(participant);
/* Stop recording, if we were */
janus_mutex_lock(&participant->rec_mutex);
janus_audiobridge_recorder_close(participant);
Expand Down Expand Up @@ -8614,6 +8565,8 @@ static void *janus_audiobridge_participant_thread(void *data) {
before = janus_get_monotonic_time();
participant->context.a_seq_reset = TRUE;
first = TRUE;
/* Clear the output queue since it might contain old packets and break RTP sequence */
janus_audiobridge_participant_clear_outbuf(participant);
}
janus_mutex_unlock(&participant->suspend_cond_mutex);
/* Start with packets to decode and queue for the mixer */
Expand Down Expand Up @@ -8730,14 +8683,7 @@ static void *janus_audiobridge_participant_thread(void *data) {
guint count = g_list_length(participant->inbuf);
if(count > QUEUE_IN_MAX_PACKETS) {
JANUS_LOG(LOG_WARN, "Participant queue-in contains too many packets, clearing now (count=%u)\n", count);
while(participant->inbuf) {
GList *first = g_list_first(participant->inbuf);
janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
participant->inbuf = g_list_delete_link(participant->inbuf, first);
if(pkt)
g_free(pkt->data);
g_free(pkt);
}
janus_audiobridge_participant_clear_inbuf(participant);
}
participant->inbuf = g_list_append(participant->inbuf, pkt);
}
Expand Down

0 comments on commit e274a93

Please sign in to comment.