Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1563 #1601

Merged
merged 20 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ad1965d
Fixes #1563: Delete corresponding connections optionally on deletion …
gabordozsa Jul 17, 2024
578377d
Add the TCP connection termination test to the system tests list
gabordozsa Jul 25, 2024
6e23a7c
Add SSL scenario to the TCP connections termination system test
gabordozsa Jul 26, 2024
247b06a
Use Qdrouterd.SKManager to run skmanage commands
gabordozsa Jul 26, 2024
dffd691
Fix python-checker warnings
gabordozsa Jul 26, 2024
e2fd2fe
Take the listener/connector lock for traversing the connections list
gabordozsa Jul 26, 2024
b2b9f3f
Make the new config flag global to the router
gabordozsa Aug 2, 2024
d9632ac
Change the name of the new config flag to dropTcpConnections
gabordozsa Aug 5, 2024
8ad3ddd
Fix comments
gabordozsa Aug 5, 2024
c70c972
Make the the default value True for the new config flag
gabordozsa Aug 5, 2024
007f1aa
Take activation_lock and check raw_opened flag when trigger closing c…
gabordozsa Aug 6, 2024
5259951
Check if core_conn is not zero when trigger closing connection
gabordozsa Aug 6, 2024
c5543a0
Add pending_close flag to qd_tcp_connection struct to handle early te…
gabordozsa Aug 7, 2024
f5ceedf
Fix data race warning for conn->core_conn
gabordozsa Aug 7, 2024
35d4d8b
Couple conn->core_conn setup and delete with DEQ_INSERT(conn) and DEQ…
gabordozsa Aug 12, 2024
2c3629b
Try to remove a connection from the connections list only if core_con…
gabordozsa Aug 12, 2024
83c4988
Only set closing flag in delete loop and wake up the connections
gabordozsa Aug 14, 2024
0584372
Fix and adjust system test timeouts
gabordozsa Aug 14, 2024
dd78947
Only wake up the HEAD conn at delete. Wake up the NEXT conn in the cl…
gabordozsa Aug 15, 2024
0ae678b
Check for conn->raw_conn in delete loop
ganeshmurthy Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 139 additions & 80 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat
// thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and
// Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to
// freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see
// qd_tcp_connector_decref(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation
// qd_tcp_connector_decref()). Then the object is passed to the Core thread for cleanup of the activation
// resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()).
//
// tcp_listener_t does not use a qdr_connection_t so this process does not apply to it.
Expand Down Expand Up @@ -481,49 +481,6 @@ static void free_tcp_resource(qd_tcp_common_t *resource)
qdr_action_enqueue(tcp_context->core, action);
}

static void free_connection_IO(void *context)
{
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}

// Initate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
Expand Down Expand Up @@ -561,6 +518,46 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
if (conn->state != XSIDE_CLOSING)
set_state_XSIDE_IO(conn, XSIDE_CLOSING);

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
listener->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&listener->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_listener() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
connector->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&connector->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_connector() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

if (!!conn->raw_conn) {
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_connection_XSIDE_IO", conn->conn_id);
Expand All @@ -576,8 +573,6 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
sys_mutex_unlock(&conn->activation_lock);
}

free(conn->reply_to);

if (!!conn->inbound_stream) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
qd_message_cancel_producer_activation(conn->inbound_stream);
Expand Down Expand Up @@ -633,6 +628,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
qd_tls_free2(conn->tls);
qd_tls_domain_decref(conn->tls_domain);
free(conn->alpn_protocol);
free(conn->reply_to);

conn->reply_to = 0;
conn->inbound_link = 0;
Expand All @@ -643,25 +639,23 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->outbound_delivery = 0;
conn->observer_handle = 0;
conn->common.vflow = 0;
conn->core_conn = 0;
conn->tls = 0;
conn->tls_domain = 0;

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&li->lock);
li->connections_closed++;
sys_mutex_unlock(&li->lock);
} else {
qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&cr->lock);
cr->connections_closed++;
sys_mutex_unlock(&cr->lock);
}
}
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

free_connection_IO(conn);
// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}


Expand Down Expand Up @@ -921,15 +915,14 @@ static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn)
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_t *source = qdr_terminus(0);
char host[64]; // for numeric remote client IP:port address
char host[64]; // for numeric remote client IP:port address

qdr_terminus_set_address(target, li->adaptor_config->address);
qdr_terminus_set_dynamic(source);

qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host));
conn->core_conn = TL_open_core_connection(conn->conn_id, true, host);
qdr_connection_set_context(conn->core_conn, conn);

conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id);
qdr_link_set_context(conn->inbound_link, conn);
conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id);
Expand All @@ -944,7 +937,7 @@ static void link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *deliv
ASSERT_RAW_IO;

assert(conn->common.parent->context_type == TL_CONNECTOR);
const char *host = ((qd_tcp_connector_t *)conn->common.parent)->adaptor_config->host_port;
const char *host = ((qd_tcp_connector_t *) conn->common.parent)->adaptor_config->host_port;
conn->core_conn = TL_open_core_connection(conn->conn_id, false, host);
qdr_connection_set_context(conn->core_conn, conn);

Expand Down Expand Up @@ -1237,17 +1230,16 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect
conn->context.context = conn;
conn->context.handler = on_connection_event_CSIDE_IO;

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

sys_mutex_lock(&connector->lock);
DEQ_INSERT_TAIL(connector->connections, conn);

connector->connections_opened++;
vflow_set_uint64(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, connector->connections_opened);
vflow_set_ref_from_record(conn->common.vflow, VFLOW_ATTRIBUTE_CONNECTOR, connector->common.vflow);
sys_mutex_unlock(&connector->lock);

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

//
// The raw connection establishment must be the last thing done in this function.
// After this call, a separate IO thread may immediately be invoked in the context
Expand Down Expand Up @@ -1736,13 +1728,25 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
//
// Don't do anything
//
break;
return;

default:
assert(false);
break;
}
} while (repeat);

bool closing = false;
if (conn->common.parent)
closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_listener_t *) conn->common.parent)->closing);
if (closing) {
if (!!conn->core_conn) {
qdr_core_close_connection(conn->core_conn);
} else {
close_raw_connection(conn, "Parent-deleted", "Forced closed");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
}
}
}


Expand Down Expand Up @@ -1804,13 +1808,25 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn)
//
// Don't do anything
//
break;
return;

default:
assert(false);
break;
}
}
} while(repeat);

bool closing = false;
if (conn->common.parent)
closing = IS_ATOMIC_FLAG_SET(&((qd_tcp_connector_t *) conn->common.parent)->closing);
if (closing) {
if (!!conn->core_conn) {
qdr_core_close_connection(conn->core_conn);
} else {
close_raw_connection(conn, "Parent-deleted", "Forced closed");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
}
}
}


Expand Down Expand Up @@ -1949,7 +1965,6 @@ static int setup_tls_session(qd_tcp_connection_t *conn, const qd_tls_domain_t *p
return 0;
}


//=================================================================================
// Handlers for events from the Raw Connections
//=================================================================================
Expand Down Expand Up @@ -2038,19 +2053,19 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
conn->context.context = conn;
conn->context.handler = on_connection_event_LSIDE_IO;

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

sys_mutex_lock(&listener->lock);
DEQ_INSERT_TAIL(listener->connections, conn);

listener->connections_opened++;
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);
sys_mutex_unlock(&listener->lock);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
// Note: this will trigger the connection's event handler on another thread:
pn_listener_raw_accept(pn_listener, conn->raw_conn);
}
Expand Down Expand Up @@ -2274,7 +2289,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di

static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
// hahaha
qd_tcp_common_t *common = (qd_tcp_common_t*) qdr_connection_get_context(conn);
qd_tcp_connection_t *tcp_conn = (qd_tcp_connection_t*) common;
if (tcp_conn) {
Expand Down Expand Up @@ -2356,6 +2370,7 @@ QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_

listener->common.context_type = TL_LISTENER;
sys_mutex_init(&listener->lock);
sys_atomic_init(&listener->closing, 0);

sys_mutex_lock(&tcp_context->lock);
DEQ_INSERT_TAIL(tcp_context->listeners, listener);
Expand Down Expand Up @@ -2387,6 +2402,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
listener->common.vflow = 0;
}
//
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the listener->lock
// to prevent any modification of the connections list while it is being traversed.
ganeshmurthy marked this conversation as resolved.
Show resolved Hide resolved
sys_mutex_lock(&listener->lock);
SET_ATOMIC_FLAG(&listener->closing);

//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
sys_mutex_unlock(&listener->lock);
}
//
// If all the connections associated with this listener has been closed, this call to
// qd_tcp_listener_decref should free the listener
//
Expand All @@ -2413,6 +2449,27 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
connector->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
//
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the connector->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&connector->lock);
SET_ATOMIC_FLAG(&connector->closing);
//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
ganeshmurthy marked this conversation as resolved.
Show resolved Hide resolved
sys_mutex_unlock(&connector->lock);
}
//
//
// If all the connections associated with this listener has been closed, this call to
// qd_tcp_listener_decref should free the listener
//
Expand Down Expand Up @@ -2492,6 +2549,8 @@ qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_en
connector->activate_timer = qd_timer(tcp_context->qd, on_core_activate_TIMER_IO, connector);
connector->common.context_type = TL_CONNECTOR;
sys_mutex_init(&connector->lock);
sys_atomic_init(&connector->closing, 0);


qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"Configured TcpConnector for %s, %s:%s",
Expand Down
4 changes: 2 additions & 2 deletions src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct qd_tcp_listener_t {
uint64_t connections_opened;
uint64_t connections_closed;
sys_atomic_t ref_count;
bool closing;
sys_atomic_t closing;
};


Expand All @@ -88,7 +88,7 @@ typedef struct qd_tcp_connector_t {
uint64_t connections_opened;
uint64_t connections_closed;
sys_atomic_t ref_count;
bool closing;
sys_atomic_t closing;
} qd_tcp_connector_t;


Expand Down
Loading
Loading