Add burst sends with a deadline mode to tperf
Summary:
The server can now send bursts of data with a deadline - e.g., a 100 KB block with a 20 ms deadline makes the server send a 100 KB burst every 20 ms.
If the previous burst has not been acked by the time the deadline expires, the server cancels that transmission (by resetting the stream) and starts a new one.
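
A minimal standalone sketch of the burst-with-deadline cycle described above. The names, constants, and simulated ack delays below are illustrative only; tperf itself drives this loop through QuicSocket write/byte-event and high-resolution timer callbacks rather than a blocking sleep.

// Sketch: send a burst, wait for ack or deadline, count delivered vs. missed.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <random>
#include <thread>

using Clock = std::chrono::steady_clock;

int main() {
  const auto deadline = std::chrono::milliseconds(20); // burst_deadline_ms
  uint64_t delivered = 0, missedDeadline = 0;
  std::mt19937 rng{42};
  std::uniform_int_distribution<int> ackDelayMs(5, 35); // fake network delay

  for (int burst = 0; burst < 10; ++burst) {
    auto start = Clock::now();
    // "Send" the burst, then wait for the ack or the deadline, whichever
    // comes first.
    auto ackDelay = std::chrono::milliseconds(ackDelayMs(rng));
    std::this_thread::sleep_for(std::min(ackDelay, deadline));

    if (ackDelay <= deadline) {
      auto latencyUs = std::chrono::duration_cast<std::chrono::microseconds>(
          Clock::now() - start);
      ++delivered;
      std::cout << "burst " << burst << " acked in " << latencyUs.count()
                << "us\n";
    } else {
      // Deadline hit before the ack: cancel this burst (tperf resets the
      // stream) and immediately start the next one.
      ++missedDeadline;
      std::cout << "burst " << burst << " missed the deadline\n";
    }
  }
  std::cout << "delivered=" << delivered
            << " missedDeadline=" << missedDeadline << "\n";
  return 0;
}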

Reviewed By: sharmafb

Differential Revision: D66384550

fbshipit-source-id: 212be10c50198d4e324e579b6bc588b2ab3d45fa
kvtsoy authored and facebook-github-bot committed Nov 23, 2024
1 parent ba7b573 commit 761e381
Showing 1 changed file with 116 additions and 3 deletions.
119 changes: 116 additions & 3 deletions quic/tools/tperf/tperf.cpp
@@ -108,6 +108,10 @@ DEFINE_bool(
false,
"Whether to read and echo ecn marking from ingress packets");
DEFINE_uint32(dscp, 0, "DSCP value to use for outgoing packets");
DEFINE_uint32(
burst_deadline_ms,
0,
"If > 0, server will send bursts of data of size=block_size with a deadline of burst_deadline_ms milliseconds");

namespace quic::tperf {

@@ -185,7 +189,9 @@ class TPerfAcceptObserver : public AcceptObserver {
class ServerStreamHandler : public quic::QuicSocket::ConnectionSetupCallback,
public quic::QuicSocket::ConnectionCallback,
public quic::QuicSocket::ReadCallback,
public quic::QuicSocket::WriteCallback {
public quic::QuicSocket::WriteCallback,
public quic::QuicTimerCallback,
public QuicSocketLite::ByteEventCallback {
public:
explicit ServerStreamHandler(
folly::EventBase* evbIn,
@@ -226,6 +232,24 @@ class ServerStreamHandler : public quic::QuicSocket::ConnectionSetupCallback,
void onConnectionEnd() noexcept override {
LOG(INFO) << "Socket closed";
sock_.reset();
if (FLAGS_burst_deadline_ms > 0) {
LOG(INFO) << "Burst send stats, burst size of " << blockSize_
<< " bytes:";
LOG(INFO) << " * total bursts sent: " << batchN_;
LOG(INFO) << " * delivered: " << burstSendStats_.delivered;
LOG(INFO) << " * missed deadline: " << burstSendStats_.missedDeadline;

LOG(INFO) << "Burst ack latency stats, microseconds:";
LOG(INFO) << " * p5: "
<< burstSendAckedLatencyHistogramMicroseconds_
.getPercentileEstimate(0.05);
LOG(INFO) << " * p50: "
<< burstSendAckedLatencyHistogramMicroseconds_
.getPercentileEstimate(0.5);
LOG(INFO) << " * p95: "
<< burstSendAckedLatencyHistogramMicroseconds_
.getPercentileEstimate(0.95);
}
}

void onConnectionSetupError(QuicError error) noexcept override {
@@ -242,8 +266,12 @@ class ServerStreamHandler : public quic::QuicSocket::ConnectionSetupCallback,
sock_->setMaxPacingRate(FLAGS_max_pacing_rate);
}
LOG(INFO) << "Starting sends to client.";
for (uint32_t i = 0; i < numStreams_; i++) {
createNewStream();
if (FLAGS_burst_deadline_ms > 0) {
doBurstSending();
} else {
for (uint32_t i = 0; i < numStreams_; i++) {
createNewStream();
}
}
}

@@ -356,6 +384,75 @@ class ServerStreamHandler : public quic::QuicSocket::ConnectionSetupCallback,
}
}

void doBurstSending() {
if (!sock_) {
return;
}

VLOG(4) << "sending batch " << batchN_;
++batchN_;

auto stream = sock_->createUnidirectionalStream();
VLOG(5) << "New Stream with id = " << stream.value();
CHECK(stream.hasValue());
streamBurstSendResult_.streamId = *stream;
streamBurstSendResult_.acked = false;
streamBurstSendResult_.startTs = Clock::now();

auto sendBuffer = buf_->clone();
sendBuffer->append(blockSize_);
auto res = sock_->writeChain(
*stream,
std::move(sendBuffer),
true /* eof */,
this /* byte events callback */);
if (res.hasError()) {
LOG(FATAL) << "Got error on write: " << quic::toString(res.error());
}

// Schedule deadline.
evb_->scheduleTimeoutHighRes(
this, std::chrono::milliseconds(FLAGS_burst_deadline_ms));
}

void onByteEvent(QuicSocketLite::ByteEvent byteEvent) override {
if (byteEvent.type == QuicSocketLite::ByteEvent::Type::ACK) {
auto ackedLatencyUs =
std::chrono::duration_cast<std::chrono::microseconds>(
Clock::now() - streamBurstSendResult_.startTs);
burstSendAckedLatencyHistogramMicroseconds_.addValue(
ackedLatencyUs.count());
VLOG(4) << "got stream " << byteEvent.id << " offset " << byteEvent.offset
<< " acked (" << ackedLatencyUs.count() << "us)";
streamBurstSendResult_.acked = true;
++burstSendStats_.delivered;
}
}

void onByteEventCanceled(
QuicSocketLite::ByteEventCancellation cancellation) override {
VLOG(4) << "got stream " << cancellation.id << " offset "
<< cancellation.offset << " cancelled";
}

void timeoutExpired() noexcept override {
if (!sock_) {
return;
}

if (!streamBurstSendResult_.acked) {
LOG(ERROR) << "resetting stream " << streamBurstSendResult_.streamId
<< " on deadline";
++burstSendStats_.missedDeadline;
sock_->resetStream(
streamBurstSendResult_.streamId,
GenericApplicationErrorCode::NO_ERROR);
}
doBurstSending();
}

void callbackCanceled() noexcept override {}

private:
std::shared_ptr<quic::QuicSocket> sock_;
std::shared_ptr<FollyQuicEventBase> evb_;
Expand All @@ -367,6 +464,22 @@ class ServerStreamHandler : public quic::QuicSocket::ConnectionSetupCallback,
std::unordered_map<quic::StreamId, uint64_t> bytesPerStream_;
std::set<quic::StreamId> streamsHavingDSRSender_;
bool dsrEnabled_;

// Burst sending machinery.
uint64_t batchN_{0};
struct {
quic::StreamId streamId;
bool acked{false};
TimePoint startTs;
} streamBurstSendResult_;
struct {
uint64_t missedDeadline{0};
uint64_t delivered{0};
} burstSendStats_;
folly::Histogram<uint64_t> burstSendAckedLatencyHistogramMicroseconds_{
100, /* bucket size */
0, /* min */
1000000 /* 1 sec max delay */};
};

class TPerfServerTransportFactory : public quic::QuicServerTransportFactory {
