Skip to content

Commit

Permalink
Fixes skupperproject#1274: improve locking of the qd_connector_t object
Browse files (browse the repository at this point in the history)
  • Loading branch information
kgiusti committed Oct 25, 2023
1 parent 15af2ed commit b2b61f2
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 36 deletions.
3 changes: 1 addition & 2 deletions src/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,12 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
{
qd_connector_t *connector = (qd_connector_t*) impl;

int conn_index = connector->conn_index;

int i = 1;
int num_items = 0;

sys_mutex_lock(&connector->lock);

int conn_index = connector->conn_index;
qd_failover_item_list_t conn_info_list = connector->conn_info_list;

int conn_info_len = DEQ_SIZE(conn_info_list);
Expand Down
10 changes: 4 additions & 6 deletions src/router_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -1560,13 +1560,11 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
}

if (conn->connector) {
char conn_msg[300];
qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s"
" auth=%s user=%s container_id=%s",
connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no",
authenticated ? mech : "no", (char*) user, container);
sys_mutex_lock(&conn->connector->lock);
strcpy(conn->connector->conn_msg, conn_msg);
qd_format_string(conn->connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE,
"[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s",
connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no",
authenticated ? mech : "no", (char*) user, container);
sys_mutex_unlock(&conn->connector->lock);
}
}
Expand Down
47 changes: 24 additions & 23 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ char *COMPONENT_SEPARATOR = ";";
static const int BACKLOG = 50; /* Listening backlog */

static bool setup_ssl_sasl_and_open(qd_connection_t *ctx); // true if ssl, sasl, and open succeeded
static qd_failover_item_t *qd_connector_get_conn_info(qd_connector_t *ct);
static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct);

/**
* This function is set as the pn_transport->tracer and is invoked when proton tries to write the log message to pn_transport->tracer
Expand Down Expand Up @@ -979,10 +979,10 @@ static void startup_timer_handler(void *context)
qd_connection_invoke_deferred(ctx, timeout_on_handshake, context);
}

static void qd_increment_conn_index(qd_connection_t *ctx)
static void qd_increment_conn_index_lh(qd_connection_t *ctx)
{
if (ctx->connector) {
qd_failover_item_t *item = qd_connector_get_conn_info(ctx->connector);
qd_failover_item_t *item = qd_connector_get_conn_info_lh(ctx->connector);

if (item->retries == 1) {
ctx->connector->conn_index += 1;
Expand Down Expand Up @@ -1044,10 +1044,12 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co
if (ctx && !ctx->opened) {
ctx->opened = true;
if (ctx->connector) {
sys_mutex_lock(&ctx->connector->lock);
ctx->connector->delay = 2000; // Delay re-connect in case there is a recurring error
qd_failover_item_t *item = qd_connector_get_conn_info(ctx->connector);
qd_failover_item_t *item = qd_connector_get_conn_info_lh(ctx->connector);
if (item)
item->retries = 0;
sys_mutex_unlock(&ctx->connector->lock);
}
}
break;
Expand All @@ -1061,22 +1063,26 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co
pn_transport_t *transport = pn_event_transport(e);
pn_condition_t* condition = transport ? pn_transport_condition(transport) : NULL;
if (ctx && ctx->connector) { /* Outgoing connection */
qd_increment_conn_index(ctx);
const qd_server_config_t *config = &ctx->connector->config;
char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging

sys_mutex_lock(&ctx->connector->lock);
qd_increment_conn_index_lh(ctx);
// note: will transition back to STATE_CONNECTING when ctx is freed (pn_connection_free)
ctx->connector->state = CXTR_STATE_FAILED;
char conn_msg[300];
if (condition && pn_condition_is_set(condition)) {
qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port,
pn_condition_get_name(condition), pn_condition_get_description(condition));
strcpy(ctx->connector->conn_msg, conn_msg);

qd_log(LOG_SERVER, QD_LOG_ERROR, "%s", conn_msg);
if (condition && pn_condition_is_set(condition)) {
qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection to %s failed: %s %s",
ctx->connection_id, config->host_port, pn_condition_get_name(condition),
pn_condition_get_description(condition));
} else {
qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port);
strcpy(ctx->connector->conn_msg, conn_msg);
qd_log(LOG_SERVER, QD_LOG_ERROR, "%s", conn_msg);
qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection to %s failed",
ctx->connection_id, config->host_port);
}
strncpy(ctx->connector->conn_msg, conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE);
sys_mutex_unlock(&ctx->connector->lock);

qd_log(LOG_SERVER, QD_LOG_ERROR, "%s", conn_msg);

} else if (ctx && ctx->listener) { /* Incoming connection */
if (condition && pn_condition_is_set(condition)) {
qd_log(LOG_SERVER, QD_LOG_ERROR,
Expand Down Expand Up @@ -1166,7 +1172,7 @@ static void *thread_run(void *arg)
}


static qd_failover_item_t *qd_connector_get_conn_info(qd_connector_t *ct) {
static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) {

qd_failover_item_t *item = DEQ_HEAD(ct->conn_info_list);

Expand All @@ -1179,7 +1185,7 @@ static qd_failover_item_t *qd_connector_get_conn_info(qd_connector_t *ct) {
}


/* Timer callback to try/retry connection open */
/* Timer callback to try/retry connection open, connector->lock held */
static void try_open_lh(qd_connector_t *connector, qd_connection_t *connection)
{
// connection until pn_proactor_connect is called below
Expand All @@ -1204,7 +1210,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *connection)
// hostname in the open frame.
//

qd_failover_item_t *item = qd_connector_get_conn_info(connector);
qd_failover_item_t *item = qd_connector_get_conn_info_lh(connector);

char *current_host = item->host;
char *host_port = item->host_port;
Expand Down Expand Up @@ -1717,10 +1723,6 @@ qd_connector_t *qd_server_connector(qd_server_t *server)
connector->timer = qd_timer(server->qd, try_open_cb, connector);
if (!connector->timer)
goto error;
connector->conn_msg = (char*) malloc(300);
if (!connector->conn_msg)
goto error;
memset(connector->conn_msg, 0, 300);

connector->server = server;
connector->conn_index = 1;
Expand Down Expand Up @@ -1780,7 +1782,6 @@ void qd_connector_decref(qd_connector_t* connector)
item = DEQ_HEAD(connector->conn_info_list);
}
if (connector->policy_vhost) free(connector->policy_vhost);
free(connector->conn_msg);
free_qd_connector_t(connector);
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/server_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ DEQ_DECLARE(qd_listener_t, qd_listener_list_t);
* Connector objects represent the desire to create and maintain an outgoing transport connection.
*/
struct qd_connector_t {
DEQ_LINKS(qd_connector_t);

/* Referenced by connection_manager and pn_connection_t */
sys_atomic_t ref_count;
qd_server_t *server;
Expand All @@ -125,7 +127,6 @@ struct qd_connector_t {
/* Connector state and ctx can be modified by I/O or management threads. */
sys_mutex_t lock;
cxtr_state_t state;
char *conn_msg;
qd_connection_t *qd_conn;

/* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */
Expand All @@ -139,7 +140,9 @@ struct qd_connector_t {
bool is_data_connector;
char group_correlator[QD_DISCRIMINATOR_SIZE];

DEQ_LINKS(qd_connector_t);
/* holds proton transport error condition text on connection failure */
#define QD_CXTR_CONN_MSG_BUF_SIZE 300
char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE];
};

DEQ_DECLARE(qd_connector_t, qd_connector_list_t);
Expand Down
3 changes: 0 additions & 3 deletions tests/tsan.supp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ race:^_lws_log$
race:^_lws_logv$
race:^__lws_logv$

# ISSUE-537 - Suppress the race in qd_entity_refresh_connector for now.
race:^qd_entity_refresh_connector$

#
# External libraries
#
Expand Down

0 comments on commit b2b61f2

Please sign in to comment.