Skip to content

Commit

Permalink
Rebase ring buffer branch to develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ForeverASilver authored and gavv committed May 30, 2024
1 parent f8dab55 commit a4877e8
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/internal_modules/roc_pipeline/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,21 @@ void ReceiverCommonConfig::deduce_defaults() {

ReceiverSessionConfig::ReceiverSessionConfig()
: payload_type(0)
, prebuf_len(0)
, enable_beeping(false) {
}

void ReceiverSessionConfig::deduce_defaults() {
if (prebuf_len == 0) {
prebuf_len = latency.target_latency;
}
latency.deduce_defaults(DefaultLatency, true);
watchdog.deduce_defaults(latency.target_latency);
resampler.deduce_defaults(latency.tuner_backend, latency.tuner_profile);
}

ReceiverSourceConfig::ReceiverSourceConfig() {
ReceiverSourceConfig::ReceiverSourceConfig()
: max_session_packets(0) {
}

void ReceiverSourceConfig::deduce_defaults() {
Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ struct ReceiverSessionConfig {
//! Packet payload type.
unsigned int payload_type;

//! Packet prebuffer length, nanoseconds.
core::nanoseconds_t prebuf_len;

//! FEC reader parameters.
fec::ReaderConfig fec_reader;

Expand Down Expand Up @@ -191,6 +194,9 @@ struct ReceiverSourceConfig {
//! Default parameters for a session.
ReceiverSessionConfig session_defaults;

//! Maximum number of packets per session.
size_t max_session_packets;

//! Initialize config.
ReceiverSourceConfig();

Expand Down
45 changes: 45 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

if (sess) {
enqueue_prebuf_packet_(packet);
// Session found, route packet to it.
return sess->route_packet(packet);
}
Expand All @@ -327,6 +328,48 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
return status::StatusOK;
}

void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
prebuf_packets_.push_back(*packet_ptr.get());

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

while (prebuf_packets_.size() > 0) {
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
if (now - received > receiver_config_.default_session.prebuf_len) {
prebuf_packets_.remove(*prebuf_packets_.front());
} else {
break;
}
}
}

void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
packet::PacketPtr curr, next;

if (prebuf_packets_.size() == 0) {
return;
}

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

for (curr = prebuf_packets_.front(); curr; curr = next) {
next = prebuf_packets_.nextof(*curr);

// if packet is too old, remove it from the queue
core::nanoseconds_t received = curr->udp()->receive_timestamp;
if (now - received > receiver_config_.default_session.prebuf_len) {
prebuf_packets_.remove(*curr);
continue;
}

// if session handles the packet, remove it from the queue
const status::StatusCode code = sess.route(curr);
if (code == status::StatusOK) {
prebuf_packets_.remove(*curr);
}
}
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
Expand Down Expand Up @@ -409,6 +452,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {

state_tracker_.add_active_sessions(+1);

dequeue_prebuf_packets_(*sess);

return status::StatusOK;
}

Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
void dequeue_prebuf_packets_(ReceiverSession& sess);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down Expand Up @@ -153,6 +155,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
core::List<ReceiverSession> sessions_;
ReceiverSessionRouter session_router_;

core::List<packet::Packet> prebuf_packets_;

bool valid_;
};

Expand Down
8 changes: 8 additions & 0 deletions src/public_api/include/roc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,14 @@ typedef struct roc_receiver_config {
* If zero, default value is used. If negative, the check is disabled.
*/
long long choppy_playback_timeout;

/** Packet prebuffer length, in nanoseconds.
* Packets received for sessions that have not yet been created
* will be buffered. Any packets older than the prebuf_len
* will be discarded.
* If zero, default value is used.
*/
unsigned long long prebuf_len;
} roc_receiver_config;

/** Interface configuration.
Expand Down
56 changes: 56 additions & 0 deletions src/tests/roc_pipeline/test_receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,62 @@ TEST(receiver_source, timestamp_mapping_remixing) {
CHECK(first_ts);
}

TEST(receiver_source, packet_buffer) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };

init(Rate, Chans, Rate, Chans);

ReceiverConfig config = make_config();
config.default_session.prebuf_len = 0;
ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
sample_buffer_factory, arena);
CHECK(receiver.is_valid());

ReceiverSlot* slot = create_slot(receiver);
CHECK(slot);

packet::Queue queue;
packet::Queue source_queue;
packet::Queue repair_queue;

packet::IWriter* source_endpoint_writer =
create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source);
CHECK(source_endpoint_writer);

packet::IWriter* repair_endpoint_writer =
create_endpoint(slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair);
CHECK(repair_endpoint_writer);

fec::WriterConfig fec_config;

test::PacketWriter packet_writer(
arena, queue, queue, format_map, packet_factory, byte_buffer_factory, src1, dst1,
dst2, PayloadType_Ch2, packet::FEC_ReedSolomon_M8, fec_config);

// setup reader
test::FrameReader frame_reader(receiver, sample_buffer_factory);

packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
output_sample_spec);

for (int i = 0; i < ManyPackets; ++i) {
packet::PacketPtr pp;
UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pp));
CHECK(pp);

if (pp->flags() & packet::Packet::FlagAudio) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
}
if (pp->flags() & packet::Packet::FlagRepair) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
}
}

receiver.refresh(frame_reader.refresh_ts());
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
}

// Check receiver metrics for multiple remote participants (senders).
TEST(receiver_source, metrics_participants) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 };
Expand Down
3 changes: 3 additions & 0 deletions src/tools/roc_recv/cmdline.ggo
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ section "Options"
option "no-play-timeout" - "No playback timeout, TIME units"
string optional

option "prebuf-len" - "Length of packet prebuffer, TIME units"
string optional

option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
string optional

Expand Down
13 changes: 13 additions & 0 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ int main(int argc, char** argv) {
}
}

if (args.prebuf_len_given) {
core::nanoseconds_t prebuf_len = 0;
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
roc_log(LogError, "invalid --prebuf-len");
return 1;
}
receiver_config.default_session.prebuf_len =
(core::nanoseconds_t)args.prebuf_len_arg;
} else {
receiver_config.default_session.prebuf_len =
receiver_config.default_session.target_latency;
}

if (args.choppy_play_timeout_given) {
if (!core::parse_duration(
args.choppy_play_timeout_arg,
Expand Down

0 comments on commit a4877e8

Please sign in to comment.