Skip to content

Commit

Permalink
Merge pull request #1042 from nanomq/jaylin/reconnect_issue
Browse files Browse the repository at this point in the history
Fix conflicts
  • Loading branch information
JaylinYu authored Aug 12, 2024
2 parents 3104b16 + 51199c2 commit e585885
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/core/dialer.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ dialer_timer_cb(void *arg)
if ((rv = nni_aio_result(&d->d_tmo_aio)) == 0) {
dialer_connect_start(d);
} else {
log_error("timer cb rv%d", rv);
log_error("failed timer cb rv %d", rv);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/core/sockimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ struct nni_pipe {
bool cache;
uint16_t packet_id;
nni_list *subinfol; // additional info for sub
void *nano_qos_db; // qos msgs, 'sqlite' or 'nni_id_hash_map'
// nano_qos_db stores qos msgs in 'sqlite' or 'nni_id_hash_map'
void *nano_qos_db; // protected by pipe lock.
};

extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
Expand Down
21 changes: 15 additions & 6 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,24 +435,29 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
// pass proto_data to cached aio, either it is freed in ack or in cancel
nni_aio_set_prov_data(aio, nni_msg_get_proto_data(msg));
if (taio != NULL) {
nni_plat_printf("Warning : msg %d lost due to "
"packetID duplicated!",
packet_id);
log_warn("Warning : msg %d lost due to "
"packetID duplicated!", packet_id);
nni_aio_finish_error(taio, NNG_ECANCELED);
nni_id_remove(&p->sent_unack, packet_id);
nni_msg_free(nni_aio_get_msg(taio));
nni_aio_set_msg(taio, NULL);
}
if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) {
nni_plat_printf("Warning : aio caching failed");
nni_aio_finish_error(aio, NNG_ECANCELED);
} else {
int rv;
if ((rv = nni_aio_schedule(aio, mqtt_ctx_cancel_send, ctx)) != 0) {
log_warn("Cancel_Func scheduling failed, send abort!");
nni_id_remove(&p->sent_unack, packet_id);
nni_aio_set_msg(aio, NULL);
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg); // User need to realloc this msg again
nni_aio_finish_error(aio, rv);
return;
}
// pass proto_data to cached aio, either it is freed in ack or in cancel
nni_aio_set_prov_data(aio, nni_msg_get_proto_data(msg));
nni_msg_clone(msg);
}
break;
Expand Down Expand Up @@ -527,9 +532,7 @@ static void
mqtt_pipe_stop(void *arg)
{
mqtt_pipe_t *p = arg;
nni_aio_abort(&p->send_aio, NNG_ECANCELED);
nni_aio_stop(&p->send_aio);
nni_aio_abort(&p->recv_aio, NNG_ECANCELED);
nni_aio_stop(&p->recv_aio);
nni_aio_stop(&p->time_aio);
}
Expand Down Expand Up @@ -944,6 +947,7 @@ mqtt_recv_cb(void *arg)
// in case data race in cancel
nni_aio_set_prov_data(user_aio, NULL);
nni_msg_free(nni_aio_get_msg(user_aio));
nni_aio_set_msg(user_aio, NULL);
if (packet_type == NNG_MQTT_SUBACK ||
packet_type == NNG_MQTT_UNSUBACK) {
nni_msg_clone(msg);
Expand Down Expand Up @@ -1155,14 +1159,19 @@ mqtt_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
"timeout!", packet_id);
nni_id_remove(&p->sent_unack, packet_id);
nni_msg_free(nni_aio_get_msg(taio));
nni_aio_set_msg(taio, NULL);
nni_aio_set_prov_data(taio, NULL);
}
if (taio == aio)
nni_aio_finish_error(aio, NNG_ECANCELED);
else
log_error("canceling wrong aio!");
}
}

if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
}
nni_aio_finish_error(aio, NNG_ECANCELED);
nni_mtx_unlock(&s->mtx);

}
Expand Down
96 changes: 58 additions & 38 deletions src/sp/protocol/mqtt/nmq_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,26 @@ struct nano_sock {

// nano_pipe is our per-pipe protocol private structure.
struct nano_pipe {
nni_mtx lk;
nni_pipe *pipe;
nano_sock *broker;
uint32_t id; // pipe id of nni_pipe
uint16_t rid; // index of packet ID for resending
uint16_t keepalive;
void *tree; // root node of db tree
nni_mtx lk;
nni_pipe *pipe;
nano_sock *broker;
conn_param *conn_param;
nni_lmq rlmq; // only for sending cache
uint8_t reason_code;
uint32_t id; // pipe id of nni_pipe
uint16_t rid; // index of packet ID for resending
uint16_t keepalive;
uint16_t ka_refresh; // ka_refresh count how many times the keepalive
// timer has been triggered
bool busy;
bool event; // indicates if exposure disconnect event is valid
void *tree; // root node of db tree
void *nano_qos_db; // 'sqlite' or 'nni_id_hash_map'
nni_aio aio_send;
nni_aio aio_recv;
nni_aio aio_timer;
nni_list_node rnode; // receivable list linkage
bool busy;
bool closed;
bool event; // indicates if exposure disconnect event is valid
uint8_t reason_code;
// ka_refresh count how many times the keepalive timer has
// been triggered
uint16_t ka_refresh;
conn_param *conn_param;
nni_lmq rlmq; // only for sending cache
void *nano_qos_db; // 'sqlite' or 'nni_id_hash_map'
nni_atomic_bool closed;
};

void
Expand Down Expand Up @@ -272,9 +271,8 @@ nano_pipe_timer_cb(void *arg)
}
}
}

nni_mtx_unlock(&p->lk);
nni_sleep_aio(qos_duration * 1000, &p->aio_timer);
nni_mtx_unlock(&p->lk);
return;
}

Expand Down Expand Up @@ -399,8 +397,8 @@ nano_ctx_send(void *arg, nni_aio *aio)
}

// 2 locks here cause performance degradation
nni_mtx_lock(&p->lk);
nni_mtx_unlock(&s->lk);
nni_mtx_lock(&p->lk);

if (p->pipe->cache) {
if (nni_msg_get_type(msg) == CMD_PUBLISH) {
Expand Down Expand Up @@ -583,19 +581,22 @@ nano_pipe_fini(void *arg)
nni_aio_set_msg(&p->aio_send, NULL);
nni_msg_free(msg);
}
void *nano_qos_db = p->pipe->nano_qos_db;

//Safely free the msgs in qos_db, only when nano_qos_db is not taken by new pipe
nni_mtx_lock(&p->lk);
void *nano_qos_db = p->pipe->nano_qos_db;
if (p->event == true) {
if (!p->broker->conf->sqlite.enable && nano_qos_db != NULL) {
nni_qos_db_remove_all_msg(false,
nano_qos_db, nmq_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->pipe->nano_qos_db = NULL;
}
} else {
// we keep all structs in broker layer, except this conn_param
conn_param_free(p->conn_param);
}
nni_mtx_unlock(&p->lk);

nni_mtx_fini(&p->lk);
nni_aio_fini(&p->aio_send);
Expand Down Expand Up @@ -630,6 +631,9 @@ nano_pipe_init(void *arg, nni_pipe *pipe, void *s)
p->tree = sock->db;
if (p->conn_param != NULL)
p->keepalive = p->conn_param->keepalive_mqtt;
if (!p->broker->conf->sqlite.enable && pipe->nano_qos_db == NULL) {
nni_qos_db_init_id_hash(pipe->nano_qos_db);
}

return (0);
}
Expand Down Expand Up @@ -691,7 +695,7 @@ nano_pipe_start(void *arg)
nni_mtx_unlock(&s->lk);
return NNG_ECONNSHUT;
}

// nni_mtx_lock(&p->lk);
if (p->conn_param->clean_start == 0) {
old = nni_id_get(&s->cached_sessions, p->pipe->p_id);
if (old != NULL) {
Expand All @@ -701,10 +705,11 @@ nano_pipe_start(void *arg)

if (!is_sqlite && p->pipe->nano_qos_db!= NULL) {
nni_qos_db_fini_id_hash(p->pipe->nano_qos_db);
p->pipe->nano_qos_db = NULL;
}

p->pipe->nano_qos_db = old->nano_qos_db;

log_info("resuming session %d with %d", npipe->p_id, old->pipe->p_id);
nni_pipe_id_swap(npipe->p_id, old->pipe->p_id);
p->id = nni_pipe_id(npipe);
// set event to false so that no notification will be
Expand Down Expand Up @@ -760,6 +765,7 @@ nano_pipe_start(void *arg)
// TODO disconnect client && send connack with reason code 0x05
log_warn("Invalid auth info.");
}
// nni_mtx_unlock(&p->lk);
nni_mtx_unlock(&s->lk);

// TODO MQTT V5 check return code
Expand Down Expand Up @@ -788,6 +794,7 @@ nano_pipe_start(void *arg)
nni_aio_set_msg(&p->aio_recv, msg);
// connection rate is not fast enough in this way.
nni_aio_finish_sync(&p->aio_recv, 0, nni_msg_len(msg));
nni_atomic_set_bool(&p->closed, false);
return (rv);
}

Expand All @@ -798,25 +805,21 @@ close_pipe(nano_pipe *p)
nano_pipe *t = NULL;
nano_sock *s = p->broker;

nni_atomic_set_bool(&p->closed, true);
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
nni_msg *msg = nni_aio_get_msg(&p->aio_recv);
if (msg)
nni_msg_free(msg);
conn_param_free(p->conn_param);
nni_list_remove(&s->recvpipes, p);
}
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
nni_aio_close(&p->aio_timer);
nni_mtx_lock(&p->lk);
p->closed = true;

nano_nni_lmq_flush(&p->rlmq, false);
nni_mtx_unlock(&p->lk);
// only remove matched pipe, could have been overwritten
t = nni_id_get(&s->pipes, nni_pipe_id(p->pipe));
if (t == p)
nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
nano_nni_lmq_flush(&p->rlmq, false);
}

static int
Expand All @@ -837,7 +840,7 @@ nano_pipe_close(void *arg)
return -1;
}

nni_mtx_lock(&s->lk);
nni_mtx_lock(&p->lk);
// we freed the conn_param when restoring pipe
// so check status of conn_param. just let it close silently
if (p->conn_param->clean_start == 0) {
Expand All @@ -850,8 +853,9 @@ nano_pipe_close(void *arg)
log_debug("keep session id [%s] ", p->conn_param->clientid.body);
nni_pipe_rele(new_pipe);
if (new_pipe == npipe) {
log_info("session stored %d", npipe->p_id);
nni_id_set(&s->cached_sessions, npipe->p_id, p);
nni_mtx_lock(&p->lk);
nni_mtx_lock(&s->lk);
// set event to false avoid of sending the
// disconnecting msg
p->event = false;
Expand All @@ -874,18 +878,27 @@ nano_pipe_close(void *arg)
}
}

// have to stop aio timer first, otherwise we hit null qos_db
nni_aio_stop(&p->aio_timer);
// have to close & stop aio timer first, otherwise we hit null qos_db
nni_aio_finish_error(&p->aio_timer, NNG_ECANCELED);
nni_aio_close(&p->aio_timer);
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
// take params from npipe to new pipe
new_pipe->packet_id = npipe->packet_id;
// there should be no msg in this map
if (new_pipe->nano_qos_db != NULL)
nni_qos_db_fini_id_hash(new_pipe->nano_qos_db);
new_pipe->nano_qos_db = npipe->nano_qos_db;
npipe->nano_qos_db = NULL;

nni_list *l = new_pipe->subinfol;
new_pipe->subinfol = npipe->subinfol;
npipe->subinfol = l;
log_info("client kick itself while keeping session!");
} else {
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
nni_aio_close(&p->aio_timer);
}
close_pipe(p);

Expand All @@ -896,7 +909,7 @@ nano_pipe_close(void *arg)
msg =
nano_msg_notify_disconnect(p->conn_param, p->reason_code);
if (msg == NULL) {
nni_mtx_unlock(&s->lk);
nni_mtx_unlock(&p->lk);
return 0;
}
nni_msg_set_conn_param(msg, p->conn_param);
Expand All @@ -905,12 +918,14 @@ nano_pipe_close(void *arg)
nni_msg_set_cmd_type(msg, CMD_DISCONNECT_EV);
nni_msg_set_pipe(msg, p->id);

nni_mtx_lock(&s->lk);
// expose disconnect event
if ((ctx = nni_list_first(&s->recvq)) != NULL) {
aio = ctx->raio;
ctx->raio = NULL;
nni_list_remove(&s->recvq, ctx);
nni_mtx_unlock(&s->lk);
nni_mtx_unlock(&p->lk);
nni_aio_set_msg(aio, msg);
nni_aio_finish(aio, 0, nni_msg_len(msg));
return 0;
Expand All @@ -927,8 +942,9 @@ nano_pipe_close(void *arg)
}
nni_lmq_put(&s->waitlmq, msg);
}
nni_mtx_unlock(&s->lk);
}
nni_mtx_unlock(&s->lk);
nni_mtx_unlock(&p->lk);
return 0;
}

Expand Down Expand Up @@ -1079,6 +1095,10 @@ nano_pipe_recv_cb(void *arg)
if (msg == NULL) {
goto end;
}
if (nni_atomic_get_bool(&p->closed)) {
nni_msg_free(msg);
return;
}

// ttl = nni_atomic_get(&s->ttl);
nni_msg_set_pipe(msg, p->id);
Expand Down Expand Up @@ -1195,7 +1215,7 @@ nano_pipe_recv_cb(void *arg)
goto drop;
}
nni_mtx_lock(&s->lk);
if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
// If we are closed, then we can't return data.
// This drops DISCONNECT packet.
nni_aio_set_msg(&p->aio_recv, NULL);
Expand Down
10 changes: 4 additions & 6 deletions src/sp/transport/mqtt/broker_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ tcptran_pipe_init(void *arg, nni_pipe *npipe)
clientid_key = DJBHashn(cid, strlen(cid));
rv = nni_pipe_set_pid(npipe, clientid_key);
log_debug("change p_id by hashing %d rv %d", clientid_key, rv);
p->npipe = npipe;
if (!p->conf->sqlite.enable && npipe->nano_qos_db == NULL) {
nni_qos_db_init_id_hash(npipe->nano_qos_db);
}
p->npipe = npipe;
p->conn_buf = NULL;
p->busy = false;

Expand Down Expand Up @@ -609,7 +606,8 @@ nmq_tcptran_pipe_send_cb(void *arg)
}

msg = nni_aio_get_msg(aio);

if (p->closed)
goto exit;
if (nni_aio_get_prov_data(txaio) != NULL) {
// msgs left behind due to multiple topics matched
if (p->pro_ver == MQTT_PROTOCOL_VERSION_v311 ||
Expand All @@ -624,7 +622,7 @@ nmq_tcptran_pipe_send_cb(void *arg)
nni_mtx_unlock(&p->mtx);
return;
}

exit:
nni_aio_list_remove(aio);
tcptran_pipe_send_start(p);

Expand Down
Loading

0 comments on commit e585885

Please sign in to comment.