From dd71b09ef7cf276140e88923fb17c60576fa4625 Mon Sep 17 00:00:00 2001 From: Jaylin Date: Tue, 1 Feb 2022 14:27:18 +0800 Subject: [PATCH] * FIX [mqtt/protocol] fix #6 & remove commented code. --- src/mqtt/protocol/mqtt/mqtt_client.c | 115 +++++++-------------------- 1 file changed, 27 insertions(+), 88 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 940e7229..a139ce8d 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -33,7 +33,6 @@ static void mqtt_sock_recv(void *arg, nni_aio *aio); static void mqtt_send_cb(void *arg); static void mqtt_recv_cb(void *arg); static void mqtt_timer_cb(void *arg); -static void mqtt_run_recv_queue(mqtt_sock_t *s); static int mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s); static void mqtt_pipe_fini(void *arg); @@ -83,7 +82,7 @@ struct mqtt_sock_s { mqtt_ctx_t master; // to which we delegate send/recv calls mqtt_pipe_t * mqtt_pipe; nni_list recv_queue; // ctx pending to receive - nni_list send_queue; // ctx pending to send + // nni_list send_queue; // ctx pending to send }; /****************************************************************************** @@ -111,25 +110,13 @@ mqtt_sock_init(void *arg, nni_sock *sock) s->mqtt_pipe = NULL; NNI_LIST_INIT(&s->recv_queue, mqtt_ctx_t, rqnode); - NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode); + // NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode); } static void mqtt_sock_fini(void *arg) { mqtt_sock_t *s = arg; - // work_t * work; - - // nni_mtx_lock(&s->mtx); - // NNI_ASSERT(nni_list_empty(&s->recv_queue)); - - // while (NULL != (work = nni_list_first(&s->free_list))) { - // nni_list_remove(&s->free_list, work); - // work_fini(work); - // nni_free(work, sizeof(work_t)); - // } - // nni_mtx_unlock(&s->mtx); - mqtt_ctx_fini(&s->master); nni_mtx_fini(&s->mtx); } @@ -162,16 +149,6 @@ mqtt_sock_recv(void *arg, nni_aio *aio) mqtt_ctx_recv(&s->master, aio); } -// Note: This routine should be called with the sock lock held. -static inline void -mqtt_sock_close_work_queue(mqtt_sock_t *s, nni_list *queue) -{ - nni_msg * msg; - while (NULL != (msg = nni_list_first(queue))) { - nni_msg_free(msg); - } -} - /****************************************************************************** * Pipe Implementation * ******************************************************************************/ @@ -195,8 +172,8 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s) // accidental collision across restarts. nni_id_map_init(&p->sent_unack, 0x0000u, 0xffffu, true); nni_id_map_init(&p->recv_unack, 0x0000u, 0xffffu, true); - nni_lmq_init(&p->recv_messages, 1024); // remove hard code value - nni_lmq_init(&p->send_messages, 1024); // remove hard code value + nni_lmq_init(&p->recv_messages, 6); // remove hard code value + nni_lmq_init(&p->send_messages, 6); // remove hard code value return (0); } @@ -234,14 +211,13 @@ mqtt_pipe_start(void *arg) nni_mtx_lock(&s->mtx); s->mqtt_pipe = p; - // mqtt_send_start(s); nni_mtx_unlock(&s->mtx); //initiate the resend timer nni_sleep_aio(s->retry, &p->time_aio); nni_pipe_recv(p->pipe, &p->recv_aio); - if ((c = nni_list_first(&s->send_queue)) != NULL) { - mqtt_ctx_send(c, c->saio); - } + // if ((c = nni_list_first(&s->send_queue)) != NULL) { + // mqtt_ctx_send(c, c->saio); + // } return (0); } @@ -258,9 +234,14 @@ void mqtt_close_unack_msg_cb(void *arg) { nni_msg * msg = arg; + nni_aio * aio = NULL; + + aio = nni_mqtt_msg_get_aio(msg); + if (aio) { + nni_aio_finish_error(aio, NNG_ECLOSED); + } nni_msg_free(msg); - //TODO trigger aio inside msg? - // mqtt_sock_close_work(work->mqtt_sock, work); + } static void @@ -274,7 +255,8 @@ mqtt_pipe_close(void *arg) nni_aio_close(&p->send_aio); nni_aio_close(&p->recv_aio); nni_aio_close(&p->time_aio); - // mqtt_sock_close_work_queue(s, &s->recv_queue); + nni_lmq_flush(&p->recv_messages); + nni_lmq_flush(&p->send_messages); //TODO free msg for each map nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_msg_cb); nni_id_map_foreach(&p->recv_unack, mqtt_close_unack_msg_cb); @@ -464,7 +446,8 @@ mqtt_recv_cb(void *arg) break; case NNG_MQTT_PINGRESP: - // do nothing + // free msg + nni_msg_free(msg); nni_mtx_unlock(&s->mtx); return; @@ -587,21 +570,6 @@ mqtt_recv_cb(void *arg) return; } - // if (work_is_error(work)) { - // // protocol error, just close the connection - // nni_mtx_unlock(&s->mtx); - // nni_aio_finish_error(work->user_aio, NNG_EPROTO); - // nni_pipe_close(p->pipe); - // return; - // } else if (work_is_final(work)) { - // // good news, protocol state machine run to the end - // nni_aio *aio = work->user_aio; - // mqtt_sock_free_work(s, work); - // nni_mtx_unlock(&s->mtx); - // nni_aio_finish(aio, 0, 0); - // return; - // } - nni_mtx_unlock(&s->mtx); if (user_aio) { nni_aio_finish(user_aio, 0, 0); @@ -610,30 +578,6 @@ mqtt_recv_cb(void *arg) return; } -// Note: This routine should be called with the sock lock held. -// static void -// mqtt_run_recv_queue(mqtt_sock_t *s) -// { -// work_t * work = nni_list_first(&s->recv_queue); -// mqtt_pipe_t *p = s->mqtt_pipe; -// nni_msg * msg; - -// while (NULL != work) { -// if (0 != nni_lmq_get(&p->recv_messages, &msg)) { -// break; -// } -// nni_list_remove(&s->recv_queue, work); -// // nni_pipe_recv(p->pipe, &work->recv_aio); -// nni_aio_set_msg(work->user_aio, msg); -// nni_aio_finish(work->user_aio, 0, -// nni_msg_header_len(msg) + nni_msg_len(msg)); -// mqtt_sock_free_work(s, work); -// work = nni_list_first(&s->recv_queue); -// } - -// return; -// } - /****************************************************************************** * Context Implementation * ******************************************************************************/ @@ -659,7 +603,7 @@ mqtt_ctx_fini(void *arg) nni_mtx_lock(&s->mtx); if ((aio = ctx->saio) != NULL) { ctx->saio = NULL; - nni_list_remove(&s->send_queue, ctx); + // nni_list_remove(&s->send_queue, ctx); nni_aio_finish_error(aio, NNG_ECLOSED); } if ((aio = ctx->raio) != NULL) { @@ -695,11 +639,15 @@ mqtt_ctx_send(void *arg, nni_aio *aio) if (p == NULL) { // connection is not established yet - // cache ctx for next - ctx->saio = aio; - ctx->raio = NULL; - nni_list_append(&s->send_queue, ctx); + // cache ctx for next or just finish_error? nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + // ctx->saio = aio; + // ctx->raio = NULL; + // nni_list_append(&s->send_queue, ctx); + // nni_mtx_unlock(&s->mtx); return; } msg = nni_aio_get_msg(aio); @@ -808,15 +756,6 @@ mqtt_ctx_recv(void *arg, nni_aio *aio) nni_list_append(&s->recv_queue, ctx); nni_mtx_unlock(&s->mtx); return; - // msg = nni_aio_get_msg(&p->recv_aio); - // if (msg == NULL) { - // nni_mtx_unlock(&s->mtx); - // return; - // } - // nni_aio_set_msg(&p->recv_aio, NULL); - // nni_mtx_unlock(&s->mtx); - // nni_aio_set_msg(aio, msg); - // nni_aio_finish(aio, 0, nni_msg_len(msg)); } /******************************************************************************