Skip to content

Commit

Permalink
* MDF [mqtt] switch copyright text & set a max limit for lmq.
Browse files Browse the repository at this point in the history
  • Loading branch information
JaylinYu committed Feb 2, 2022
1 parent a104d42 commit 89dfb38
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 68 deletions.
64 changes: 4 additions & 60 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Author: eeff <eeff at eeff dot dev>
//
// Copyright 2020 NanoMQ Team, Inc. <jaylin@emqx.io>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand Down Expand Up @@ -172,8 +173,8 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s)
// accidental collision across restarts.
nni_id_map_init(&p->sent_unack, 0x0000u, 0xffffu, true);
nni_id_map_init(&p->recv_unack, 0x0000u, 0xffffu, true);
nni_lmq_init(&p->recv_messages, 6); // remove hard code value
nni_lmq_init(&p->send_messages, 6); // remove hard code value
nni_lmq_init(&p->recv_messages, 10); // remove hard code value
nni_lmq_init(&p->send_messages, 10); // remove hard code value

return (0);
}
Expand Down Expand Up @@ -257,7 +258,6 @@ mqtt_pipe_close(void *arg)
nni_aio_close(&p->time_aio);
nni_lmq_flush(&p->recv_messages);
nni_lmq_flush(&p->send_messages);
//TODO free msg for each map
nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_msg_cb);
nni_id_map_foreach(&p->recv_unack, mqtt_close_unack_msg_cb);
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -452,51 +452,10 @@ mqtt_recv_cb(void *arg)
return;

case NNG_MQTT_PUBREC:
// // we have received a PUBREC in the QoS 2 delivery,
// // then send a PUBREL
// packet_id = nni_mqtt_msg_get_pubrec_packet_id(msg);
// nni_msg_free(msg);
// work = nni_id_get(&p->sent_unack, packet_id);
// if (NULL == work) {
// // ignore this message
// nni_mtx_unlock(&s->mtx);
// return;
// }
// // the transport handled sending the PUBREL for us,
// // expect to receive a PUBCOMP
// if (work_packet_type(work) == packet_type) {
// work_set_recv(work, NNG_MQTT_PUBCOMP);
// work_timer_cancel(work);
// } else if (work_packet_type(work) == NNG_MQTT_PUBLISH &&
// 2 == work->qos) {
// // scheduling disorder
// work_set_acked(work);
// } else {
// work_set_error(work);
// work_timer_cancel(work);
// }
nni_msg_free(msg);
break;

case NNG_MQTT_PUBREL:
// we have received a PUBREL, then send a PUBCOMP
// packet_id = nni_mqtt_msg_get_pubrel_packet_id(msg);
// cached_msg = nni_id_get(&p->recv_unack, packet_id);
// if (NULL == work) {
// // ignore this message
// nni_msg_free(msg);
// nni_mtx_unlock(&s->mtx);
// return;
// }
// // the transport handled sending the PUBCOMP for us
// work_set_final(work);
// nni_id_remove(&p->recv_unack, work->packet_id);
// ownership of work->msg to the lmq
// mqtt_pipe_recv_msgq_putq(p, work->msg);
// mqtt_run_recv_queue(s);
// work->msg = msg;
// mqtt_sock_free_work(s, work); // will release msg
// nni_mtx_unlock(&s->mtx);
packet_id = nni_mqtt_msg_get_pubrel_packet_id(msg);
cached_msg = nni_id_get(&p->recv_unack, packet_id);
nni_msg_free(msg);
Expand Down Expand Up @@ -527,8 +486,6 @@ mqtt_recv_cb(void *arg)
if (2 > qos) {
// QoS 0, successful receipt
// QoS 1, the transport handled sending a PUBACK
// mqtt_pipe_recv_msgq_putq(p, msg);
// mqtt_run_recv_queue(s);
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
Expand All @@ -545,19 +502,6 @@ mqtt_recv_cb(void *arg)
nni_aio_finish(user_aio, 0, nni_msg_len(msg));
return;
} else {
// work = mqtt_sock_get_work(s);
// if (work == NULL) {
// nni_mtx_unlock(&s->mtx);
// nni_pipe_close(p->pipe);
// return;
// }
// work->qos = qos;
// work->msg = msg; // keep the message
// work->packet_id =
// nni_mqtt_msg_get_publish_packet_id(msg);
// // the transport handled sending PUBREC,
// // expect to receive a PUBREL
// work_set_recv(work, NNG_MQTT_PUBREL);
packet_id = nni_mqtt_msg_get_publish_packet_id(msg);
nni_id_set(&p->recv_unack, packet_id, msg);
}
Expand Down
24 changes: 16 additions & 8 deletions src/mqtt/transport/tcp/mqtt_tcp.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
// Copyright 2020 NanoMQ Team, Inc. <jaylin@emqx.io>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand All @@ -24,6 +22,7 @@ typedef struct mqtt_tcptran_pipe mqtt_tcptran_pipe;
typedef struct mqtt_tcptran_ep mqtt_tcptran_ep;

#define NNI_NANO_MAX_HEADER_SIZE 5
#define NNI_NANO_MAX_LMQ_SIZE 128

// tcp_pipe is one end of a TCP connection.
struct mqtt_tcptran_pipe {
Expand Down Expand Up @@ -162,6 +161,7 @@ mqtt_tcptran_pipe_close(void *arg)

nni_mtx_lock(&p->mtx);
p->closed = true;
nni_lmq_flush(&p->rslmq);
nni_mtx_unlock(&p->mtx);

nni_aio_close(p->rxaio);
Expand All @@ -170,7 +170,6 @@ mqtt_tcptran_pipe_close(void *arg)
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->tmaio);

nng_stream_close(p->conn);
}

Expand All @@ -193,7 +192,7 @@ mqtt_tcptran_pipe_init(void *arg, nni_pipe *npipe)
mqtt_tcptran_pipe *p = arg;
p->npipe = npipe;

nni_lmq_init(&p->rslmq, 1024); // FIXME: remove hard code value
nni_lmq_init(&p->rslmq, 16);
nni_aio_init(&p->tmaio, mqtt_pipe_timer_cb, p);
p->busy = false;
nni_sleep_aio(p->keepalive, &p->tmaio);
Expand Down Expand Up @@ -639,14 +638,23 @@ mqtt_tcptran_pipe_recv_cb(void *arg)
if (nni_lmq_full(&p->rslmq)) {
// Make space for the new message. TODO add max
// limit of msgq len in conf
if ((rv = nni_lmq_resize(&p->rslmq,
nni_lmq_cap(&p->rslmq) * 2)) != 0) {
if (nni_lmq_cap(&p->rslmq) <=
NNI_NANO_MAX_LMQ_SIZE) {
if ((rv = nni_lmq_resize(&p->rslmq,
nni_lmq_cap(&p->rslmq) *
2)) == 0) {
nni_lmq_put(&p->rslmq, qmsg);
} else {
//memory error.
nni_msg_free(qmsg);
}
} else {
nni_msg *old;
(void) nni_lmq_get(&p->rslmq, &old);
nni_msg_free(old);
nni_lmq_put(&p->rslmq, qmsg);
}
}
nni_lmq_put(&p->rslmq, qmsg);
}
ack = false;
}
Expand Down

0 comments on commit 89dfb38

Please sign in to comment.