Skip to content

Commit

Permalink
Issue 1563 (#1601)
Browse files Browse the repository at this point in the history
* Fixes #1563: Delete corresponding connections optionally on deletion of tcpListener and tcpConnector

Signed-off-by: Gabor Dozsa <gabor.dozsa@ibm.com>

* Add the TCP connection termination test to the system tests list

Signed-off-by: Gabor Dozsa <gabor.dozsa@ibm.com>

* Add SSL scenario to the TCP connections termination system test

* Use Qdrouterd.SKManager to run skmanage commands

* Fix python-checker warnings

* Take the listener/connector lock for traversing the connections list

* Make the new config flag global to the router

* Change the name of the new config flag to dropTcpConnections

* Fix comments

* Make the the default value True for the new config flag

* Take activation_lock and check raw_opened flag when trigger closing connection

* Check if core_conn is not zero when trigger closing connection

* Add pending_close flag to qd_tcp_connection struct to handle early termination

* Fix data race warning for conn->core_conn

* Couple conn->core_conn setup and delete with DEQ_INSERT(conn) and DEQ_REMOVE(conn), repectively

* Try to remove a connection from the connections list only if core_conn is not zero

* Only set closing flag in delete loop and wake up the connections

* Fix and adjust system test timeouts

* Only wake up the HEAD conn at delete. Wake up the NEXT conn in the close handler.

* Check for conn->raw_conn in delete loop

---------

Signed-off-by: Gabor Dozsa <gabor.dozsa@ibm.com>
Co-authored-by: Gabor Dozsa <gabor.dozsa@ibm.com>
  • Loading branch information
ganeshmurthy and gabordozsa authored Aug 16, 2024
1 parent 6d04a7e commit e50c2d5
Show file tree
Hide file tree
Showing 6 changed files with 582 additions and 83 deletions.
219 changes: 139 additions & 80 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat
// thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and
// Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to
// freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see
// qd_tcp_connector_decref(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation
// qd_tcp_connector_decref()). Then the object is passed to the Core thread for cleanup of the activation
// resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()).
//
// tcp_listener_t does not use a qdr_connection_t so this process does not apply to it.
Expand Down Expand Up @@ -481,49 +481,6 @@ static void free_tcp_resource(qd_tcp_common_t *resource)
qdr_action_enqueue(tcp_context->core, action);
}

static void free_connection_IO(void *context)
{
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}

// Initate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
Expand Down Expand Up @@ -561,6 +518,46 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
if (conn->state != XSIDE_CLOSING)
set_state_XSIDE_IO(conn, XSIDE_CLOSING);

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
listener->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&listener->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_listener() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
connector->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&connector->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_connector() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

if (!!conn->raw_conn) {
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_connection_XSIDE_IO", conn->conn_id);
Expand All @@ -576,8 +573,6 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
sys_mutex_unlock(&conn->activation_lock);
}

free(conn->reply_to);

if (!!conn->inbound_stream) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
qd_message_cancel_producer_activation(conn->inbound_stream);
Expand Down Expand Up @@ -633,6 +628,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
qd_tls_free2(conn->tls);
qd_tls_domain_decref(conn->tls_domain);
free(conn->alpn_protocol);
free(conn->reply_to);

conn->reply_to = 0;
conn->inbound_link = 0;
Expand All @@ -643,25 +639,23 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->outbound_delivery = 0;
conn->observer_handle = 0;
conn->common.vflow = 0;
conn->core_conn = 0;
conn->tls = 0;
conn->tls_domain = 0;

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&li->lock);
li->connections_closed++;
sys_mutex_unlock(&li->lock);
} else {
qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&cr->lock);
cr->connections_closed++;
sys_mutex_unlock(&cr->lock);
}
}
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

free_connection_IO(conn);
// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}


Expand Down Expand Up @@ -921,15 +915,14 @@ static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn)
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_t *source = qdr_terminus(0);
char host[64]; // for numeric remote client IP:port address
char host[64]; // for numeric remote client IP:port address

qdr_terminus_set_address(target, li->adaptor_config->address);
qdr_terminus_set_dynamic(source);

qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host));
conn->core_conn = TL_open_core_connection(conn->conn_id, true, host);
qdr_connection_set_context(conn->core_conn, conn);

conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id);
qdr_link_set_context(conn->inbound_link, conn);
conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id);
Expand All @@ -944,7 +937,7 @@ static void link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *deliv
ASSERT_RAW_IO;

assert(conn->common.parent->context_type == TL_CONNECTOR);
const char *host = ((qd_tcp_connector_t *)conn->common.parent)->adaptor_config->host_port;
const char *host = ((qd_tcp_connector_t *) conn->common.parent)->adaptor_config->host_port;
conn->core_conn = TL_open_core_connection(conn->conn_id, false, host);
qdr_connection_set_context(conn->core_conn, conn);

Expand Down Expand Up @@ -1237,17 +1230,16 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect
conn->context.context = conn;
conn->context.handler = on_connection_event_CSIDE_IO;

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

sys_mutex_lock(&connector->lock);
DEQ_INSERT_TAIL(connector->connections, conn);

connector->connections_opened++;
vflow_set_uint64(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, connector->connections_opened);
vflow_set_ref_from_record(conn->common.vflow, VFLOW_ATTRIBUTE_CONNECTOR, connector->common.vflow);
sys_mutex_unlock(&connector->lock);

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

//
// The raw connection establishment must be the last thing done in this function.
// After this call, a separate IO thread may immediately be invoked in the context
Expand Down Expand Up @@ -1736,13 +1728,25 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
//
// Don't do anything
//
break;
return;

default:
assert(false);
break;
}
} while (repeat);

bool closing = false;
if (conn->common.parent)
closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_listener_t *) conn->common.parent)->closing);
if (closing) {
if (!!conn->core_conn) {
qdr_core_close_connection(conn->core_conn);
} else {
close_raw_connection(conn, "Parent-deleted", "Forced closed");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
}
}
}


Expand Down Expand Up @@ -1804,13 +1808,25 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn)
//
// Don't do anything
//
break;
return;

default:
assert(false);
break;
}
}
} while(repeat);

bool closing = false;
if (conn->common.parent)
closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_connector_t *) conn->common.parent)->closing);
if (closing) {
if (!!conn->core_conn) {
qdr_core_close_connection(conn->core_conn);
} else {
close_raw_connection(conn, "Parent-deleted", "Forced closed");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
}
}
}


Expand Down Expand Up @@ -1949,7 +1965,6 @@ static int setup_tls_session(qd_tcp_connection_t *conn, const qd_tls_domain_t *p
return 0;
}


//=================================================================================
// Handlers for events from the Raw Connections
//=================================================================================
Expand Down Expand Up @@ -2038,19 +2053,19 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
conn->context.context = conn;
conn->context.handler = on_connection_event_LSIDE_IO;

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

sys_mutex_lock(&listener->lock);
DEQ_INSERT_TAIL(listener->connections, conn);

listener->connections_opened++;
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);
sys_mutex_unlock(&listener->lock);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
// Note: this will trigger the connection's event handler on another thread:
pn_listener_raw_accept(pn_listener, conn->raw_conn);
}
Expand Down Expand Up @@ -2274,7 +2289,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di

static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
// hahaha
qd_tcp_common_t *common = (qd_tcp_common_t*) qdr_connection_get_context(conn);
qd_tcp_connection_t *tcp_conn = (qd_tcp_connection_t*) common;
if (tcp_conn) {
Expand Down Expand Up @@ -2356,6 +2370,7 @@ QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_

listener->common.context_type = TL_LISTENER;
sys_mutex_init(&listener->lock);
sys_atomic_init(&listener->closing, 0);

sys_mutex_lock(&tcp_context->lock);
DEQ_INSERT_TAIL(tcp_context->listeners, listener);
Expand Down Expand Up @@ -2387,6 +2402,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
listener->common.vflow = 0;
}
//
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the listener->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&listener->lock);
SET_ATOMIC_FLAG(&listener->closing);

//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
sys_mutex_unlock(&listener->lock);
}
//
// If all the connections associated with this listener has been closed, this call to
// qd_tcp_listener_decref should free the listener
//
Expand All @@ -2413,6 +2449,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
connector->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
//
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the connector->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&connector->lock);
SET_ATOMIC_FLAG(&connector->closing);
//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
sys_mutex_unlock(&connector->lock);
}
//
//
// If all the connections associated with this listener has been closed, this call to
// qd_tcp_listener_decref should free the listener
//
Expand Down Expand Up @@ -2492,6 +2549,8 @@ qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_en
connector->activate_timer = qd_timer(tcp_context->qd, on_core_activate_TIMER_IO, connector);
connector->common.context_type = TL_CONNECTOR;
sys_mutex_init(&connector->lock);
sys_atomic_init(&connector->closing, 0);


qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"Configured TcpConnector for %s, %s:%s",
Expand Down
4 changes: 2 additions & 2 deletions src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct qd_tcp_listener_t {
uint64_t connections_opened;
uint64_t connections_closed;
sys_atomic_t ref_count;
bool closing;
sys_atomic_t closing;
};


Expand All @@ -88,7 +88,7 @@ typedef struct qd_tcp_connector_t {
uint64_t connections_opened;
uint64_t connections_closed;
sys_atomic_t ref_count;
bool closing;
sys_atomic_t closing;
} qd_tcp_connector_t;


Expand Down
Loading

0 comments on commit e50c2d5

Please sign in to comment.