Skip to content

Commit

Permalink
Merge branch 'sipwise:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
lbalaceanu authored Mar 4, 2024
2 parents 2d86b06 + 527225a commit 895c4b6
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 213 deletions.
1 change: 1 addition & 0 deletions daemon/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,7 @@ static call_t *call_create(const str *callid) {
c->created = rtpe_now;
c->dtls_cert = dtls_cert();
c->tos = rtpe_config.default_tos;
c->poller = rtpe_get_poller();
if (rtpe_config.cpu_affinity)
c->cpu_affinity = call_socket_cpu_affinity++ % rtpe_config.cpu_affinity;
else
Expand Down
149 changes: 72 additions & 77 deletions daemon/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ struct dtx_packet;
TYPED_GQUEUE(dtx_packet, struct dtx_packet)


typedef enum {
TCC_ERR = -1,
TCC_OK = 0, // not consumed, must be freed
TCC_CONSUMED = 1, // ok, don't free
} tc_code;

struct dtx_buffer {
struct codec_timer ct;
mutex_t lock;
Expand All @@ -129,7 +135,7 @@ struct dtx_packet {
struct media_packet mp;
struct codec_ssrc_handler *decoder_handler; // holds reference
struct codec_ssrc_handler *input_handler; // holds reference
int (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
tc_code (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet, struct media_packet *mp);
};

Expand Down Expand Up @@ -218,7 +224,7 @@ struct transcode_packet {
struct codec_handler *handler;
unsigned int marker:1,
bypass_seq:1;
int (*packet_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *, struct transcode_packet *,
tc_code (*packet_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *, struct transcode_packet *,
struct media_packet *);
int (*dup_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *, struct transcode_packet *,
struct media_packet *);
Expand Down Expand Up @@ -253,7 +259,7 @@ static void __free_ssrc_handler(void *);

static void __transcode_packet_free(struct transcode_packet *);

static int packet_decode(struct codec_ssrc_handler *, struct codec_ssrc_handler *,
static tc_code packet_decode(struct codec_ssrc_handler *, struct codec_ssrc_handler *,
struct transcode_packet *, struct media_packet *);
static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2);
static int packet_decoded_fifo(decoder_t *decoder, AVFrame *frame, void *u1, void *u2);
Expand All @@ -262,10 +268,10 @@ static int packet_decoded_audio_player(decoder_t *decoder, AVFrame *frame, void

static void codec_touched(struct codec_store *cs, rtp_payload_type *pt);

static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *ch,
static bool __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *ch,
struct codec_ssrc_handler *input_handler,
struct transcode_packet *packet, struct media_packet *mp,
int (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
tc_code (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
struct media_packet *mp));
static void __dtx_shutdown(struct dtx_buffer *dtxb);
Expand All @@ -277,7 +283,7 @@ static void __delay_buffer_setup(struct delay_buffer **dbufp,
struct codec_handler *h, call_t *call, unsigned int delay);
static void __delay_buffer_shutdown(struct delay_buffer *dbuf, bool);
static void delay_buffer_stop(struct delay_buffer **pcmbp);
static int __buffer_delay_packet(struct delay_buffer *dbuf,
static tc_code __buffer_delay_packet(struct delay_buffer *dbuf,
struct codec_ssrc_handler *ch,
struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
Expand Down Expand Up @@ -1818,8 +1824,8 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
if (packet->bypass_seq) {
// bypass sequencer
__ssrc_lock_both(mp);
int ret = packet->packet_func(ch, input_ch ?: ch, packet, mp);
if (ret != 1)
tc_code code = packet->packet_func(ch, input_ch ?: ch, packet, mp);
if (code != TCC_CONSUMED)
__transcode_packet_free(packet);
goto out;
}
Expand Down Expand Up @@ -1866,7 +1872,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
// got a new packet, run decoder

while (1) {
int func_ret = 0;
tc_code func_ret = TCC_OK;

packet = packet_sequencer_next_packet(seq);
if (G_UNLIKELY(!packet)) {
Expand Down Expand Up @@ -1947,10 +1953,10 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
mp->rtp = &packet->rtp;

func_ret = packet->packet_func(ch, input_ch, packet, mp);
if (func_ret < 0)
if (func_ret == TCC_ERR)
ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error while processing RTP packet");
next:
if (func_ret != 1)
if (func_ret != TCC_CONSUMED)
__transcode_packet_free(packet);
}

Expand Down Expand Up @@ -2168,8 +2174,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr
}

// forwards DTMF input to DTMF output, plus rescaling duration
// returns 1 if packet has been consumed
static int packet_dtmf_fwd(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
static tc_code packet_dtmf_fwd(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
struct media_packet *mp)
{
Expand All @@ -2178,7 +2183,7 @@ static int packet_dtmf_fwd(struct codec_ssrc_handler *ch, struct codec_ssrc_hand
struct codec_handler *h = ch->handler;
struct codec_handler *input_h = input_ch->handler;

int ret = __buffer_delay_packet(input_h->delay_buffer, ch, input_ch, packet, ts_delay, payload_type,
tc_code ret = __buffer_delay_packet(input_h->delay_buffer, ch, input_ch, packet, ts_delay, payload_type,
codec_add_dtmf_packet, mp, h->source_pt.clock_rate);
__buffer_delay_seq(input_h->delay_buffer, mp, -1);
return ret;
Expand Down Expand Up @@ -2230,12 +2235,12 @@ static int packet_dtmf_event(struct codec_ssrc_handler *ch, struct codec_ssrc_ha
return 0;
}

static int packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
static tc_code packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet, struct media_packet *mp)
{
int dtmf_event_processed = packet_dtmf_event(ch, input_ch, packet, mp);
if (dtmf_event_processed == -1)
return -1;
return TCC_ERR;


enum block_dtmf_mode block_dtmf = dtmf_get_block_mode(mp->call, mp->media->monologue);
Expand Down Expand Up @@ -2282,14 +2287,14 @@ static int packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_handler
if (is_dtmf == 1)
dup->marker = 1;

int ret = 0;
tc_code ret = TCC_OK;

if (__buffer_dtx(input_ch->dtx_buffer, ch, input_ch, dup, mp, packet_dtmf_fwd))
ret = 1; // consumed
ret = TCC_CONSUMED;
else
ret = packet_dtmf_fwd(ch, input_ch, dup, mp);

if (ret == 0)
if (ret != TCC_CONSUMED)
__transcode_packet_free(dup);
}

Expand All @@ -2303,27 +2308,27 @@ static int packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_handler

}

int ret = 0;
tc_code ret = TCC_OK;

if (do_blocking)
{ }
else {
// pass through
if (__buffer_dtx(input_ch->dtx_buffer, ch, input_ch, packet, mp, packet_dtmf_fwd))
ret = 1; // consumed
ret = TCC_CONSUMED;
else
ret = packet_dtmf_fwd(ch, input_ch, packet, mp);
}

return ret;
}
static int packet_dtmf_dup(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
static tc_code packet_dtmf_dup(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
struct media_packet *mp)
{
enum block_dtmf_mode block_dtmf = dtmf_get_block_mode(mp->call, mp->media->monologue);

int ret = 0;
tc_code ret = TCC_OK;

if (block_dtmf == BLOCK_DTMF_DROP)
{ }
Expand All @@ -2333,7 +2338,7 @@ static int packet_dtmf_dup(struct codec_ssrc_handler *ch, struct codec_ssrc_hand
}

static int __handler_func_supplemental(struct codec_handler *h, struct media_packet *mp,
int (*packet_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *,
tc_code (*packet_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *,
struct transcode_packet *, struct media_packet *),
int (*dup_func)(struct codec_ssrc_handler *, struct codec_ssrc_handler *,
struct transcode_packet *, struct media_packet *))
Expand Down Expand Up @@ -2796,8 +2801,7 @@ static void __buffer_delay_raw(struct delay_buffer *dbuf, struct codec_handler *
__delay_buffer_schedule(dbuf);
}

// returns 1 if packet has been consumed
static int __buffer_delay_packet(struct delay_buffer *dbuf,
static tc_code __buffer_delay_packet(struct delay_buffer *dbuf,
struct codec_ssrc_handler *ch,
struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
Expand All @@ -2808,7 +2812,7 @@ static int __buffer_delay_packet(struct delay_buffer *dbuf,
if (__buffer_delay_do_direct(dbuf)) {
// direct passthrough
packet_func(ch, input_ch, packet, ts_delay, payload_type, mp);
return 0;
return TCC_OK;
}

struct delay_frame *dframe = g_slice_alloc0(sizeof(*dframe));
Expand All @@ -2828,7 +2832,7 @@ static int __buffer_delay_packet(struct delay_buffer *dbuf,

__delay_buffer_schedule(dbuf);

return 1;
return TCC_CONSUMED;
}

static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *mp, int seq_adj) {
Expand All @@ -2852,17 +2856,17 @@ static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *m
dframe->seq_adj += seq_adj;
}

// consumes `packet` if buffered (returns 1)
// consumes `packet` if buffered (returns true)
// `packet` can be NULL (discarded packet for seq tracking)
static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler,
static bool __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler,
struct codec_ssrc_handler *input_handler,
struct transcode_packet *packet, struct media_packet *mp,
int (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
tc_code (*dtx_func)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet,
struct media_packet *mp))
{
if (!dtxb || !mp->sfd || !mp->ssrc_in || !mp->ssrc_out)
return 0;
return false;

unsigned long ts = packet ? packet->ts : 0;

Expand Down Expand Up @@ -2895,14 +2899,32 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco
}

// packet now consumed if there was one
int ret = packet ? 1 : 0;
bool ret = packet ? true : false;
packet = NULL;

mutex_unlock(&dtxb->lock);

return ret;
}

static void send_buffered(struct media_packet *mp, unsigned int log_sys) {
struct sink_handler *sh = &mp->sink;
struct packet_stream *sink = sh->sink;

if (!sink)
media_socket_dequeue(mp, NULL); // just free
else {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, mp))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");

mutex_lock(&sink->out_lock);
if (media_socket_dequeue(mp, sink))
ilogsn(log_sys, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}

static void delay_frame_free(struct delay_frame *dframe) {
av_frame_free(&dframe->frame);
g_free(dframe->mp.raw.s);
Expand All @@ -2916,22 +2938,7 @@ static void delay_frame_free(struct delay_frame *dframe) {
g_slice_free1(sizeof(*dframe), dframe);
}
static void delay_frame_send(struct delay_frame *dframe) {
// XXX this should be unified with other instances of the same code
struct sink_handler *sh = &dframe->mp.sink;
struct packet_stream *sink = sh->sink;

if (!sink)
media_socket_dequeue(&dframe->mp, NULL); // just free
else {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &dframe->mp))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");

mutex_lock(&sink->out_lock);
if (media_socket_dequeue(&dframe->mp, sink))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
send_buffered(&dframe->mp, log_level_index_transcoding);
}
static void delay_frame_flush(struct delay_buffer *dbuf, struct delay_frame *dframe) {
// call is locked in W here
Expand Down Expand Up @@ -3457,10 +3464,12 @@ static void __dtx_send_later(struct codec_timer *ct) {
"%i packets left in queue", ts, p_left);

mp_copy.ptime = -1;
ret = dtxp->dtx_func(ch, input_ch, dtxp->packet, &mp_copy);
if (!ret) {
tc_code tcc = dtxp->dtx_func(ch, input_ch, dtxp->packet, &mp_copy);
if (tcc >= TCC_OK) {
if (mp_copy.ptime > 0)
ptime = mp_copy.ptime;
if (tcc == TCC_CONSUMED)
dtxp->packet = NULL;
}
else
ilogs(dtx, LOG_WARN | LOG_FLAG_LIMIT,
Expand Down Expand Up @@ -3505,23 +3514,8 @@ static void __dtx_send_later(struct codec_timer *ct) {

__ssrc_unlock_both(&mp_copy);

if (mp_copy.packets_out.length && ret == 0) {
struct sink_handler *sh = &mp_copy.sink;
struct packet_stream *sink = sh->sink;

if (!sink)
media_socket_dequeue(&mp_copy, NULL); // just free
else {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &mp_copy))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");

mutex_lock(&sink->out_lock);
if (media_socket_dequeue(&mp_copy, sink))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}
if (mp_copy.packets_out.length && ret == 0)
send_buffered(&mp_copy, log_level_index_dtx);

rwlock_unlock_r(&call->master_lock);

Expand Down Expand Up @@ -4160,10 +4154,10 @@ static int packet_decoded_audio_player(decoder_t *decoder, AVFrame *frame, void
return 0;
}

static int __rtp_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
static tc_code __rtp_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet, struct media_packet *mp)
{
int ret = 0;
tc_code code = TCC_OK;
if (packet) {
if (ch->chain) {
static const struct fraction chain_fact = {1,1};
Expand All @@ -4173,18 +4167,20 @@ static int __rtp_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler
packet_encoded_tx);
av_packet_unref(pkt);
}
else
ret = decoder_input_data_ptime(ch->decoder, packet->payload, packet->ts, &mp->ptime,
else {
int ret = decoder_input_data_ptime(ch->decoder, packet->payload, packet->ts, &mp->ptime,
ch->handler->packet_decoded,
ch, mp);
code = ret == 0 ? TCC_OK : TCC_ERR;
}
}
__buffer_delay_seq(input_ch->handler->delay_buffer, mp, -1);
return ret;
return code;
}
static int packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
static tc_code packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch,
struct transcode_packet *packet, struct media_packet *mp)
{
int ret = 0;
tc_code ret = TCC_OK;

if (!ch->csch.first_ts)
ch->csch.first_ts = packet->ts;
Expand All @@ -4206,11 +4202,10 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handle
}

if (__buffer_dtx(input_ch->dtx_buffer, ch, input_ch, packet, mp, __rtp_decode))
ret = 1; // consumed
ret = TCC_CONSUMED;
else {
ilogs(transcoding, LOG_DEBUG, "Decoding RTP packet now");
ret = __rtp_decode(ch, input_ch, packet, mp);
ret = ret ? -1 : 0;
}

out:
Expand Down
Loading

0 comments on commit 895c4b6

Please sign in to comment.