Skip to content

Commit

Permalink
[Merge to M104] Fix issue with pacing rate after long queue times.
Browse files Browse the repository at this point in the history
A recent cleanup cl (r36900) had an unintended side-effect.

If the queue-time limit is expected to be hit, we adjust the pacing
bitrate up to make sure all packets are sent within the nominal time
frame.
However after that change we stopped adjusting the pacing rate back to
normal levels when queue clears - at least not until the next BWE
update (which is fairly often - but not immediate).

This CL fixes that, and also makes sure whe properly update the
adjusted media rate on enqueu, dequeue and set rate calls.

(cherry picked from commit df9e51a)

No-Try: True
Bug: webrtc:10809, chromium:1336956
Change-Id: If00dc35169f1a1347fea6eb44fdb2868282ed3b7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265387
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Original-Commit-Position: refs/heads/main@{#37178}
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/266021
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/branch-heads/5112@{#1}
Cr-Branched-From: a976a87-refs/heads/main@{#37168}
  • Loading branch information
Erik Språng authored and WebRTC LUCI CQ committed Jun 17, 2022
1 parent a976a87 commit e083ccc
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 81 deletions.
89 changes: 50 additions & 39 deletions modules/pacing/pacing_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ PacingController::PacingController(Clock* clock,
paused_(false),
media_debt_(DataSize::Zero()),
padding_debt_(DataSize::Zero()),
media_rate_(DataRate::Zero()),
pacing_rate_(DataRate::Zero()),
adjusted_media_rate_(DataRate::Zero()),
padding_rate_(DataRate::Zero()),
prober_(field_trials_),
probing_send_failure_(false),
Expand Down Expand Up @@ -198,21 +199,22 @@ void PacingController::SetPacingRates(DataRate pacing_rate,
<< " kbps, padding = " << padding_rate.kbps()
<< " kbps.";
}
media_rate_ = pacing_rate;
pacing_rate_ = pacing_rate;
padding_rate_ = padding_rate;
MaybeUpdateMediaRateDueToLongQueue(CurrentTime());

RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << media_rate_.kbps()
RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << pacing_rate_.kbps()
<< " padding_budget_kbps=" << padding_rate.kbps();
}

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(media_rate_ > DataRate::Zero())
RTC_DCHECK(pacing_rate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
RTC_CHECK(packet->packet_type());

prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));

Timestamp now = CurrentTime();
const Timestamp now = CurrentTime();
if (packet_queue_->Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
Expand All @@ -228,6 +230,9 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
}
packet_queue_->Push(now, std::move(packet));
seen_first_packet_ = true;

// Queue length has increased, check if we need to change the pacing rate.
MaybeUpdateMediaRateDueToLongQueue(now);
}

void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
Expand All @@ -249,10 +254,8 @@ void PacingController::SetSendBurstInterval(TimeDelta burst_interval) {
}

TimeDelta PacingController::ExpectedQueueTime() const {
RTC_DCHECK_GT(media_rate_, DataRate::Zero());
return TimeDelta::Millis(
(QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) /
media_rate_.bps());
RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero());
return QueueSizeData() / adjusted_media_rate_;
}

size_t PacingController::QueueSizePackets() const {
Expand Down Expand Up @@ -343,21 +346,21 @@ Timestamp PacingController::NextSendTime() const {
return last_send_time_ + kCongestedPacketInterval;
}

if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
// If packets are allowed to be sent in a burst, the
// debt is allowed to grow up to one packet more than what can be sent
// during 'send_burst_period_'.
TimeDelta drain_time = media_debt_ / media_rate_;
TimeDelta drain_time = media_debt_ / adjusted_media_rate_;
next_send_time =
last_process_time_ +
((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time);
} else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
RTC_DCHECK_GT(media_rate_, DataRate::Zero());
TimeDelta drain_time =
std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero());
TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,
padding_debt_ / padding_rate_);

if (drain_time.IsZero() &&
(!media_debt_.IsZero() || !padding_debt_.IsZero())) {
Expand All @@ -380,7 +383,7 @@ Timestamp PacingController::NextSendTime() const {
}

void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
const Timestamp now = CurrentTime();
Timestamp target_send_time = now;

if (ShouldSendKeepalive(now)) {
Expand Down Expand Up @@ -420,27 +423,6 @@ void PacingController::ProcessPackets() {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);

if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = media_rate_;
DataSize queue_size_data = QueueSizeData();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_->UpdateAverageQueueTime(now);
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_->AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< target_rate.kbps();
}
}
}

media_rate_ = target_rate;
UpdateBudgetWithElapsedTime(elapsed_time);
}

Expand Down Expand Up @@ -556,6 +538,11 @@ void PacingController::ProcessPackets() {
prober_.ProbeSent(CurrentTime(), data_sent);
}
}

// Queue length has probably decreased, check if pacing rate needs to updated.
// Poll the time again, since we might have enqueued new fec/padding packets
// with a later timestamp than `now`.
MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
}

DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
Expand Down Expand Up @@ -630,7 +617,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
// is not more than would be reduced to zero at the target sent time.
// If we allow packets to be sent in a burst, packet are allowed to be
// sent early.
TimeDelta flush_time = media_debt_ / media_rate_;
TimeDelta flush_time = media_debt_ / adjusted_media_rate_;
if (now + flush_time > target_send_time) {
return nullptr;
}
Expand All @@ -656,13 +643,13 @@ void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
}

void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
media_debt_ -= std::min(media_debt_, media_rate_ * delta);
media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta);
padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);
}

void PacingController::UpdateBudgetWithSentData(DataSize size) {
media_debt_ += size;
media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime);
media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime);
UpdatePaddingBudgetWithSentData(size);
}

Expand All @@ -675,4 +662,28 @@ void PacingController::SetQueueTimeLimit(TimeDelta limit) {
queue_time_limit_ = limit;
}

void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
adjusted_media_rate_ = pacing_rate_;
if (!drain_large_queues_) {
return;
}

DataSize queue_size_data = QueueSizeData();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_->UpdateAverageQueueTime(now);
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_->AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > pacing_rate_) {
adjusted_media_rate_ = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< pacing_rate_.kbps();
}
}
}

} // namespace webrtc
13 changes: 11 additions & 2 deletions modules/pacing/pacing_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class PacingController {

// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
DataRate pacing_rate() const { return media_rate_; }
DataRate pacing_rate() const { return adjusted_media_rate_; }

// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
Expand Down Expand Up @@ -217,6 +217,7 @@ class PacingController {
void OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time);
void MaybeUpdateMediaRateDueToLongQueue(Timestamp now);

Timestamp CurrentTime() const;

Expand All @@ -241,9 +242,17 @@ class PacingController {
mutable Timestamp last_timestamp_;
bool paused_;

// Amount of outstanding data for media and padding.
DataSize media_debt_;
DataSize padding_debt_;
DataRate media_rate_;

// The target pacing rate, signaled via SetPacingRates().
DataRate pacing_rate_;
// The media send rate, which might adjusted from pacing_rate_, e.g. if the
// pacing queue is growing too long.
DataRate adjusted_media_rate_;
// The padding target rate. We aim to fill up to this rate with padding what
// is not already used by media.
DataRate padding_rate_;

BitrateProber prober_;
Expand Down
81 changes: 41 additions & 40 deletions modules/pacing/pacing_controller_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1096,45 +1096,6 @@ TEST_F(PacingControllerTest, InactiveFromStart) {
2 * PacingController::kPausedProcessInterval);
}

TEST_F(PacingControllerTest, ExpectedQueueTimeMs) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
const size_t kNumPackets = 60;
const size_t kPacketSize = 1200;
const int32_t kMaxBitrate = kPaceMultiplier * 30000;
auto pacer = std::make_unique<PacingController>(&clock_, &callback_, trials_);
pacer->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero());
EXPECT_TRUE(pacer->OldestPacketEnqueueTime().IsInfinite());

pacer->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier),
DataRate::Zero());
for (size_t i = 0; i < kNumPackets; ++i) {
SendAndExpectPacket(pacer.get(), RtpPacketMediaType::kVideo, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize);
}

// Queue in ms = 1000 * (bytes in queue) *8 / (bits per second)
TimeDelta queue_time =
TimeDelta::Millis(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate);
EXPECT_EQ(queue_time, pacer->ExpectedQueueTime());

const Timestamp time_start = clock_.CurrentTime();
while (pacer->QueueSizePackets() > 0) {
AdvanceTimeUntil(pacer->NextSendTime());
pacer->ProcessPackets();
}
TimeDelta duration = clock_.CurrentTime() - time_start;

EXPECT_EQ(TimeDelta::Zero(), pacer->ExpectedQueueTime());

// Allow for aliasing, duration should be within one pack of max time limit.
const TimeDelta deviation =
duration - PacingController::kMaxExpectedQueueLength;
EXPECT_LT(deviation.Abs(),
TimeDelta::Millis(1000 * kPacketSize * 8 / kMaxBitrate));
}

TEST_F(PacingControllerTest, QueueTimeGrowsOverTime) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
Expand Down Expand Up @@ -1756,7 +1717,7 @@ TEST_F(PacingControllerTest,
}
}

TEST_F(PacingControllerTest, AccountsForAudioEnqueuTime) {
TEST_F(PacingControllerTest, AccountsForAudioEnqueueTime) {
const uint32_t kSsrc = 12345;
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataRate kPaddingDataRate = DataRate::Zero();
Expand Down Expand Up @@ -2063,5 +2024,45 @@ TEST_F(PacingControllerTest, RespectsTargetRateWhenSendingPacketsInBursts) {
EXPECT_EQ(number_of_bursts, 4);
}

TEST_F(PacingControllerTest, RespectsQueueTimeLimit) {
static constexpr DataSize kPacketSize = DataSize::Bytes(100);
static constexpr DataRate kNominalPacingRate = DataRate::KilobitsPerSec(200);
static constexpr TimeDelta kPacketPacingTime =
kPacketSize / kNominalPacingRate;
static constexpr TimeDelta kQueueTimeLimit = TimeDelta::Millis(1000);

PacingController pacer(&clock_, &callback_, trials_);
pacer.SetPacingRates(kNominalPacingRate, /*padding_rate=*/DataRate::Zero());
pacer.SetQueueTimeLimit(kQueueTimeLimit);

// Fill pacer up to queue time limit.
static constexpr int kNumPackets = kQueueTimeLimit / kPacketPacingTime;
for (int i = 0; i < kNumPackets; ++i) {
pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes()));
}
EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit);
EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate);

// Double the amount of packets in the queue, the queue time limit should
// effectively double the pacing rate in response.
for (int i = 0; i < kNumPackets; ++i) {
pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes()));
}
EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit);
EXPECT_EQ(pacer.pacing_rate(), 2 * kNominalPacingRate);

// Send all the packets, should take as long as the queue time limit.
Timestamp start_time = clock_.CurrentTime();
while (pacer.QueueSizePackets() > 0) {
AdvanceTimeUntil(pacer.NextSendTime());
pacer.ProcessPackets();
}
EXPECT_EQ(clock_.CurrentTime() - start_time, kQueueTimeLimit);

// We're back in a normal state - pacing rate should be back to previous
// levels.
EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate);
}

} // namespace
} // namespace webrtc

0 comments on commit e083ccc

Please sign in to comment.