Skip to content

Commit

Permalink
pw_rpc: Expose integration test socket
Browse files Browse the repository at this point in the history
Exposes the socket file descriptor for socket-based RPC integration
testing servers/clients to enable socket configuration.

Change-Id: I04d0cb8674a10436da926a07347182c7c844ac59
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/91942
Pigweed-Auto-Submit: Armando Montanez <amontanez@google.com>
Reviewed-by: Erik Gilling <konkers@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
  • Loading branch information
armandomontanez authored and CQ Bot Account committed Apr 25, 2022
1 parent 917daac commit 4b52b5d
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pw_rpc/integration_testing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ unit_test::LoggingEventHandler log_test_events;

Client& client() { return context.client(); }

int GetClientSocketFd() { return context.GetSocketFd(); }

void SetEgressChannelManipulator(ChannelManipulator* new_channel_manipulator) {
context.SetEgressChannelManipulator(new_channel_manipulator);
}
Expand Down
2 changes: 2 additions & 0 deletions pw_rpc/public/pw_rpc/integration_test_socket_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class SocketClientContext {
return OkStatus();
}

int GetSocketFd() { return stream_.connection_fd(); }

void SetEgressChannelManipulator(
ChannelManipulator* new_channel_manipulator) {
channel_output_with_manipulator_.set_channel_manipulator(
Expand Down
4 changes: 4 additions & 0 deletions pw_rpc/public/pw_rpc/integration_testing.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ void SetIngressChannelManipulator(ChannelManipulator* new_channel_manipulator);
// Returns the global RPC client for integration test use.
Client& client();

// The file descriptor for the socket associated with the client. This may be
// used to configure socket options.
int GetClientSocketFd();

// Initializes logging and the global RPC client for integration testing. Starts
// a background thread that processes incoming.
Status InitializeClient(int argc,
Expand Down
4 changes: 4 additions & 0 deletions pw_rpc/system_server/public/pw_rpc_system_server/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ namespace pw::rpc::system_server {
// Sets the port to use for pw::rpc::system_server backends that use sockets.
void set_socket_port(uint16_t port);

// The file descriptor for the socket associated with the server. This may be
// used to configure socket options.
int GetServerSocketFd();

} // namespace pw::rpc::system_server
5 changes: 5 additions & 0 deletions pw_stream/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ Implementations
``StdFileReader`` wraps an ``std::ifstream`` with the :cpp:class:`Reader`
interface.

.. cpp:class:: SocketStream : public NonSeekableReaderWriter

``SocketStream`` wraps posix-style sockets with the :cpp:class:`Reader` and
:cpp:class:`Writer` interfaces.

------------------
Why use pw_stream?
------------------
Expand Down
9 changes: 8 additions & 1 deletion pw_stream/public/pw_stream/socket_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class SocketStream : public NonSeekableReaderWriter {
// Close the socket stream and release all resources
void Close();

// Exposes the file descriptor for the active connection. This is exposed to
// allow configuration and introspection of this socket's current
// configuration using setsockopt() and getsockopt().
//
// Returns -1 if there is no active connection.
int connection_fd() { return connection_fd_; }

private:
static constexpr int kInvalidFd = -1;

Expand All @@ -47,7 +54,7 @@ class SocketStream : public NonSeekableReaderWriter {

uint16_t listen_port_ = 0;
int socket_fd_ = kInvalidFd;
int conn_fd_ = kInvalidFd;
int connection_fd_ = kInvalidFd;
struct sockaddr_in sockaddr_client_ = {};
};

Expand Down
20 changes: 11 additions & 9 deletions pw_stream/socket_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@ Status SocketStream::Serve(uint16_t port) {

socklen_t len = sizeof(sockaddr_client_);

conn_fd_ =
connection_fd_ =
accept(socket_fd_, reinterpret_cast<sockaddr*>(&sockaddr_client_), &len);
if (conn_fd_ < 0) {
if (connection_fd_ < 0) {
return Status::Unknown();
}
return OkStatus();
}

Status SocketStream::SocketStream::Connect(const char* host, uint16_t port) {
conn_fd_ = socket(AF_INET, SOCK_STREAM, 0);
connection_fd_ = socket(AF_INET, SOCK_STREAM, 0);

sockaddr_in addr;
addr.sin_family = AF_INET;
Expand All @@ -94,7 +94,9 @@ Status SocketStream::SocketStream::Connect(const char* host, uint16_t port) {
return Status::InvalidArgument();
}

if (connect(conn_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
if (connect(connection_fd_,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr)) < 0) {
PW_LOG_ERROR(
"Failed to connect to %s:%d: %s", host, port, std::strerror(errno));
return Status::Unknown();
Expand All @@ -109,17 +111,17 @@ void SocketStream::Close() {
socket_fd_ = kInvalidFd;
}

if (conn_fd_ != kInvalidFd) {
close(conn_fd_);
conn_fd_ = kInvalidFd;
if (connection_fd_ != kInvalidFd) {
close(connection_fd_);
connection_fd_ = kInvalidFd;
}
}

Status SocketStream::DoWrite(std::span<const std::byte> data) {
// Use MSG_NOSIGNAL to avoid getting a SIGPIPE signal when the remote
// peer drops the connection.
ssize_t bytes_sent =
send(conn_fd_, data.data(), data.size_bytes(), MSG_NOSIGNAL);
send(connection_fd_, data.data(), data.size_bytes(), MSG_NOSIGNAL);

if (bytes_sent < 0 || static_cast<size_t>(bytes_sent) != data.size()) {
if (errno == EPIPE) {
Expand All @@ -134,7 +136,7 @@ Status SocketStream::DoWrite(std::span<const std::byte> data) {
}

StatusWithSize SocketStream::DoRead(ByteSpan dest) {
ssize_t bytes_rcvd = recv(conn_fd_, dest.data(), dest.size_bytes(), 0);
ssize_t bytes_rcvd = recv(connection_fd_, dest.data(), dest.size_bytes(), 0);
if (bytes_rcvd < 0) {
return StatusWithSize::Unknown();
}
Expand Down
2 changes: 2 additions & 0 deletions targets/host/system_rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void set_socket_port(uint16_t new_socket_port) {
socket_port = new_socket_port;
}

int GetServerSocketFd() { return socket_stream.connection_fd(); }

void Init() {
log_basic::SetOutput([](std::string_view log) {
std::fprintf(stderr, "%.*s\n", static_cast<int>(log.size()), log.data());
Expand Down

0 comments on commit 4b52b5d

Please sign in to comment.