Skip to content

Commit

Permalink
pw_transfer: Rate limiting backpressure
Browse files Browse the repository at this point in the history
Uses unix socket send/receive buffer limits to cause rate-limiting
backpressure to affect clients.

Change-Id: I2eecda3043eca058abe08c5903a8bc89604cf5f3
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/91940
Reviewed-by: Erik Gilling <konkers@google.com>
Pigweed-Auto-Submit: Armando Montanez <amontanez@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
  • Loading branch information
armandomontanez authored and CQ Bot Account committed Apr 25, 2022
1 parent 4b52b5d commit 873865d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 6 deletions.
23 changes: 22 additions & 1 deletion pw_transfer/integration_test/JavaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -44,6 +45,21 @@ public class JavaClient {
private static final long RPC_HDLC_ADDRESS = 'R';
private static final String HOSTNAME = "localhost";

// This is the maximum size of the socket send buffers. Ideally, this is set
// to the lowest allowed value to minimize buffering between the proxy and
// clients so rate limiting causes the client to block and wait for the
// integration test proxy to drain rather than allowing OS buffers to backlog
// large quantities of data.
//
// Note that the OS may chose to not strictly follow this requested buffer
// size. Still, setting this value to be as small as possible does reduce
// bufer sizes significantly enough to better reflect typical inter-device
// communication.
//
// For this to be effective, servers should also configure their sockets to a
// smaller receive buffer size.
private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1;

private HdlcRpcChannelOutput channelOutput;
private Client rpcClient;
private HdlcParseThread parseThread;
Expand Down Expand Up @@ -176,7 +192,12 @@ public static void main(String[] args) {
logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port);
System.exit(1);
}

try {
socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE);
} catch (SocketException e) {
logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE);
System.exit(1);
}
InputStream reader = null;
OutputStream writer = null;

Expand Down
34 changes: 31 additions & 3 deletions pw_transfer/integration_test/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
// WORK IN PROGRESS, SEE b/228516801
#include "pw_transfer/client.h"

#include <sys/socket.h>

#include <cstddef>
#include <cstdio>

#include "google/protobuf/text_format.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_rpc/integration_testing.h"
#include "pw_status/status.h"
Expand All @@ -36,9 +39,23 @@
#include "pw_transfer/integration_test/config.pb.h"
#include "pw_transfer/transfer_thread.h"

namespace pw::transfer {
namespace pw::transfer::integration_test {
namespace {

// This is the maximum size of the socket send buffers. Ideally, this is set
// to the lowest allowed value to minimize buffering between the proxy and
// clients so rate limiting causes the client to block and wait for the
// integration test proxy to drain rather than allowing OS buffers to backlog
// large quantities of data.
//
// Note that the OS may chose to not strictly follow this requested buffer size.
// Still, setting this value to be as small as possible does reduce bufer sizes
// significantly enough to better reflect typical inter-device communication.
//
// For this to be effective, servers should also configure their sockets to a
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;

thread::Options& TransferThreadOptions() {
static thread::stl::Options options;
return options;
Expand Down Expand Up @@ -90,7 +107,7 @@ pw::Status SendData(const pw::transfer::ClientConfig& config) {
}

} // namespace
} // namespace pw::transfer
} // namespace pw::transfer::integration_test

int main(int argc, char* argv[]) {
if (argc < 2) {
Expand Down Expand Up @@ -121,7 +138,18 @@ int main(int argc, char* argv[]) {
return 1;
}

if (!pw::transfer::SendData(config).ok()) {
int retval = setsockopt(
pw::rpc::integration_test::GetClientSocketFd(),
SOL_SOCKET,
SO_SNDBUF,
&pw::transfer::integration_test::kMaxSocketSendBufferSize,
sizeof(pw::transfer::integration_test::kMaxSocketSendBufferSize));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket send buffer size with errno=%d",
errno);

if (!pw::transfer::integration_test::SendData(config).ok()) {
PW_LOG_INFO("Failed to transfer!");
return 1;
}
Expand Down
25 changes: 23 additions & 2 deletions pw_transfer/integration_test/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import asyncio
import logging
import random
import socket
import sys
import time
from typing import (Any, Awaitable, Callable, List, Optional)
Expand All @@ -35,6 +36,20 @@

_LOG = logging.getLogger('pw_transfer_intergration_test_proxy')

# This is the maximum size of the socket receive buffers. Ideally, this is set
# to the lowest allowed value to minimize buffering between the proxy and
# clients so rate limiting causes the client to block and wait for the
# integration test proxy to drain rather than allowing OS buffers to backlog
# large quantities of data.
#
# Note that the OS may chose to not strictly follow this requested buffer size.
# Still, setting this value to be relatively small does reduce bufer sizes
# significantly enough to better reflect typical inter-device communication.
#
# For this to be effective, clients should also configure their sockets to a
# smaller send buffer size.
_RECEIVE_BUFFER_SIZE = 2048


class Filter(abc.ABC):
"""An abstract interface for manipulating a stream of data.
Expand Down Expand Up @@ -301,9 +316,15 @@ async def _main(server_port: int, client_port: int) -> None:
config = text_format.Parse(text_config, config_pb2.ProxyConfig())

# Instantiate the TCP server.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
_RECEIVE_BUFFER_SIZE)
server_socket.bind(('localhost', client_port))
server = await asyncio.start_server(
lambda reader, writer: _handle_connection(
server_port, config, reader, writer), 'localhost', client_port)
lambda reader, writer: _handle_connection(server_port, config, reader,
writer),
limit=_RECEIVE_BUFFER_SIZE,
sock=server_socket)

addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
_LOG.info(f'Listening for client connection on {addrs}')
Expand Down
26 changes: 26 additions & 0 deletions pw_transfer/integration_test/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
//
// integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"

#include <sys/socket.h>

#include <chrono>
#include <cstddef>
#include <cstdlib>
Expand All @@ -47,6 +49,20 @@ namespace {
using stream::MemoryReader;
using stream::MemoryWriter;

// This is the maximum size of the socket send buffers. Ideally, this is set
// to the lowest allowed value to minimize buffering between the proxy and
// clients so rate limiting causes the client to block and wait for the
// integration test proxy to drain rather than allowing OS buffers to backlog
// large quantities of data.
//
// Note that the OS may chose to not strictly follow this requested buffer size.
// Still, setting this value to be as small as possible does reduce bufer sizes
// significantly enough to better reflect typical inter-device communication.
//
// For this to be effective, servers should also configure their sockets to a
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;

// TODO(tpudlik): This is copy-pasted from test_rpc_server.cc, break it out into
// a shared library.
class FileTransferHandler final : public ReadWriteHandler {
Expand Down Expand Up @@ -106,6 +122,16 @@ void RunServer(int socket_port, ServerConfig config) {

thread::DetachedThread(thread::stl::Options(), transfer_thread);

int retval = setsockopt(rpc::system_server::GetServerSocketFd(),
SOL_SOCKET,
SO_SNDBUF,
&kMaxSocketSendBufferSize,
sizeof(kMaxSocketSendBufferSize));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket send buffer size with errno=%d",
errno);

// It's fine to allocate this on the stack since this thread doesn't return
// until this process is killed.
FileTransferHandler transfer_handler(
Expand Down

0 comments on commit 873865d

Please sign in to comment.