Skip to content

Commit

Permalink
Fixes #1606: Prevent connection activation if the listener or connect…
Browse files Browse the repository at this point in the history
…or is closed (#1607)
  • Loading branch information
ganeshmurthy authored Sep 6, 2024
1 parent 31c72f6 commit 787afa2
Showing 1 changed file with 42 additions and 42 deletions.
84 changes: 42 additions & 42 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ struct qdr_tcp_connection_t {
bool ingress;
bool flow_enabled;
bool is_egress_dispatcher_conn;
bool connector_closed;//only used if egress_dispatcher=true
bool in_list; // This connection is in the adaptor's connections list
bool raw_read_shutdown; // stream closed
bool read_eos_seen;
Expand Down Expand Up @@ -278,15 +277,8 @@ static void on_activate(void *context)
qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] on_activate", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
if (conn->is_egress_dispatcher_conn && conn->connector_closed) {
detach_links(conn);
qdr_connection_set_context(conn->qdr_conn, 0);
qdr_connection_closed(conn->qdr_conn);
qd_connection_counter_dec(QD_PROTOCOL_TCP);
conn->qdr_conn = 0;
free_qdr_tcp_connection(conn);
}

while (conn->qdr_conn && qdr_connection_process(conn->qdr_conn)) {}
}

/**
Expand Down Expand Up @@ -1859,40 +1851,37 @@ qd_tcp_connector_t *qd_dispatch_configure_tcp_connector_legacy(qd_dispatch_t *qd
return c;
}

void qd_dispatch_delete_tcp_connector_legacy(qd_dispatch_t *qd, qd_tcp_connector_t *ct)
void qd_dispatch_delete_tcp_connector_legacy(qd_dispatch_t *qd, qd_tcp_connector_t *connector)
{
if (ct) {
if (connector) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s",
ct->config->adaptor_config->address, ct->config->adaptor_config->host, ct->config->adaptor_config->port);
connector->config->adaptor_config->address, connector->config->adaptor_config->host, connector->config->adaptor_config->port);

// need to close the pseudo-connection used for dispatching
// deliveries out to live connections:
handle_disconnected((qdr_tcp_connection_t*) ct->dispatcher_conn);
ct->dispatcher_conn = 0;
DEQ_REMOVE(tcp_adaptor->connectors, ct);

//
// Initiate termination of existing connections
//
if (qd->terminate_tcp_conns) {
// Note: PN_RAW_CONNECTION_CONNECTED event or PN_RAW_CONNECTION_DISCONNECTED event
// could come upon any of the connections. We need to hold the connector->lock
// to prevent any modification of the connections list while it is being traversed.
sys_mutex_lock(&ct->lock);
SET_ATOMIC_FLAG(&ct->closing);
//
// Only the head connection is woken when holding the lock.
// This is an optimization. The next connection in the list is woken up in the handler of the PN_RAW_DISCONNECTED
// event of this connection (See close_connection_XSIDE_IO() to see where the next connection in the list is woken up).
// That way, we don't have to wake all the connections when holding the lock.
//
qdr_tcp_connection_t *conn = DEQ_HEAD(ct->connections);
if (conn)
pn_raw_connection_wake(conn->pn_raw_conn);
sys_mutex_unlock(&ct->lock);
SET_ATOMIC_FLAG(&connector->closing);
}

qd_tcp_connector_decref(ct);
DEQ_REMOVE(tcp_adaptor->connectors, connector);

qdr_tcp_connection_t* dispatcher_conn = connector->dispatcher_conn;
sys_mutex_lock(&dispatcher_conn->activation_lock);
qd_timer_t *temp_timer = dispatcher_conn->activate_timer;
//
// The dispatcher_conn->activate_timer has been zeroed out while holding dispatcher_conn->activation_lock
// This will ensure that no further timer call backs will be scheduled, calls to qdr_tcp_activate_CT()
// will not schedule the timer anymore. This will have to be done before the connector->dispatcher_conn is cleaned up.
//
dispatcher_conn->activate_timer = 0;
sys_mutex_unlock(&dispatcher_conn->activation_lock);
//
// The call to qd_timer_free() will cancel any existing timers and free the activation_timer
//
qd_timer_free(temp_timer);
// need to close the pseudo-connection used for dispatching
// deliveries out to live connections:
handle_disconnected((qdr_tcp_connection_t*) connector->dispatcher_conn);
connector->dispatcher_conn = 0;
qd_tcp_connector_decref(connector);
}
}

Expand Down Expand Up @@ -2414,21 +2403,32 @@ static void qdr_tcp_activate_CT(void *notused, qdr_connection_t *c)
qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
LOCK(&conn->activation_lock);
if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) && IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] qdr_tcp_activate_CT: call pn_raw_connection_wake()", conn->conn_id);
pn_raw_connection_wake(conn->pn_raw_conn);
UNLOCK(&conn->activation_lock);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] qdr_tcp_activate_CT: called pn_raw_connection_wake()", conn->conn_id);
} else if (conn->activate_timer) {
UNLOCK(&conn->activation_lock);

// On egress, the raw connection is only created once the
// first part of the message encapsulating the
// client->server half of the stream has been
// received. Prior to that however a subscribing link (and
// its associated connection must be setup), for which we
// fake wakeup by using a timer.

//
// https://github.com/skupperproject/skupper-router/issues/1606
//
assert(conn->is_egress_dispatcher_conn);
//
// If connector was already deleted using qd_dispatch_delete_tcp_connector_legacy(), it would have zeroed out the conn->activate_timer, so
// it cannot be scheduled anymore.
//
qd_timer_schedule(conn->activate_timer, 0);
UNLOCK(&conn->activation_lock);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] qdr_tcp_activate_CT: schedule activate_timer", conn->conn_id);
qd_timer_schedule(conn->activate_timer, 0);

} else {
UNLOCK(&conn->activation_lock);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] qdr_tcp_activate_CT: Cannot activate",
Expand Down

0 comments on commit 787afa2

Please sign in to comment.