From a0f82295a7b38251cfb6104d09691fa88127e82c Mon Sep 17 00:00:00 2001 From: Adam Casey Date: Fri, 1 Apr 2022 12:19:27 -0400 Subject: [PATCH 1/4] perf tester: Count messages properly Noticed in a packet capture that we were sending two messages when it should only send one. --- tests/performance_tester/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance_tester/src/client.rs b/tests/performance_tester/src/client.rs index 8c82521..3c5e182 100644 --- a/tests/performance_tester/src/client.rs +++ b/tests/performance_tester/src/client.rs @@ -33,7 +33,7 @@ pub(crate) fn run_sync_client( let mut count = 0; loop { - if count > num_messages { + if count >= num_messages { break; } exchange.publish(Publish::new(&arr, routing_key))?; From 243022f46a14b908ad0bd721935351c94d2ebd8e Mon Sep 17 00:00:00 2001 From: Adam Casey Date: Fri, 1 Apr 2022 12:25:09 -0400 Subject: [PATCH 2/4] perf tester: Handle channel close, extend listen backlog Noticed during testing that amiquip was sending Channel Close but the server send Channel OpenOk in return, triggering an error. Since this happened while the connection was being Drop'ed the error seems to have been swallowed. The connection close is now explicit, even though this does avoid closing the channel. I left in the channel close -> closeok in case we change something here in future. --- tests/performance_tester/src/client.rs | 2 ++ tests/performance_tester/src/main.rs | 15 ++++++++++----- tests/performance_tester/src/server.rs | 19 ++++++++++++++++--- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/tests/performance_tester/src/client.rs b/tests/performance_tester/src/client.rs index 3c5e182..9469355 100644 --- a/tests/performance_tester/src/client.rs +++ b/tests/performance_tester/src/client.rs @@ -41,5 +41,7 @@ pub(crate) fn run_sync_client( count += 1; } + connection.close()?; + Ok(()) } diff --git a/tests/performance_tester/src/main.rs b/tests/performance_tester/src/main.rs index 50cb4e7..bc71690 100644 --- a/tests/performance_tester/src/main.rs +++ b/tests/performance_tester/src/main.rs @@ -95,9 +95,7 @@ fn main() -> Result<()> { { println!("Starting TLS dummy amqp server"); let acceptor = server::create_tls_acceptor(&listen_cert, &listen_key).unwrap(); - tokio::spawn(async move { - server::run_tls_server(address, acceptor).await - }) + tokio::spawn(async move { server::run_tls_server(address, acceptor).await }) } else { println!("Starting non-TLS dummy amqp server"); tokio::spawn(async move { server::run_server(address).await }) @@ -115,7 +113,12 @@ fn main() -> Result<()> { let routing_key = opts.routing_key.clone(); let handle = tokio::task::spawn_blocking(move || { - crate::client::run_sync_client(address, message_size, num_messages, &routing_key) + crate::client::run_sync_client( + address, + message_size, + num_messages, + &routing_key, + ) }); handles.push(handle); } @@ -173,7 +176,9 @@ async fn wait_for_addr(addr: SocketAddr, timeout_total: Duration) -> Result<()> } } - let target = start.checked_add(timeout_step).ok_or(anyhow::anyhow!("Timeout add overflowed"))?; + let target = start + .checked_add(timeout_step) + .ok_or(anyhow::anyhow!("Timeout add overflowed"))?; if let Some(sleep_time) = target.checked_duration_since(Instant::now()) { tokio::time::sleep(sleep_time).await; } diff --git a/tests/performance_tester/src/server.rs b/tests/performance_tester/src/server.rs index f662399..156e24e 100644 --- a/tests/performance_tester/src/server.rs +++ b/tests/performance_tester/src/server.rs @@ -36,6 +36,7 @@ use std::path::Path; use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::net::TcpListener; +use tokio::net::TcpSocket; use tokio_rustls::rustls::{self, Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; use AMQPFrame::Method; @@ -198,12 +199,20 @@ pub async fn process_connection< while let Some(frame) = framed.next().await { log::trace!("Received: {:?}", &frame); match frame { - Ok(Method(channel, AMQPClass::Channel(channelmsg))) => { + Ok(Method(channel, AMQPClass::Channel(channel::AMQPMethod::Open(channelmsg)))) => { log::debug!("Set up channel: {} {:?}", channel, channelmsg); let channelok_method = channel::AMQPMethod::OpenOk(channel::OpenOk {}); let channelok = Method(channel, AMQPClass::Channel(channelok_method)); framed.send(channelok).await?; } + Ok(Method(channel, AMQPClass::Channel(channel::AMQPMethod::Close(_channelmsg)))) => { + log::debug!("Received channel close"); + let closeok = Method( + channel, + AMQPClass::Channel(channel::AMQPMethod::CloseOk(channel::CloseOk {})), + ); + framed.send(closeok).await?; + } Ok(Method(0, AMQPClass::Connection(connection::AMQPMethod::Close(closemsg)))) => { log::info!("Closing connection requested: {:?}", closemsg); let closeok_method = connection::AMQPMethod::CloseOk(connection::CloseOk {}); @@ -234,9 +243,13 @@ pub fn create_tls_acceptor(cert_chain: &Path, key: &Path) -> Result } pub async fn run_tls_server(address: SocketAddr, acceptor: TlsAcceptor) -> Result<()> { - log::info!("Listening on {:?}", &address); - let listener = TcpListener::bind(address).await?; + + let socket = TcpSocket::new_v4()?; + socket.set_reuseaddr(true)?; + socket.bind(address)?; + + let listener = socket.listen(1024)?; loop { let (socket, peer) = listener.accept().await?; From c8bb8d24ef59a2583a1f20d241fcd53e233eaf2c Mon Sep 17 00:00:00 2001 From: Adam Casey Date: Fri, 1 Apr 2022 14:52:13 -0400 Subject: [PATCH 3/4] perf tester: Stop checking the result of the close --- tests/performance_tester/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance_tester/src/client.rs b/tests/performance_tester/src/client.rs index 9469355..0c14e7a 100644 --- a/tests/performance_tester/src/client.rs +++ b/tests/performance_tester/src/client.rs @@ -41,7 +41,7 @@ pub(crate) fn run_sync_client( count += 1; } - connection.close()?; + let _ = connection.close(); Ok(()) } From 688cc88fe876f71e8f2eb2a04b880591fb20c998 Mon Sep 17 00:00:00 2001 From: Adam Casey Date: Thu, 24 Mar 2022 08:11:47 -0400 Subject: [PATCH 4/4] Workaround asio::ssl async_read_some busy-loop https://github.com/chriskohlhoff/asio/issues/1015 `asio::ssl` has a bug where calling async_read_some with `null_buffers` isn't correctly handled, and more or less immediately invokes the provided handler with no data. This causes `amqpprox` to busy-loop whenever a TLS socket has been accepted or opened. There were two options for how to fix this: 1) Always read with a fixed size buffer, such as 32kb. This would simplify the code slightly, at the expense of needing multiple reads to handle larger than 32kb frames in non-TLS mode, even when the full frame is available to `amqpprox` in one go. 2) Ask asio::ssl to read with a very small buffer, then ask the openssl library how many bytes are available. This technique aligns with how `amqpprox`'s read loop works today. That is what is implemented here. In theory something similar could be upstreamed into `asio::ssl`. It's a little tricky though and this exact code couldn't handle the generic `MutableBufferSequence` interface - we can take some shortcuts in our code. I've done some benchmarking to check this change isn't going to regress performance noticeably. Data throughput tests indicate that this fix improves performance for TLS connections over the existing code. Still running connection throughput tests. --- .../amqpprox_maybesecuresocketadaptor.h | 109 +++++++++++++++--- libamqpprox/amqpprox_session.cpp | 25 ++-- 2 files changed, 108 insertions(+), 26 deletions(-) diff --git a/libamqpprox/amqpprox_maybesecuresocketadaptor.h b/libamqpprox/amqpprox_maybesecuresocketadaptor.h index 10ca614..03cd7e7 100644 --- a/libamqpprox/amqpprox_maybesecuresocketadaptor.h +++ b/libamqpprox/amqpprox_maybesecuresocketadaptor.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -38,29 +39,33 @@ class MaybeSecureSocketAdaptor { using endpoint = boost::asio::ip::tcp::endpoint; using handshake_type = boost::asio::ssl::stream_base::handshake_type; - boost::asio::io_service & d_ioService; + boost::asio::io_service &d_ioService; std::optional> d_intercept; - std::unique_ptr d_socket; - bool d_secured; - bool d_handshook; + std::unique_ptr d_socket; + bool d_secured; + bool d_handshook; + char d_smallBuffer; + bool d_smallBufferSet; public: typedef typename stream_type::executor_type executor_type; #ifdef SOCKET_TESTING MaybeSecureSocketAdaptor(boost::asio::io_service &ioService, - SocketIntercept & intercept, + SocketIntercept &intercept, bool secured) : d_ioService(ioService) , d_intercept(intercept) , d_socket() , d_secured(secured) , d_handshook(false) + , d_smallBuffer(0) + , d_smallBufferSet(false) { } #endif - MaybeSecureSocketAdaptor(boost::asio::io_service & ioService, + MaybeSecureSocketAdaptor(boost::asio::io_service &ioService, boost::asio::ssl::context &context, bool secured) : d_ioService(ioService) @@ -68,6 +73,8 @@ class MaybeSecureSocketAdaptor { , d_socket(std::make_unique(ioService, context)) , d_secured(secured) , d_handshook(false) + , d_smallBuffer(0) + , d_smallBufferSet(false) { } @@ -77,10 +84,14 @@ class MaybeSecureSocketAdaptor { , d_socket(std::move(src.d_socket)) , d_secured(src.d_secured) , d_handshook(src.d_handshook) + , d_smallBuffer(src.d_smallBuffer) + , d_smallBufferSet(src.d_smallBufferSet) { - src.d_socket = std::unique_ptr(); - src.d_secured = false; - src.d_handshook = false; + src.d_socket = std::unique_ptr(); + src.d_secured = false; + src.d_handshook = false; + src.d_smallBuffer = 0; + src.d_smallBufferSet = false; } boost::asio::ip::tcp::socket &socket() { return d_socket->next_layer(); } @@ -182,13 +193,24 @@ class MaybeSecureSocketAdaptor { d_socket->next_layer().close(ec); } + /** + * Indicates the number of bytes immediately readable out of the socket + * For TLS connections this references the number of bytes which are + * immediately available for reading from the current fully-read record + */ std::size_t available(boost::system::error_code &ec) { if (BOOST_UNLIKELY(d_intercept.has_value())) { return d_intercept.value().get().available(ec); } - return d_socket->next_layer().available(ec); + if (d_secured) { + return (d_smallBufferSet ? 1 : 0) + + SSL_pending(d_socket->native_handle()); + } + else { + return d_socket->next_layer().available(ec); + } } template @@ -247,13 +269,33 @@ class MaybeSecureSocketAdaptor { template std::size_t read_some(const MutableBufferSequence &buffers, - boost::system::error_code & ec) + boost::system::error_code &ec) { if (BOOST_UNLIKELY(d_intercept.has_value())) { return d_intercept.value().get().read_some(buffers, ec); } if (isSecure()) { + // Ensure we read the small-buffer-workaround if it has been used + if (d_smallBufferSet && buffers.size() >= 1) { + ((char *)buffers.data())[0] = d_smallBuffer; + d_smallBufferSet = false; + + MutableBufferSequence replacement(buffers); + replacement += 1; + + size_t result = d_socket->read_some(replacement, ec); + + if (ec && result == 0) { + ec = boost::system::error_code(); + // Pretend read_some succeeded this time around because + // there's one byte left over. + return 1; + } + + return 1 + result; + } + return d_socket->read_some(buffers, ec); } else { @@ -261,21 +303,56 @@ class MaybeSecureSocketAdaptor { } } - template + /** + * This async_read_some specialisation is required due to + * https://github.com/chriskohlhoff/asio/issues/1015 + * + * For TLS sockets we need to ensure we call this method with a buffer size + * of at least one byte. This is handled by passing in a small buffer (1 + * byte). This byte is then passed back via `read_some`. The presense of + * this byte is also represented in the return value of `available`. + */ + template BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void(boost::system::error_code, std::size_t)) - async_read_some(const MutableBufferSequence &buffers, + async_read_some(const boost::asio::null_buffers &null_buffer, BOOST_ASIO_MOVE_ARG(ReadHandler) handler) { if (BOOST_UNLIKELY(d_intercept.has_value())) { - return d_intercept.value().get().async_read_some(buffers, handler); + return d_intercept.value().get().async_read_some(null_buffer, + handler); } if (isSecure()) { - return d_socket->async_read_some(buffers, handler); + if (d_smallBufferSet) { + // The reader missed a byte - invoke ssl + // async_read_some(zero-sized-buffer) so the handler is + // immediately invoked to collect this missing byte. This + // codepath wasn't hit during testing, but it's left here for + // completeness + LOG_DEBUG << "Invoked async_read_some again before reading " + "data. Immediately invoking handler"; + + return d_socket->async_read_some( + boost::asio::buffer(&d_smallBuffer, 0), handler); + } + + // async_read_some with a one byte buffer to ensure we are only + // called with useful progress + return d_socket->async_read_some( + boost::asio::buffer(&d_smallBuffer, sizeof(d_smallBuffer)), + [this, handler](boost::system::error_code ec, + std::size_t length) { + if (length != 0) { + d_smallBufferSet = true; + } + + handler(ec, length); + }); } else { - return d_socket->next_layer().async_read_some(buffers, handler); + return d_socket->next_layer().async_read_some(null_buffer, + handler); } } }; diff --git a/libamqpprox/amqpprox_session.cpp b/libamqpprox/amqpprox_session.cpp index 86b536c..e765ac4 100644 --- a/libamqpprox/amqpprox_session.cpp +++ b/libamqpprox/amqpprox_session.cpp @@ -70,13 +70,13 @@ namespace amqpprox { using namespace boost::asio::ip; using namespace boost::system; -Session::Session(boost::asio::io_service & ioservice, - MaybeSecureSocketAdaptor && serverSocket, - MaybeSecureSocketAdaptor && clientSocket, - ConnectionSelector * connectionSelector, - EventSource * eventSource, - BufferPool * bufferPool, - DNSResolver * dnsResolver, +Session::Session(boost::asio::io_service &ioservice, + MaybeSecureSocketAdaptor &&serverSocket, + MaybeSecureSocketAdaptor &&clientSocket, + ConnectionSelector *connectionSelector, + EventSource *eventSource, + BufferPool *bufferPool, + DNSResolver *dnsResolver, const std::shared_ptr &hostnameMapper, std::string_view localHostname, const std::shared_ptr &authIntercept) @@ -184,7 +184,7 @@ void Session::attemptConnection( using endpointType = boost::asio::ip::tcp::endpoint; auto self(shared_from_this()); auto callback = [this, self, connectionManager]( - const error_code & ec, + const error_code &ec, std::vector endpoints) { BOOST_LOG_SCOPED_THREAD_ATTR( "Vhost", @@ -682,6 +682,11 @@ void Session::readData(FlowType direction) readData(direction); } else { + if (readAmount > 0) { + LOG_TRACE << "read_some returned data and error. Data " + "discarded from " + << direction << " to close sockets"; + } handleSessionError("read_some", direction, ec); return; } @@ -754,7 +759,7 @@ void Session::handleData(FlowType direction) } } -void Session::handleSessionError(const char * action, +void Session::handleSessionError(const char *action, FlowType direction, boost::system::error_code ec) { @@ -818,7 +823,7 @@ void Session::handleSessionError(const char * action, } void Session::handleConnectionError( - const char * action, + const char *action, boost::system::error_code ec, const std::shared_ptr &connectionManager) {