Skip to content

Commit

Permalink
* MDF [mqtt/protocol] fix #10 & add static inline func
Browse files Browse the repository at this point in the history
  • Loading branch information
JaylinYu committed Feb 2, 2022
1 parent 89dfb38 commit b8a7fbc
Showing 1 changed file with 118 additions and 86 deletions.
204 changes: 118 additions & 86 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct mqtt_sock_s {
mqtt_ctx_t master; // to which we delegate send/recv calls
mqtt_pipe_t * mqtt_pipe;
nni_list recv_queue; // ctx pending to receive
// nni_list send_queue; // ctx pending to send
nni_list send_queue; // ctx pending to send
};

/******************************************************************************
Expand Down Expand Up @@ -111,7 +111,7 @@ mqtt_sock_init(void *arg, nni_sock *sock)

s->mqtt_pipe = NULL;
NNI_LIST_INIT(&s->recv_queue, mqtt_ctx_t, rqnode);
// NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode);
NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode);
}

static void
Expand Down Expand Up @@ -154,6 +154,17 @@ mqtt_sock_recv(void *arg, nni_aio *aio)
* Pipe Implementation *
******************************************************************************/

static uint16_t
mqtt_pipe_get_next_packet_id(mqtt_pipe_t *p)
{
int packet_id;
do {
packet_id = nni_atomic_get(&p->next_packet_id);
} while (
!nni_atomic_cas(&p->next_packet_id, packet_id, packet_id + 1));
return packet_id & 0xFFFF;
}

static int
mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
Expand Down Expand Up @@ -203,6 +214,81 @@ mqtt_pipe_fini(void *arg)
nni_lmq_fini(&p->send_messages);
}


static inline void
mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
{
mqtt_ctx_t * ctx = arg;
mqtt_sock_t *s = ctx->mqtt_sock;
mqtt_pipe_t *p = s->mqtt_pipe;
uint16_t ptype, packet_id;
uint8_t qos;
nni_msg * msg;
nni_msg * tmsg;

msg = nni_aio_get_msg(aio);
ptype = nni_mqtt_msg_get_packet_type(msg);
switch (ptype) {
case NNG_MQTT_CONNECT:
case NNG_MQTT_PINGREQ:
break;

case NNG_MQTT_PUBLISH:
qos = nni_mqtt_msg_get_publish_qos(msg);
if (0 == qos) {
nni_aio_finish(aio, 0, 0);
break; // QoS 0 need no packet id
}
case NNG_MQTT_SUBSCRIBE:
case NNG_MQTT_UNSUBSCRIBE:
packet_id = mqtt_pipe_get_next_packet_id(p);
nni_mqtt_msg_set_packet_id(msg, packet_id);
nni_mqtt_msg_set_aio(msg, aio);
tmsg = nni_id_get(&p->sent_unack, packet_id);
if (tmsg != NULL) {
nni_plat_printf("Warning : msg %d lost due to "
"packetID duplicated!",
packet_id);
nni_aio_finish_error(
nni_mqtt_msg_get_aio(tmsg), NNG_EPROTO);
nni_msg_free(tmsg);
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_clone(msg);
if (nni_id_set(&p->sent_unack, packet_id, msg) != 0) {
// nni_println("Warning! QoS msg caching failed");
nni_msg_free(msg);
}
break;

default:
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, NNG_EPROTO);
return;
}
if (!p->busy) {
p->busy = true;
nni_mqtt_msg_encode(msg);
nni_aio_set_msg(&p->send_aio, msg);
nni_aio_bump_count(aio,
nni_msg_header_len(msg) + nni_msg_len(msg));
nni_pipe_send(p->pipe, &p->send_aio);
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
return;
}
if (nni_lmq_full(&p->send_messages)) {
(void) nni_lmq_get(&p->send_messages, &tmsg);
nni_msg_free(tmsg);
}

if (0 != nni_lmq_put(&p->send_messages, msg)) {
// nni_println("Warning! msg lost due to busy socket");
}
nni_mtx_unlock(&s->mtx);
return;
}

static int
mqtt_pipe_start(void *arg)
{
Expand All @@ -212,13 +298,17 @@ mqtt_pipe_start(void *arg)

nni_mtx_lock(&s->mtx);
s->mqtt_pipe = p;
if ((c = nni_list_first(&s->send_queue)) != NULL) {
nni_list_remove(&s->send_queue, c);
mqtt_send_msg(c->saio, c);
nni_sleep_aio(s->retry, &p->time_aio);
nni_pipe_recv(p->pipe, &p->recv_aio);
return;
}
nni_mtx_unlock(&s->mtx);
//initiate the resend timer
nni_sleep_aio(s->retry, &p->time_aio);
nni_pipe_recv(p->pipe, &p->recv_aio);
// if ((c = nni_list_first(&s->send_queue)) != NULL) {
// mqtt_ctx_send(c, c->saio);
// }
return (0);
}

Expand Down Expand Up @@ -260,22 +350,12 @@ mqtt_pipe_close(void *arg)
nni_lmq_flush(&p->send_messages);
nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_msg_cb);
nni_id_map_foreach(&p->recv_unack, mqtt_close_unack_msg_cb);
//clean ctx queue when pipe was closed.
nni_mtx_unlock(&s->mtx);

nni_atomic_set_bool(&p->closed, true);
}

static uint16_t
mqtt_pipe_get_next_packet_id(mqtt_pipe_t *p)
{
int packet_id;
do {
packet_id = nni_atomic_get(&p->next_packet_id);
} while (
!nni_atomic_cas(&p->next_packet_id, packet_id, packet_id + 1));
return packet_id & 0xFFFF;
}

static inline void
mqtt_pipe_recv_msgq_putq(mqtt_pipe_t *p, nni_msg *msg)
{
Expand Down Expand Up @@ -348,6 +428,7 @@ mqtt_send_cb(void *arg)
{
mqtt_pipe_t *p = arg;
mqtt_sock_t *s = p->mqtt_sock;
mqtt_ctx_t *c = NULL;
nni_msg *msg;

if (nni_aio_result(&p->send_aio) != 0) {
Expand All @@ -360,21 +441,28 @@ mqtt_send_cb(void *arg)
}
nni_mtx_lock(&s->mtx);

p->busy = false;
if (nni_atomic_get_bool(&s->closed) ||
nni_atomic_get_bool(&p->closed)) {
// This occurs if the mqtt_pipe_close has been called.
// In that case we don't want any more processing.
nni_mtx_unlock(&s->mtx);
return;
}
//check cached ctx first
if ((c = nni_list_first(&s->send_queue)) != NULL) {
nni_list_remove(&s->send_queue, c);
mqtt_send_msg(c->saio, c);
return;
}
if (nni_lmq_get(&p->send_messages, &msg) == 0) {
p->busy = true;
nni_mqtt_msg_encode(msg);
nni_aio_set_msg(&p->send_aio, msg);
nni_pipe_send(p->pipe, &p->send_aio);
nni_mtx_unlock(&s->mtx);
return;
}

p->busy = false;
nni_mtx_unlock(&s->mtx);
return;
Expand Down Expand Up @@ -477,7 +565,7 @@ mqtt_recv_cb(void *arg)
ctx->raio = NULL;
nni_aio_set_msg(user_aio, cached_msg);
nni_mtx_unlock(&s->mtx);
nni_aio_finish(user_aio, 0, nni_msg_len(cached_msg));
nni_aio_finish(user_aio, 0, 0);
return;

case NNG_MQTT_PUBLISH:
Expand All @@ -499,7 +587,7 @@ mqtt_recv_cb(void *arg)
ctx->raio = NULL;
nni_aio_set_msg(user_aio, msg);
nni_mtx_unlock(&s->mtx);
nni_aio_finish(user_aio, 0, nni_msg_len(msg));
nni_aio_finish(user_aio, 0, 0);
return;
} else {
packet_id = nni_mqtt_msg_get_publish_packet_id(msg);
Expand Down Expand Up @@ -547,7 +635,7 @@ mqtt_ctx_fini(void *arg)
nni_mtx_lock(&s->mtx);
if ((aio = ctx->saio) != NULL) {
ctx->saio = NULL;
// nni_list_remove(&s->send_queue, ctx);
nni_list_remove(&s->send_queue, ctx);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if ((aio = ctx->raio) != NULL) {
Expand Down Expand Up @@ -581,79 +669,22 @@ mqtt_ctx_send(void *arg, nni_aio *aio)
return;
}

if (p == NULL) {
// connection is not established yet
// cache ctx for next or just finish_error?
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
// ctx->saio = aio;
// ctx->raio = NULL;
// nni_list_append(&s->send_queue, ctx);
// nni_mtx_unlock(&s->mtx);
return;
}
msg = nni_aio_get_msg(aio);
ptype = nni_mqtt_msg_get_packet_type(msg);
switch (ptype) {
case NNG_MQTT_CONNECT:
case NNG_MQTT_PINGREQ:
break;

case NNG_MQTT_PUBLISH:
qos = nni_mqtt_msg_get_publish_qos(msg);
if (0 == qos) {
nni_aio_finish(aio, 0, 0);
break; // QoS 0 need no packet id
}
case NNG_MQTT_SUBSCRIBE:
case NNG_MQTT_UNSUBSCRIBE:
packet_id = mqtt_pipe_get_next_packet_id(p);
nni_mqtt_msg_set_packet_id(msg, packet_id);
nni_mqtt_msg_set_aio(msg, aio);
tmsg = nni_id_get(&p->sent_unack, packet_id);
if (tmsg != NULL) {
nni_plat_printf("Warning : msg %d lost due to "
"packetID duplicated!",
packet_id);
nni_aio_finish_error(
nni_mqtt_msg_get_aio(tmsg), NNG_EPROTO);
nni_msg_free(tmsg);
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_clone(msg);
if (nni_id_set(&p->sent_unack, packet_id, msg) != 0) {
// nni_println("Warning! QoS msg caching failed");
nni_msg_free(msg);
}
break;

default:
if (msg == NULL) {
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
nni_aio_finish_error(aio, NNG_EPROTO);
return;
}
if (!p->busy) {
p->busy = true;
nni_mqtt_msg_encode(msg);
nni_aio_set_msg(&p->send_aio, msg);
nni_aio_bump_count(aio,
nni_msg_header_len(msg) + nni_msg_len(msg));
nni_pipe_send(p->pipe, &p->send_aio);
if (p == NULL) {
// connection is not established yet
// cache ctx
ctx->saio = aio;
ctx->raio = NULL;
nni_list_append(&s->send_queue, ctx);
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
return;
}
if (nni_lmq_full(&p->send_messages)) {
(void) nni_lmq_get(&p->send_messages, &tmsg);
nni_msg_free(tmsg);
}

if (0 != nni_lmq_put(&p->send_messages, msg)) {
// nni_println("Warning! msg lost due to busy socket");
}
nni_mtx_unlock(&s->mtx);
mqtt_send_msg(aio, ctx);
nni_aio_set_msg(aio, NULL);
return;
}
Expand Down Expand Up @@ -697,6 +728,7 @@ mqtt_ctx_recv(void *arg, nni_aio *aio)
return;
}
ctx->raio = aio;
ctx->saio = NULL;
nni_list_append(&s->recv_queue, ctx);
nni_mtx_unlock(&s->mtx);
return;
Expand Down

0 comments on commit b8a7fbc

Please sign in to comment.