Skip to content

Commit

Permalink
Check for conn->raw_conn in delete loop
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshmurthy committed Aug 15, 2024
1 parent dd78947 commit f6c04a4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 67 deletions.
3 changes: 1 addition & 2 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ qd_error_t qd_load_adaptor_config(qdr_core_t *core, qd_adaptor_config_t *config,
config->site_id = qd_entity_opt_string(entity, "siteId", 0); CHECK();
config->ssl_profile_name = qd_entity_opt_string(entity, "sslProfile", 0); CHECK();
config->authenticate_peer = qd_entity_opt_bool(entity, "authenticatePeer", false); CHECK();
config->verify_host_name = qd_entity_opt_bool(entity, "verifyHostname", false);
CHECK();
config->verify_host_name = qd_entity_opt_bool(entity, "verifyHostname", false); CHECK();

config->backlog = qd_entity_opt_long(entity, "backlog", 0);
CHECK();
Expand Down
134 changes: 70 additions & 64 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat
// thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and
// Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to
// freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see
// qd_tcp_connector_decref(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation
// qd_tcp_connector_decref()). Then the object is passed to the Core thread for cleanup of the activation
// resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()).
//
// tcp_listener_t does not use a qdr_connection_t so this process does not apply to it.
Expand Down Expand Up @@ -481,63 +481,6 @@ static void free_tcp_resource(qd_tcp_common_t *resource)
qdr_action_enqueue(tcp_context->core, action);
}

static void free_connection_IO(void *context)
{
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
listener->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&listener->closing)) {
// Wake up the next conn on the list to get it closed
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
connector->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&connector->closing)) {
// Wake up the next conn on the list to get it closed
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}

// Initate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
Expand Down Expand Up @@ -659,7 +602,60 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->tls = 0;
conn->tls_domain = 0;

free_connection_IO(conn);
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
listener->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&listener->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_listener() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(listener->connections, conn);
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
//
conn->common.parent = 0;
qd_tcp_listener_decref(listener);
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
connector->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&connector->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_connector() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
}
DEQ_REMOVE(connector->connections, conn);
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
//
conn->common.parent = 0;
qd_tcp_connector_decref(connector);
}
}

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}


Expand Down Expand Up @@ -1733,7 +1729,6 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
// Don't do anything
//
return;
break;

default:
assert(false);
Expand Down Expand Up @@ -1814,7 +1809,6 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn)
// Don't do anything
//
return;
break;

default:
assert(false);
Expand Down Expand Up @@ -2295,7 +2289,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di

static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
// hahaha
qd_tcp_common_t *common = (qd_tcp_common_t*) qdr_connection_get_context(conn);
qd_tcp_connection_t *tcp_conn = (qd_tcp_connection_t*) common;
if (tcp_conn) {
Expand Down Expand Up @@ -2417,8 +2410,15 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&listener->lock);
SET_ATOMIC_FLAG(&listener->closing);

//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections);
if (conn)
if (conn && conn->raw_conn)
pn_raw_connection_wake(conn->raw_conn);
sys_mutex_unlock(&listener->lock);
}
Expand Down Expand Up @@ -2457,8 +2457,14 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&connector->lock);
SET_ATOMIC_FLAG(&connector->closing);
//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections);
if (conn)
if (conn && conn->raw_conn)
pn_raw_connection_wake(conn->raw_conn);
sys_mutex_unlock(&connector->lock);
}
Expand Down
2 changes: 1 addition & 1 deletion src/dispatch_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct qd_dispatch_t {
char *metadata;
bool timestamps_in_utc;
char *data_connection_count;
bool terminate_tcp_conns;
bool terminate_tcp_conns;
};

qd_dispatch_t *qd_dispatch_get_dispatch(void);
Expand Down

0 comments on commit f6c04a4

Please sign in to comment.