Skip to content

Commit

Permalink
Couple conn->core_conn setup and delete with DEQ_INSERT(conn) and DEQ…
Browse files Browse the repository at this point in the history
…_REMOVE(conn), repectively
  • Loading branch information
gabordozsa committed Aug 12, 2024
1 parent f5ceedf commit 35d4d8b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 112 deletions.
176 changes: 65 additions & 111 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,30 +440,6 @@ static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_stat
conn->state = new_state;
}


static void terminate_connections(qd_tcp_connection_list_t *connections)
{
qd_tcp_connection_t *conn = DEQ_HEAD(*connections);
while (conn) {
qd_tcp_connection_t *next_conn = DEQ_NEXT(conn);
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections.
// We take the acivation_lock and check the atomic raw_opened flag to ensure
// that the value of core_conn cannot change while enqueing the 'close' action
// for the core thread (in case of PN_RAW_CONNECTION_DISCONNECTED event).
// We set conn->pending close if we cannot enqueue a 'close' action. This flag
// will trigger the closing at link setup in the PN_RAW_CONNECTION_CONNECTED
// event handler.
sys_mutex_lock(&conn->activation_lock);
if (IS_ATOMIC_FLAG_SET(&conn->raw_opened) && !!conn->core_conn)
qdr_core_close_connection(conn->core_conn);
else
conn->pending_close = true;
sys_mutex_unlock(&conn->activation_lock);
conn = next_conn;
}
}

//
// Connector/Connection cleanup
//
Expand Down Expand Up @@ -523,7 +499,16 @@ static void free_connection_IO(void *context)
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&listener->lock);
listener->connections_closed++;
DEQ_REMOVE(listener->connections, conn);
if (!!conn->core_conn) {
// Note: core_conn can be accessed when management thread is deleting the listener
// and initiates closing the TCP connections. We need to hold the listener->lock when
// removing core_conn.
qdr_connection_closed(conn->core_conn);
conn->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}
sys_mutex_unlock(&listener->lock);
//
// Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn))
Expand All @@ -533,7 +518,16 @@ static void free_connection_IO(void *context)
} else {
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&connector->lock);
connector->connections_closed++;
DEQ_REMOVE(connector->connections, conn);
if (!!conn->core_conn) {
// Note: core_conn can be accessed when management thread is deleting the connector
// and initiates closing the TCP connections. We need to hold the connector->lock when
// removing core_conn.
qdr_connection_closed(conn->core_conn);
conn->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}
sys_mutex_unlock(&connector->lock);
//
// Call connector decref when a connection associated with the connector is removed (DEQ_REMOVE(connector->connections, conn))
Expand Down Expand Up @@ -648,12 +642,6 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
vflow_end_record(conn->common.vflow);
}

if (!!conn->core_conn) {
qdr_connection_closed(conn->core_conn);
conn->core_conn = 0;
qd_connection_counter_dec(QD_PROTOCOL_TCP);
}

qd_tls_free2(conn->tls);
qd_tls_domain_decref(conn->tls_domain);
free(conn->alpn_protocol);
Expand All @@ -667,24 +655,9 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
conn->outbound_delivery = 0;
conn->observer_handle = 0;
conn->common.vflow = 0;
conn->core_conn = 0;
conn->tls = 0;
conn->tls_domain = 0;

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
sys_mutex_lock(&li->lock);
li->connections_closed++;
sys_mutex_unlock(&li->lock);
} else {
qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent;
sys_mutex_lock(&cr->lock);
cr->connections_closed++;
sys_mutex_unlock(&cr->lock);
}
}

free_connection_IO(conn);
}

Expand Down Expand Up @@ -939,56 +912,55 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
}


static bool link_setup_LSIDE_IO(qd_tcp_connection_t *conn)
static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn)
{
ASSERT_RAW_IO;
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_t *source = qdr_terminus(0);
char host[64]; // for numeric remote client IP:port address
bool pending_close;
char host[64]; // for numeric remote client IP:port address

qdr_terminus_set_address(target, li->adaptor_config->address);
qdr_terminus_set_dynamic(source);

qd_raw_conn_get_address_buf(conn->raw_conn, host, sizeof(host));

// management thread might be trying to access conn->core_conn
// if tcpListener is being deleted
sys_mutex_lock(&conn->activation_lock);
sys_mutex_lock(&li->lock);
conn->core_conn = TL_open_core_connection(conn->conn_id, true, host);
pending_close = conn->pending_close; // if tcpListener is being deleted
sys_mutex_unlock(&conn->activation_lock);

qdr_connection_set_context(conn->core_conn, conn);
// Add the tcp conn to the listener list only when core_conn is setup
DEQ_INSERT_TAIL(li->connections, conn);
if (li->closing)
// listener is being deleted and all connections must be closed
qdr_core_close_connection(conn->core_conn);
sys_mutex_unlock(&li->lock);

conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id);
qdr_link_set_context(conn->inbound_link, conn);
conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id);
qdr_link_set_context(conn->outbound_link, conn);
qdr_link_set_user_streaming(conn->outbound_link);
qdr_link_flow(tcp_context->core, conn->outbound_link, 1, false);

return pending_close;
}


static bool link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *delivery)
static void link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *delivery)
{
ASSERT_RAW_IO;

assert(conn->common.parent->context_type == TL_CONNECTOR);
const char *host = ((qd_tcp_connector_t *)conn->common.parent)->adaptor_config->host_port;
bool pending_close;

// management thread might be trying to access conn->core_conn
// if tcpConnector is being deleted
sys_mutex_lock(&conn->activation_lock);
conn->core_conn = TL_open_core_connection(conn->conn_id, false, host);
pending_close = conn->pending_close; // if tcpListener is being deleted
sys_mutex_unlock(&conn->activation_lock);
qd_tcp_connector_t *connector = (qd_tcp_connector_t *) conn->common.parent;
const char *host = connector->adaptor_config->host_port;

sys_mutex_lock(&connector->lock);
conn->core_conn = TL_open_core_connection(conn->conn_id, false, host);
qdr_connection_set_context(conn->core_conn, conn);
// Add the tcp conn to the connector list only when core_conn is setup
DEQ_INSERT_TAIL(connector->connections, conn);
if (connector->closing)
// connector is being deleted and all connections must be closed
qdr_core_close_connection(conn->core_conn);
sys_mutex_unlock(&connector->lock);

// use an anonymous inbound link in order to ensure credit arrives otherwise if the client has dropped the state machine will stall waiting for credit
conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.in", 0, false, 0, &conn->inbound_link_id);
Expand All @@ -1006,8 +978,6 @@ static bool link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *deliv
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->outbound_stream);

return pending_close;
}


Expand Down Expand Up @@ -1282,18 +1252,9 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect
conn->context.handler = on_connection_event_CSIDE_IO;

sys_mutex_lock(&connector->lock);
DEQ_INSERT_TAIL(connector->connections, conn);

connector->connections_opened++;

vflow_set_uint64(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, connector->connections_opened);
vflow_set_ref_from_record(conn->common.vflow, VFLOW_ATTRIBUTE_CONNECTOR, connector->common.vflow);

if (connector->closing) {
// Management thread triggered deletion of tcpConnector and closing all coresponding
// connections. This conn was skipped due to not in the connections list yet.
conn->pending_close = true;
}
sys_mutex_unlock(&connector->lock);

conn->raw_conn = pn_raw_connection();
Expand Down Expand Up @@ -1713,13 +1674,8 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
repeat = true;
}
} else {
bool pending_close = link_setup_LSIDE_IO(conn);
if (pending_close) {
close_raw_connection(conn, "TCP-listener-deleted", "Forced termination");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
} else {
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
}
link_setup_LSIDE_IO(conn);
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
}
}
break;
Expand All @@ -1745,13 +1701,8 @@ static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn)
//
// Handshake completed, begin the setup of the inbound and outbound links for this connection.
//
bool pending_close = link_setup_LSIDE_IO(conn);
if (pending_close) {
close_raw_connection(conn, "TCP-listener-deleted", "Forced termination");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
} else {
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
}
link_setup_LSIDE_IO(conn);
set_state_XSIDE_IO(conn, LSIDE_LINK_SETUP);
}
}
break;
Expand Down Expand Up @@ -1832,13 +1783,8 @@ static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn)
break;
}
}
bool pending_close = link_setup_CSIDE_IO(conn, conn->outbound_delivery);
if (pending_close) {
close_raw_connection(conn, "TCP-connector-deleted", "Forced termination");
set_state_XSIDE_IO(conn, XSIDE_CLOSING); // prevent further connection I/O
} else {
set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP);
}
link_setup_CSIDE_IO(conn, conn->outbound_delivery);
set_state_XSIDE_IO(conn, CSIDE_LINK_SETUP);
}
break;

Expand Down Expand Up @@ -2015,7 +1961,6 @@ static int setup_tls_session(qd_tcp_connection_t *conn, const qd_tls_domain_t *p
return 0;
}


//=================================================================================
// Handlers for events from the Raw Connections
//=================================================================================
Expand Down Expand Up @@ -2105,16 +2050,8 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
conn->context.handler = on_connection_event_LSIDE_IO;

sys_mutex_lock(&listener->lock);
DEQ_INSERT_TAIL(listener->connections, conn);

listener->connections_opened++;
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);

if (listener->closing) {
// Management thread triggered deletion of tcpListener and closing all coresponding
// connections. This conn was skipped due to not in the connections list yet.
conn->pending_close = true;
}
sys_mutex_unlock(&listener->lock);

if (listener->protocol_observer) {
Expand Down Expand Up @@ -2462,9 +2399,17 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the listener->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&listener->lock);
listener->closing = true;
terminate_connections(&listener->connections);
listener->closing = true;
qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections);
while (conn) {
assert(conn->core_conn != 0);
qdr_core_close_connection(conn->core_conn);
conn = DEQ_NEXT(conn);
}
sys_mutex_unlock(&listener->lock);
}
//
Expand Down Expand Up @@ -2497,12 +2442,21 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
// Initiate termination of existing connections
//
if (tcp_context->qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the connector->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&connector->lock);
connector->closing = true;
terminate_connections(&connector->connections);
connector->closing = true;
qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections);
while (conn) {
assert(conn->core_conn != 0);
qdr_core_close_connection(conn->core_conn);
conn = DEQ_NEXT(conn);
}
sys_mutex_unlock(&connector->lock);
}
//
//
// If all the connections associated with this listener has been closed, this call to
// qd_tcp_listener_decref should free the listener
//
Expand Down
1 change: 0 additions & 1 deletion src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ typedef struct qd_tcp_connection_t {
bool inbound_first_octet;
bool outbound_first_octet;
bool outbound_body_complete;
bool pending_close;
} qd_tcp_connection_t;


Expand Down

0 comments on commit 35d4d8b

Please sign in to comment.