Skip to content

Commit

Permalink
net: introduce input_buffer_factory concept.
Browse files Browse the repository at this point in the history
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
  • Loading branch information
rzarzynski committed Jun 10, 2019
1 parent 0f1c501 commit c7aef8c
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 16 deletions.
16 changes: 15 additions & 1 deletion include/seastar/net/api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ public:
void close();
};

class input_buffer_factory {
public:
virtual ~input_buffer_factory() = default;
/// Provide a rx buffer. Implementation is responsible for determining its size
/// and memory. This is useful when a network stack implementation does not put
/// extra requirements on these factors. The POSIX stack is the example here.
/// \param allocator Memory allocator \c connected_socket implementation prefers.
/// Maybe nullptr.
virtual temporary_buffer<char> create(compat::polymorphic_allocator<char>* allocator) = 0;
};

} /* namespace net */

/// \addtogroup networking-module
Expand Down Expand Up @@ -156,7 +167,10 @@ public:
/// Gets the input stream.
///
/// Gets an object returning data sent from the remote endpoint.
input_stream<char> input();
/// \param ibf_hint optional factory of rx buffers. The decision
/// whether to use the factory is opt to an implementation of \c
/// connected_socket.
input_stream<char> input(net::input_buffer_factory* ibf_hint = nullptr);
/// Gets the output stream.
///
/// Gets an object that sends data to the remote endpoint.
Expand Down
7 changes: 3 additions & 4 deletions include/seastar/net/posix-stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ class posix_data_source_impl final : public data_source_impl {
compat::polymorphic_allocator<char>* _buffer_allocator;
lw_shared_ptr<pollable_fd> _fd;
temporary_buffer<char> _buf;
size_t _buf_size;
net::input_buffer_factory* _buffer_factory;
public:
explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator,
size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)),
_buf(make_temporary_buffer<char>(_buffer_allocator, buf_size)), _buf_size(buf_size) {}
explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, net::input_buffer_factory* ibf, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator)
: _buffer_allocator(allocator), _fd(std::move(fd)), _buffer_factory(ibf) {}
future<temporary_buffer<char>> get() override;
future<> close() override;
};
Expand Down
2 changes: 1 addition & 1 deletion include/seastar/net/stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace net {
class connected_socket_impl {
public:
virtual ~connected_socket_impl() {}
virtual data_source source() = 0;
virtual data_source source(input_buffer_factory* ibf = nullptr) = 0;
virtual data_sink sink() = 0;
virtual void shutdown_input() = 0;
virtual void shutdown_output() = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/net/native-stack-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class native_connected_socket_impl : public connected_socket_impl {
public:
explicit native_connected_socket_impl(lw_shared_ptr<typename Protocol::connection> conn)
: _conn(std::move(conn)) {}
virtual data_source source() override;
virtual data_source source(net::input_buffer_factory*) override;
virtual data_sink sink() override;
virtual void shutdown_input() override;
virtual void shutdown_output() override;
Expand Down Expand Up @@ -180,7 +180,7 @@ public:
};

template <typename Protocol>
data_source native_connected_socket_impl<Protocol>::source() {
data_source native_connected_socket_impl<Protocol>::source(net::input_buffer_factory*) {
return data_source(std::make_unique<native_data_source_impl>(_conn));
}

Expand Down
16 changes: 12 additions & 4 deletions src/net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,16 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co
explicit posix_connected_socket_impl(lw_shared_ptr<pollable_fd> fd, conntrack::handle&& handle,
compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd)), _handle(std::move(handle)), _allocator(allocator) {}
public:
virtual data_source source() override {
return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator));
virtual data_source source(net::input_buffer_factory* ibf) override {
if (!ibf) {
static struct final : input_buffer_factory {
temporary_buffer<char> create(compat::polymorphic_allocator<char>* const allocator) override {
return make_temporary_buffer<char>(allocator, 8192);
}
} default_posix_inbuf_factory{};
ibf = &default_posix_inbuf_factory;
}
return data_source(std::make_unique<posix_data_source_impl>(_fd, ibf, _allocator));
}
virtual data_sink sink() override {
return data_sink(std::make_unique< posix_data_sink_impl>(_fd));
Expand Down Expand Up @@ -317,10 +325,10 @@ posix_ap_server_socket_impl<Transport>::move_connected_socket(socket_address sa,

future<temporary_buffer<char>>
posix_data_source_impl::get() {
return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
_buf = _buffer_factory->create(_buffer_allocator);
return _fd->read_some(_buf.get_write(), _buf.size()).then([this] (size_t size) {
_buf.trim(size);
auto ret = std::move(_buf);
_buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
return make_ready_future<temporary_buffer<char>>(std::move(ret));
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/net/stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ connected_socket& connected_socket::operator=(connected_socket&& cs) noexcept =
connected_socket::~connected_socket()
{}

input_stream<char> connected_socket::input() {
return input_stream<char>(_csi->source());
input_stream<char> connected_socket::input(net::input_buffer_factory* const ise) {
return input_stream<char>(_csi->source(ise));
}

output_stream<char> connected_socket::output(size_t buffer_size) {
Expand Down
4 changes: 2 additions & 2 deletions src/net/tls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ class tls_connected_socket_impl : public net::connected_socket_impl, public sess
class source_impl;
class sink_impl;

data_source source() override;
data_source source(net::input_buffer_factory*) override;
data_sink sink() override;

void shutdown_input() override {
Expand Down Expand Up @@ -1160,7 +1160,7 @@ class tls_socket_impl : public net::socket_impl {

}

data_source tls::tls_connected_socket_impl::source() {
data_source tls::tls_connected_socket_impl::source(net::input_buffer_factory*) {
return data_source(std::make_unique<source_impl>(_session));
}

Expand Down

0 comments on commit c7aef8c

Please sign in to comment.