Skip to content

Commit

Permalink
Fixes skupperproject#1136: Add test for mismatched TCP adaptor encaps…
Browse files Browse the repository at this point in the history
…ulation
  • Loading branch information
kgiusti committed Oct 18, 2023
1 parent 22fb5dc commit 7969813
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 8 deletions.
79 changes: 73 additions & 6 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server,
static void connection_run_LSIDE_IO(tcplite_connection_t *conn);
static void connection_run_CSIDE_IO(tcplite_connection_t *conn);
static void connection_run_XSIDE_IO(tcplite_connection_t *conn);
static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv);


//=================================================================================
Expand Down Expand Up @@ -912,14 +913,29 @@ static void extract_metadata_from_stream_CSIDE(tcplite_connection_t *conn)
}
}


static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery)
// Handle delivery of outbound message to the client.
//
// @return 0 on success, otherwise a terminal outcome indicating that the message cannot be delivered.
//
static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery)
{
ASSERT_RAW_IO;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_LSIDE_IO - receive_complete=%s",
conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false");

if (!conn->outbound_delivery) {
// newly arrived delivery: validate it
//
uint64_t dispo = validate_outbound_message(delivery);
if (dispo != PN_RECEIVED) {
// PN_RELEASED: since this message was delivered to this listener's unique reply-to, it cannot be
// redelivered to another consumer. PN_RELEASED means incompatible encapsulation so this is a
// misconfiguration. Reject the delivery.
if (dispo == PN_RELEASED)
dispo = PN_REJECTED;
return dispo;
}

qdr_delivery_incref(delivery, "handle_outbound_delivery_LSIDE_IO");
conn->outbound_delivery = delivery;
conn->outbound_stream = qdr_delivery_message(delivery);
Expand All @@ -937,18 +953,28 @@ static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_li
}

connection_run_LSIDE_IO(conn);
return 0;
}


/**
* Handle the first indication of a new outbound delivery on CSIDE. This is where the raw connection to the
* external service is established. This function executes in an IO thread not associated with a raw connection.
*
* @return disposition. MOVED_TO_NEW_LINK on success, 0 if more message needed, else error outcome
*/
static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery)
static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery)
{
ASSERT_TIMER_IO;
assert(!qdr_delivery_get_context(delivery));

// Verify the message properties have arrived and are valid
//
uint64_t dispo = validate_outbound_message(delivery);
if (dispo != PN_RECEIVED) {
return dispo;
}

tcplite_connection_t *conn = new_tcplite_connection_t();
ZERO(conn);

Expand Down Expand Up @@ -1004,6 +1030,8 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li
//
SET_ATOMIC_FLAG(&conn->raw_opened);
pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port);

return QD_DELIVERY_MOVED_TO_NEW_LINK;
}


Expand Down Expand Up @@ -1281,6 +1309,46 @@ static void connection_run_XSIDE_IO(tcplite_connection_t *conn)
}
}

// Validate the outbound message associated with out_dlv
//
// @return a disposition value indicating the validity of the message:
// 0: message headers incomplete, wait for more data to arrive
// PN_REJECTED: corrupt headers, cannot be re-delivered
// PN_RELEASED: headers ok, incompatible body format: deliver elsewhere
// PN_RECEIVED: headers & body ok
//
static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv)
{
qd_message_t *msg = qdr_delivery_message(out_dlv);
qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES);
if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(out_dlv));
return 0; // retry later
}
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(out_dlv));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: ensure the message body is using the proper encapsulation.
//
bool encaps_ok = false;
qd_iterator_t *encaps = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (encaps) {
encaps_ok = qd_iterator_equal(encaps, (unsigned char *) QD_CONTENT_TYPE_APP_OCTETS);
qd_iterator_free(encaps);
}
if (!encaps_ok) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured TCP adaptor (wrong encapsulation)",
DLV_ARGS(out_dlv));
qd_message_set_send_complete(msg);
return PN_RELEASED; // allow it to be re-forwarded to a different adaptor
}
return PN_RECEIVED;
}


//=================================================================================
// Handlers for events from the Raw Connections
Expand Down Expand Up @@ -1503,12 +1571,11 @@ static uint64_t CORE_deliver_outbound(void *context, qdr_link_t *link, qdr_deliv
}

if (common->context_type == TL_CONNECTOR) {
handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery);
return QD_DELIVERY_MOVED_TO_NEW_LINK;
return handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery);
} else if (common->context_type == TL_CONNECTION) {
tcplite_connection_t *conn = (tcplite_connection_t*) common;
if (conn->listener_side) {
handle_outbound_delivery_LSIDE_IO(conn, link, delivery);
return handle_outbound_delivery_LSIDE_IO(conn, link, delivery);
} else {
handle_outbound_delivery_CSIDE(conn, link, delivery, settled);
}
Expand Down
99 changes: 97 additions & 2 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2198,7 +2198,7 @@ def check_connection_deleted():
class TcpLegacyInvalidEncodingTest(TestCase):
"""
Ensure that the TCP adaptor can recover from receiving an improperly
formatted/wrong version AMQP encoded stream message.
formatted AMQP encoded stream message.
"""
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -2287,6 +2287,10 @@ def test_invalid_ingress_server_reply_body(self):


class InvalidClientSendRequest(MessagingHandler):
"""
Connect to address and send msg to destination. Expect message to be
released.
"""
def __init__(self, msg, address, destination):
super(InvalidClientSendRequest, self).__init__(auto_settle=False)
self.msg = msg
Expand Down Expand Up @@ -2331,6 +2335,14 @@ def run(self):


class InvalidServerSendReply(MessagingHandler):
"""
Simulate a client/server TCP flow and send an invalid reply message via the
faked "server". The fake server connects to server_address and creates an
AMQP listener for service_address. The fake client connects to listener
address and creates an AMQP sender to service_address. When the request
message arrives at the fake server, send an invalid msg back as the
response.
"""
def __init__(self, msg, server_address, listener_address, service_address, dispo):
super(InvalidServerSendReply, self).__init__(auto_settle=False)
self.msg = msg
Expand All @@ -2353,7 +2365,8 @@ def __init__(self, msg, server_address, listener_address, service_address, dispo
self.request_dlv = None
self.dlv_drain_timer = None

# fack tcp client, just sends a request message
# fake tcp client, just opens an AMQP connection to the TCP
# listener. This initiates the ingress streaming request message.
self.listener_address = listener_address
self.client_conn = None
self.client_sent = False
Expand Down Expand Up @@ -2630,5 +2643,87 @@ def test_01_check_delayed_deliveries(self):
f"Expected delay counter to be zero, got {counters}")


class TcpMisconfiguredLegacyLiteEncapsTest(TestCase):
"""
Ensure that the TCP adaptor can detect misconfiguration of the
encapsulation setting by creating a tcpListener and tcpConnector pair that
use different encaps.
"""
@classmethod
def setUpClass(cls):
super(TcpMisconfiguredLegacyLiteEncapsTest, cls).setUpClass()

config = [
('router', {'mode': 'interior', 'id': 'TcpMisconfiguredEncaps'}),
# Listener for handling router management requests.
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

cls.router = cls.tester.qdrouterd('TcpInvalidEncoding',
Qdrouterd.Config(config), wait=True)
cls.address = cls.router.addresses[0]

def _test(self, ingress_encaps, egress_encaps):
ingress_port = self.tester.get_port()
egress_port = self.tester.get_port()
mgmt = self.router.qd_manager

mgmt.create(TCP_CONNECTOR_TYPE,
{"name": "EncapsConnector",
"address": "EncapsTest",
"host": "localhost",
"port": egress_port,
"encapsulation": egress_encaps})
mgmt.create(TCP_LISTENER_TYPE,
{"name": "EncapsListener",
"address": "EncapsTest",
"port": ingress_port,
"encapsulation": ingress_encaps})
wait_tcp_listeners_up(self.address)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.settimeout(TIMEOUT)
server.bind(("", egress_port))
server.listen(1)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_conn:
client_conn.settimeout(TIMEOUT)
while True:
try:
client_conn.connect(('127.0.0.1', ingress_port))
break
except ConnectionRefusedError:
# There may be a delay between the operStatus going up and
# the actual listener socket availability, so allow that:
time.sleep(0.1)
continue

# send data to kick off the flow, but expect connection to fail
# in recv (either close or reset)

client_conn.sendall(b' test ')
try:
data = client_conn.recv(4096)
except ConnectionResetError:
data = b''

self.assertEqual(b'', data, "expected recv to fail")

mgmt.delete(TCP_CONNECTOR_TYPE, name="EncapsConnector")
mgmt.delete(TCP_LISTENER_TYPE, name="EncapsListener")

def test_01_encaps_mismatch(self):
"""
Attempt to configure incompatible TCP encapsulation on each side of the
TCP path
"""
self._test(ingress_encaps="legacy", egress_encaps="lite")
self._test(ingress_encaps="lite", egress_encaps="legacy")


if __name__ == '__main__':
unittest.main(main_module())

0 comments on commit 7969813

Please sign in to comment.