Skip to content

Commit

Permalink
Check for conn->raw_conn in delete loop
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshmurthy committed Aug 15, 2024
1 parent dd78947 commit 0ae678b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 61 deletions.
3 changes: 1 addition & 2 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ qd_error_t qd_load_adaptor_config(qdr_core_t *core, qd_adaptor_config_t *config,
config->site_id = qd_entity_opt_string(entity, "siteId", 0); CHECK();
config->ssl_profile_name = qd_entity_opt_string(entity, "sslProfile", 0); CHECK();
config->authenticate_peer = qd_entity_opt_bool(entity, "authenticatePeer", false); CHECK();
config->verify_host_name = qd_entity_opt_bool(entity, "verifyHostname", false);
CHECK();
config->verify_host_name = qd_entity_opt_bool(entity, "verifyHostname", false); CHECK();

config->backlog = qd_entity_opt_long(entity, "backlog", 0);
CHECK();
Expand Down
122 changes: 64 additions & 58 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat
// thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and
// Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to
// freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see
// qd_tcp_connector_decref(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation
// qd_tcp_connector_decref()). Then the object is passed to the Core thread for cleanup of the activation
// resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()).
//
// tcp_listener_t does not use a qdr_connection_t so this process does not apply to it.
Expand Down Expand Up @@ -481,19 +481,42 @@ static void free_tcp_resource(qd_tcp_common_t *resource)
qdr_action_enqueue(tcp_context->core, action);
}

static void free_connection_IO(void *context)
// Initate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
// connection condition information will be read from the raw conn and written to the flow log.
//
// @param conn Holds the raw connection to close
// @param condition Optional condition identifying the reason the connection was closed
// @param description Optional description assocated with condition
//
static void close_raw_connection(qd_tcp_connection_t *conn, const char *condition, const char *description)
{
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);
ASSERT_RAW_IO;

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
assert(conn->raw_conn);
if (condition) {
pn_condition_t *cond = pn_raw_connection_condition(conn->raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, condition);
if (description) {
(void) pn_condition_set_description(cond, description);
}
}
}

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_raw_connection: %s (%s)", conn->conn_id, condition, description);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it
pn_raw_connection_close(conn->raw_conn);

// Connection cleanup occurs on the PN_RAW_CONNECTION_DISCONNECTED event
}

static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
{
ASSERT_RAW_IO;
if (conn->state != XSIDE_CLOSING)
set_state_XSIDE_IO(conn, XSIDE_CLOSING);

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
Expand All @@ -502,6 +525,7 @@ static void free_connection_IO(void *context)
listener->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&listener->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_listener() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
Expand All @@ -519,6 +543,7 @@ static void free_connection_IO(void *context)
connector->connections_closed++;
if (IS_ATOMIC_FLAG_SET(&connector->closing)) {
// Wake up the next conn on the list to get it closed
// See qd_dispatch_delete_tcp_connector() where the head connection is woken up.
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
if (!!next_conn)
pn_raw_connection_wake(next_conn->raw_conn);
Expand All @@ -533,48 +558,6 @@ static void free_connection_IO(void *context)
}
}

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}

// Initate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
// connection condition information will be read from the raw conn and written to the flow log.
//
// @param conn Holds the raw connection to close
// @param condition Optional condition identifying the reason the connection was closed
// @param description Optional description assocated with condition
//
static void close_raw_connection(qd_tcp_connection_t *conn, const char *condition, const char *description)
{
ASSERT_RAW_IO;

assert(conn->raw_conn);
if (condition) {
pn_condition_t *cond = pn_raw_connection_condition(conn->raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, condition);
if (description) {
(void) pn_condition_set_description(cond, description);
}
}
}

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_raw_connection: %s (%s)", conn->conn_id, condition, description);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
pn_raw_connection_close(conn->raw_conn);

// Connection cleanup occurs on the PN_RAW_CONNECTION_DISCONNECTED event
}

static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
{
ASSERT_RAW_IO;
if (conn->state != XSIDE_CLOSING)
set_state_XSIDE_IO(conn, XSIDE_CLOSING);

if (!!conn->raw_conn) {
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] close_connection_XSIDE_IO", conn->conn_id);
Expand Down Expand Up @@ -645,8 +628,8 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
qd_tls_free2(conn->tls);
qd_tls_domain_decref(conn->tls_domain);
free(conn->alpn_protocol);

free(conn->reply_to);

conn->reply_to = 0;
conn->inbound_link = 0;
conn->inbound_stream = 0;
Expand All @@ -659,7 +642,20 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->tls = 0;
conn->tls_domain = 0;

free_connection_IO(conn);
// No thread assertion here - can be RAW_IO or TIMER_IO
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

// Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
// attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
// take place.
sys_mutex_lock(&conn->activation_lock);
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}


Expand Down Expand Up @@ -1733,7 +1729,6 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
// Don't do anything
//
return;
break;

default:
assert(false);
Expand Down Expand Up @@ -1814,7 +1809,6 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn)
// Don't do anything
//
return;
break;

default:
assert(false);
Expand Down Expand Up @@ -2295,7 +2289,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di

static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
// hahaha
qd_tcp_common_t *common = (qd_tcp_common_t*) qdr_connection_get_context(conn);
qd_tcp_connection_t *tcp_conn = (qd_tcp_connection_t*) common;
if (tcp_conn) {
Expand Down Expand Up @@ -2417,6 +2410,13 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&listener->lock);
SET_ATOMIC_FLAG(&listener->closing);

//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
Expand Down Expand Up @@ -2457,6 +2457,12 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&connector->lock);
SET_ATOMIC_FLAG(&connector->closing);
//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections);
if (conn)
pn_raw_connection_wake(conn->raw_conn);
Expand Down
2 changes: 1 addition & 1 deletion src/dispatch_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct qd_dispatch_t {
char *metadata;
bool timestamps_in_utc;
char *data_connection_count;
bool terminate_tcp_conns;
bool terminate_tcp_conns;
};

qd_dispatch_t *qd_dispatch_get_dispatch(void);
Expand Down

0 comments on commit 0ae678b

Please sign in to comment.