From 796981315c2d2b7694882910e73c73af8eb7b5bc Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 10 Oct 2023 11:51:41 -0400 Subject: [PATCH] Fixes #1136: Add test for mismatched TCP adaptor encapsulation --- src/adaptors/tcp_lite/tcp_lite.c | 79 ++++++++++++++++++++++-- tests/system_tests_tcp_adaptor.py | 99 ++++++++++++++++++++++++++++++- 2 files changed, 170 insertions(+), 8 deletions(-) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index de5726c67..4a3bd9616 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -92,6 +92,7 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, static void connection_run_LSIDE_IO(tcplite_connection_t *conn); static void connection_run_CSIDE_IO(tcplite_connection_t *conn); static void connection_run_XSIDE_IO(tcplite_connection_t *conn); +static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv); //================================================================================= @@ -912,14 +913,29 @@ static void extract_metadata_from_stream_CSIDE(tcplite_connection_t *conn) } } - -static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery) +// Handle delivery of outbound message to the client. +// +// @return 0 on success, otherwise a terminal outcome indicating that the message cannot be delivered. +// +static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery) { ASSERT_RAW_IO; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_LSIDE_IO - receive_complete=%s", conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); if (!conn->outbound_delivery) { + // newly arrived delivery: validate it + // + uint64_t dispo = validate_outbound_message(delivery); + if (dispo != PN_RECEIVED) { + // PN_RELEASED: since this message was delivered to this listener's unique reply-to, it cannot be + // redelivered to another consumer. PN_RELEASED means incompatible encapsulation so this is a + // misconfiguration. Reject the delivery. + if (dispo == PN_RELEASED) + dispo = PN_REJECTED; + return dispo; + } + qdr_delivery_incref(delivery, "handle_outbound_delivery_LSIDE_IO"); conn->outbound_delivery = delivery; conn->outbound_stream = qdr_delivery_message(delivery); @@ -937,18 +953,28 @@ static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_li } connection_run_LSIDE_IO(conn); + return 0; } /** * Handle the first indication of a new outbound delivery on CSIDE. This is where the raw connection to the * external service is established. This function executes in an IO thread not associated with a raw connection. + * + * @return disposition. MOVED_TO_NEW_LINK on success, 0 if more message needed, else error outcome */ -static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery) +static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery) { ASSERT_TIMER_IO; assert(!qdr_delivery_get_context(delivery)); + // Verify the message properties have arrived and are valid + // + uint64_t dispo = validate_outbound_message(delivery); + if (dispo != PN_RECEIVED) { + return dispo; + } + tcplite_connection_t *conn = new_tcplite_connection_t(); ZERO(conn); @@ -1004,6 +1030,8 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li // SET_ATOMIC_FLAG(&conn->raw_opened); pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port); + + return QD_DELIVERY_MOVED_TO_NEW_LINK; } @@ -1281,6 +1309,46 @@ static void connection_run_XSIDE_IO(tcplite_connection_t *conn) } } +// Validate the outbound message associated with out_dlv +// +// @return a disposition value indicating the validity of the message: +// 0: message headers incomplete, wait for more data to arrive +// PN_REJECTED: corrupt headers, cannot be re-delivered +// PN_RELEASED: headers ok, incompatible body format: deliver elsewhere +// PN_RECEIVED: headers & body ok +// +static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv) +{ + qd_message_t *msg = qdr_delivery_message(out_dlv); + qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES); + if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(out_dlv)); + return 0; // retry later + } + if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding? + qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(out_dlv)); + qd_message_set_send_complete(msg); + return PN_REJECTED; + } + + // ISSUE-1136: ensure the message body is using the proper encapsulation. + // + bool encaps_ok = false; + qd_iterator_t *encaps = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE); + if (encaps) { + encaps_ok = qd_iterator_equal(encaps, (unsigned char *) QD_CONTENT_TYPE_APP_OCTETS); + qd_iterator_free(encaps); + } + if (!encaps_ok) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured TCP adaptor (wrong encapsulation)", + DLV_ARGS(out_dlv)); + qd_message_set_send_complete(msg); + return PN_RELEASED; // allow it to be re-forwarded to a different adaptor + } + return PN_RECEIVED; +} + //================================================================================= // Handlers for events from the Raw Connections @@ -1503,12 +1571,11 @@ static uint64_t CORE_deliver_outbound(void *context, qdr_link_t *link, qdr_deliv } if (common->context_type == TL_CONNECTOR) { - handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery); - return QD_DELIVERY_MOVED_TO_NEW_LINK; + return handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery); } else if (common->context_type == TL_CONNECTION) { tcplite_connection_t *conn = (tcplite_connection_t*) common; if (conn->listener_side) { - handle_outbound_delivery_LSIDE_IO(conn, link, delivery); + return handle_outbound_delivery_LSIDE_IO(conn, link, delivery); } else { handle_outbound_delivery_CSIDE(conn, link, delivery, settled); } diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index f83215b8a..a30a5c0f5 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -2198,7 +2198,7 @@ def check_connection_deleted(): class TcpLegacyInvalidEncodingTest(TestCase): """ Ensure that the TCP adaptor can recover from receiving an improperly - formatted/wrong version AMQP encoded stream message. + formatted AMQP encoded stream message. """ @classmethod def setUpClass(cls): @@ -2287,6 +2287,10 @@ def test_invalid_ingress_server_reply_body(self): class InvalidClientSendRequest(MessagingHandler): + """ + Connect to address and send msg to destination. Expect message to be + released. + """ def __init__(self, msg, address, destination): super(InvalidClientSendRequest, self).__init__(auto_settle=False) self.msg = msg @@ -2331,6 +2335,14 @@ def run(self): class InvalidServerSendReply(MessagingHandler): + """ + Simulate a client/server TCP flow and send an invalid reply message via the + faked "server". The fake server connects to server_address and creates an + AMQP listener for service_address. The fake client connects to listener + address and creates an AMQP sender to service_address. When the request + message arrives at the fake server, send an invalid msg back as the + response. + """ def __init__(self, msg, server_address, listener_address, service_address, dispo): super(InvalidServerSendReply, self).__init__(auto_settle=False) self.msg = msg @@ -2353,7 +2365,8 @@ def __init__(self, msg, server_address, listener_address, service_address, dispo self.request_dlv = None self.dlv_drain_timer = None - # fack tcp client, just sends a request message + # fake tcp client, just opens an AMQP connection to the TCP + # listener. This initiates the ingress streaming request message. self.listener_address = listener_address self.client_conn = None self.client_sent = False @@ -2630,5 +2643,87 @@ def test_01_check_delayed_deliveries(self): f"Expected delay counter to be zero, got {counters}") +class TcpMisconfiguredLegacyLiteEncapsTest(TestCase): + """ + Ensure that the TCP adaptor can detect misconfiguration of the + encapsulation setting by creating a tcpListener and tcpConnector pair that + use different encaps. + """ + @classmethod + def setUpClass(cls): + super(TcpMisconfiguredLegacyLiteEncapsTest, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', 'id': 'TcpMisconfiguredEncaps'}), + # Listener for handling router management requests. + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + + cls.router = cls.tester.qdrouterd('TcpInvalidEncoding', + Qdrouterd.Config(config), wait=True) + cls.address = cls.router.addresses[0] + + def _test(self, ingress_encaps, egress_encaps): + ingress_port = self.tester.get_port() + egress_port = self.tester.get_port() + mgmt = self.router.qd_manager + + mgmt.create(TCP_CONNECTOR_TYPE, + {"name": "EncapsConnector", + "address": "EncapsTest", + "host": "localhost", + "port": egress_port, + "encapsulation": egress_encaps}) + mgmt.create(TCP_LISTENER_TYPE, + {"name": "EncapsListener", + "address": "EncapsTest", + "port": ingress_port, + "encapsulation": ingress_encaps}) + wait_tcp_listeners_up(self.address) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.settimeout(TIMEOUT) + server.bind(("", egress_port)) + server.listen(1) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_conn: + client_conn.settimeout(TIMEOUT) + while True: + try: + client_conn.connect(('127.0.0.1', ingress_port)) + break + except ConnectionRefusedError: + # There may be a delay between the operStatus going up and + # the actual listener socket availability, so allow that: + time.sleep(0.1) + continue + + # send data to kick off the flow, but expect connection to fail + # in recv (either close or reset) + + client_conn.sendall(b' test ') + try: + data = client_conn.recv(4096) + except ConnectionResetError: + data = b'' + + self.assertEqual(b'', data, "expected recv to fail") + + mgmt.delete(TCP_CONNECTOR_TYPE, name="EncapsConnector") + mgmt.delete(TCP_LISTENER_TYPE, name="EncapsListener") + + def test_01_encaps_mismatch(self): + """ + Attempt to configure incompatible TCP encapsulation on each side of the + TCP path + """ + self._test(ingress_encaps="legacy", egress_encaps="lite") + self._test(ingress_encaps="lite", egress_encaps="legacy") + + if __name__ == '__main__': unittest.main(main_module())