-
-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Put receiver dropped packets into ring buffer #609
base: develop
Are you sure you want to change the base?
Conversation
e5feb20
to
8fcee5f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for PR!
@@ -10,6 +10,7 @@ | |||
#include "roc_address/socket_addr_to_str.h" | |||
#include "roc_core/log.h" | |||
#include "roc_core/panic.h" | |||
#include "roc_core/parse_duration.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Unneeded include
if (!rtcp_session_->is_valid()) { | ||
drop_packet_(packet); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We enter this branch if there were error initializing RTCP, e.g. allocation failure. If RTCP session failed to initialize, it is never fixed later, so there is no need to save packets for later.
It seems that we don't need to save control packets (route_control_packet_). We should bother only about transport packets (route_transport_packet_).
bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) { | ||
if (packet->flags() & packet::Packet::FlagRepair) { | ||
roc_log(LogDebug, "session group: ignoring repair packet for unknown session"); | ||
drop_packet_(packet); | ||
return false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks surprising that method called "can_create_session_" (likes it's checking something) has such side-effects (saving packets).
Let's move saving packet from here to appropriate branch in route_transport_packet_().
void ReceiverSessionGroup::drop_packet_(const packet::PacketPtr& packet_ptr) { | ||
dropped_packet_buffer_.push_back(*packet_ptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this method actually does not drop packet, but saves it, the name "drop_packet_" looks a bit confusing.
I suggest to name this feature "pre-buffering", meaning that we're buffering packets in advance for session that was not created yet.
Then, we can rename things to something like:
- drop_packet_ => enqueue_prebuf_packet_
- handle_buffer_ => dequeue_prebuf_packets_
- dropped_packet_buffer_ => prebuf_packets_
...or something like this.
if (max_size == 0) { | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a dangerous behavior. It means that if max_sessions or max_session_packets is zero, we can accumulate pre-buffered packets indefinitely. If user hasn't set one of the limits, and some peer sends us some ill-formed packets (intentionally or not), and we never create session for those packets, we will consume more and more memory until we eat everything available on system.
Given it second thought, it seems that using max_session_packets * max_sessions
formula here is not so good idea. I suggest to change approach a bit: instead of max_session_packets and max_sessions, introduce prebuf_len
setting, measured in nanoseconds (core::nanoseconds_t).
Instead of limiting number of packets, it will limit the age of packets. When the age of the oldest (i.e. the first) packet in buffer becomes larger than prebuf_len, that packet is removed from the buffer.
We have a good default value for prebuf_len
: if the user did not set it, it will be equal to target_latency
- i.e. we will accumulate as much packets as needed for session start. We can add prebuf_len
to ReceiverSessionConfig. We also need to add deduce_prebuf_len()
, that sets prebuf_len to target_latency, and is called if the user did not provide --prebuf-len
explicitly (similar to deduce_resampler_backend()
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To compute the age of a packet, we'll need to add receive timestamps to packets. Then, the age can be computed as now - receive_timestamp
.
It should be easy:
-
add receive_timestamp field here: https://github.com/roc-streaming/roc-toolkit/blob/develop/src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
We also need to cover this feature in pipeline tests. (We cover all important aspects like this). Pipeline tests for receiver are here: https://github.com/roc-streaming/roc-toolkit/blob/develop/src/tests/roc_pipeline/test_receiver_source.cpp Those tests work as follows:
Basically, receiver converts sequence of network packets into sequence of audio frames. Each tests prepares its own sequence of packets and checks what audio receiver produces for it. For example, a test may reorder packets and ensure that receiver restores correct order. Or a test can generate packets for multiple sessions and ensure that receiver will mix them together into one stream. The "Tests" section in the issue should give an idea what kind of tests would be fine for this feature. Let me know if you have questions. |
@gavv |
Yeah, our pipeline tests are quite complicated. However, over time they prevent A LOT of regressions, so it's worth it. Currently, the main use-case for pre-bufferring is preserving repair packets delivered before first source packet. So we should use this use-case for the test. There are three types of packets: source, repair, and control. Source packets contain audio data. Repair (FEC) packets contain redundancy data that may be used to restore lost source packets, if needed. Please read this page first: https://roc-streaming.org/toolkit/docs/internals/fec.html Without pre-bufferring, all repair packets received before first source packet are dropped, because we can create session only based on source packet. With pre-buffering, those repair packet are saved and then "delivered" right after delivering the first source packet. Hence, the test should simulate situation when repair packets go before source packets. And it should pass if pre-buffering works fine and fail if it doesn't. Repair packets are needed to restore losses. Hence, we need to simulate situation when repair packets go before source packets AND some source packets are lost. In this case, if pre-bufferring works fine, receiver will successfully recover losses, otherwise it won't. Test will produce sequence of source and repair packets, pass it to receiver, and inspect audio produced by receiver. If receiver produces correct audio despite losses, the test passes. Tests for receiver pipeline (test_receiver_source.cpp) use test::PacketWriter to produce source packets and test::ControlWriter to produce control packets. Currently this group of tests does not use repair packets, so we'll need to add it. Currently, test::PacketWriter generates sequence of packets using packet factory (to allocate packet), encoder (to fill packet with samples), and composer (to serialize packet into bytes). Now we need to teach test::PacketWriter to use fec::Writer in addition. fec::Writer works rather simple. It implements IWriter interface, and in its constructor you pass it next writer. Then you write source packets to fec::Writer, and it just writes all these packets to the next writer, and in addition it generates repair packets and also writes them to next writer. In other words, you pass a sequence of source packets to fec::Writer, it it produces a sequence of source and repair packets. So, we should add a mode for test::PacketWriter when it uses fec::Writer to produce repair packets in addition to the source packets. In this mode, it'll need to use different composers for source and repair packets. The flow will become the following: first use packet factory (to allocate source packet), then encoder (to fill source packet with samples), then fec::Writer (to generate repair packets), then composers (to serialize source and repair packets into bytes). Example of using fec::Writer in tests, including creating and using composers, can be found here: https://github.com/roc-streaming/roc-toolkit/blob/develop/src/tests/roc_fec/test_writer_reader.cpp The sequence of source and repair packets produced by fec::Writer will look like:
Where S-X.Y is source packet number Y of FEC block number X, and R-X.Y is repair packet. You can see that source and repair packets are divided into blocks. Repair packets usually come in the end of the block. For the purpose of our test, we can alter the sequence of packets:
Then we should check that audio produced by receiver is expected sequence, using test::FrameReader, same way as we do in all other tests. If receiver isn't able to restore losses, audio will have zeros in places of those packets, and the test will fail. |
Short summary of the above:
If pre-bufferring works, test::FrameReader will see complete audio stream. If pre-bufferring does not work, test::FrameReader will see unexpected zeros where lost packets were not restored, and will fail. |
A word on how we can alter the sequence of packets. test::PacketWriter writes produced packets to IWriter. Usually we configure it to write packets directly in receiver pipeline (into its endpoint writers). Instead of that, our test can configure it to write packets to packet::Queue. Then test can read packets from the queue and write them to pipeline in different order. |
All this sounds quite big, but I think the actual implementation shouldn't be that complicated once you'll get familiar with the context. In test_receiver_source.cpp we have many relatively simple and short 50-lines tests that still would need many words to describe them from scratch :) If you'll have any more questions, feel free to ask here on in chat. And thanks for looking into this. |
Added hacktoberfest-accepted just in case you need it. |
@gavv so far ive gotten a packet writer set up with an LDPC source composer and an fec writer, but when it comes to writing the packets using PacketWriter, i get an error from fec writer saying I would have assumed the fec data would have been set by a composer at some point before being sent off to the fec writer. Any help would be greatly appreciated! |
You can push your code somewhere (this PR or other branch) and I'll take a look. |
@gavv |
Hi, sorry for delay, I was occupied on other projects last weeks. Actually you were very close, you configured everything correctly. The reason of the panic is quite subtle. If you look here:
you can see that before passing packet to destination writer, PacketWriter creates a copy of the packet from the buffer of the original packet, and uses that copy. When we receive packet from network, it has only buffer with data. However as packet goes through the pipeline, it gets populated with various meta-information, like flags, parsed fields, etc. However, in pipeline tests, we're simulating how receiver handles packets received from network, and so we should pass to the receiver packets without all these meta-information. So we "strip" it by making a copy of packet that has only buffer with data. However, fec::Writer lives in the middle of the pipeline, and expected that packet will already have all necessary meta-information. And you were passing to it packets from PacketWriter, which are already stripped. Hence you got a panic. The solution is to move fec::Writer inside test::PacketWriter. PacketWriter should create packet, pass it to fec::Writer, and only then strip packets and pass them to destination writer. I've implemented this approach and pushed to develop: 3f4d3ed PacketWriter now has two constructors, one to work without FEC, and another to work with FEC. Currently all tests use version without FEC, but you can use the new version in your test. I also refactored PacketWriter and add comments to make the behavior I described more obvious from code. Here is the updated version of your tests that compiles and runs (though I didn't try to verify its logic, just fixed errors): TEST(receiver_source, packet_buffer) {
packet::PacketPtr source_packets[20];
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };
init(Rate, Chans, Rate, Chans);
ReceiverConfig config = make_config();
config.default_session.prebuf_len = 0;
ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
sample_buffer_factory, arena);
CHECK(receiver.is_valid());
ReceiverSlot* slot = create_slot(receiver);
CHECK(slot);
packet::IWriter* source_endpoint_writer =
create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_LDPC_Source);
CHECK(source_endpoint_writer);
packet::IWriter* repair_endpoint_writer =
create_endpoint(slot, address::Iface_AudioRepair, address::Proto_LDPC_Repair);
CHECK(repair_endpoint_writer);
fec::WriterConfig fec_config;
test::PacketWriter packet_writer(
arena, *source_endpoint_writer, *repair_endpoint_writer, format_map,
packet_factory, byte_buffer_factory, src1, dst1, dst2, PayloadType_Ch2,
packet::FEC_LDPC_Staircase, fec_config);
// setup reader
test::FrameReader frame_reader(receiver, sample_buffer_factory);
packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
output_sample_spec);
receiver.refresh(frame_reader.refresh_ts());
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
} A few more notes:
|
Thanks for the update and explanation! Ill update my tests once i get the chance to |
468a3a0
to
83aa7be
Compare
@@ -286,6 +286,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle, | |||
|
|||
pp->udp()->src_addr = src_addr; | |||
pp->udp()->dst_addr = self.config_.bind_address; | |||
pp->udp()->recieve_timestamp = core::timestamp(core::ClockMonotonic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For timestamping audio frames we use core::ClockUnix
cc @gavv
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, I think recieve_timestamp should use the same.
93c939f
to
83aa7be
Compare
Sorry, I accidentally pushed to PR's branch, I returned it back to the same commit. |
☔ The latest upstream change (presumably these) made this pull request unmergeable. Please resolve the merge conflicts. |
Based on discussion here, I've added some docs:
I also added new packet flag (f7e3a92) that will make panic that happened here a bit more clear (it will state that prepare() was not called on a packet passed to fec writer, and the role of prepare() is now documented in the page I linked above) |
83aa7be
to
af7c2c7
Compare
🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts. |
🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts. |
Hi, I'm preparing 0.3.0 release and have rebased develop on master (this workflow is described here). Please reset develop in your fork to up-to-date version and rebase your PR on that. (You can use |
Hey, I've rebased the patch on develop to keep PR alive. |
🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts. |
Rebased on fresh develop (didn't touch logic). |
🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts. |
🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts. |
PR for #217
Added ring buffer for dropped packets, and pass them into new receiver session as they are created.
Added --max-session-packets and --max-sessions arguments to help determine maximum size for ring buffer.