Skip to content

Commit

Permalink
* MDF [nng/core] move MQTT oriented API out from core part of nng in…
Browse files Browse the repository at this point in the history
… order to be compatible
  • Loading branch information
JaylinYu committed Aug 31, 2021
1 parent 802aaed commit 0e4fe9a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 66 deletions.
14 changes: 9 additions & 5 deletions include/nng/protocol/mqtt/mqtt_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
#include <packet.h>
#include <stdlib.h>

#define DISCONNECT_MSG \
#define DISCONNECT_MSG \
"{\"username\":\"%s\"," \
"\"ts\":%llu,\"reason_code\":\"%x\",\"client_id\":\"%s\"}"
#define CONNECT_MSG \
"{\"username\":\"%s\", " \
"\"ts\":%llu,\"proto_name\":\"%s\",\"keepalive\":%d,\"return_code\":\"%x\",\"proto_ver\":%d,\"client_id\":\"%s\", \"clean_start\":%d}"
#define CONNECT_MSG \
"{\"username\":\"%s\", " \
"\"ts\":%llu,\"proto_name\":\"%s\",\"keepalive\":%d,\"return_code\":" \
"\"%x\",\"proto_ver\":%d,\"client_id\":\"%s\", \"clean_start\":%d}"
#define DISCONNECT_TOPIC "$SYS/brokers/disconnected"
#define CONNECT_TOPIC "$SYS/brokers/connected"

Expand Down Expand Up @@ -52,6 +53,9 @@ NNG_DECL nng_msg *nano_msg_composer(
uint8_t retain, uint8_t qos, mqtt_string *payload, mqtt_string *topic);
NNG_DECL nng_msg *nano_msg_notify_disconnect(conn_param *cparam, uint8_t code);
NNG_DECL nng_msg *nano_msg_notify_connect(conn_param *cparam, uint8_t code);
NNG_DECL nano_pipe_db * nano_msg_get_subtopic(nng_msg *msg, nano_pipe_db *root, conn_param *cparam);
NNG_DECL nano_pipe_db *nano_msg_get_subtopic(
nng_msg *msg, nano_pipe_db *root, conn_param *cparam);
NNG_DECL void nano_msg_free_pipedb(nano_pipe_db *db);
NNG_DECL void nano_msg_ubsub_free(nano_pipe_db *db);

#endif // NNG_MQTT_H
49 changes: 0 additions & 49 deletions src/core/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,52 +735,3 @@ nni_msg_get_timestamp(nni_msg *m)
{
return m->times;
}

void
nano_msg_free_pipedb(nano_pipe_db *db)
{
uint8_t len;
nano_pipe_db *db_next;

if (NULL == db) {
return;
}
db = db->root;

while (db) {
len = strlen(db->topic);
nng_free(db->topic, len);
db_next = db->next;
nng_free(db, sizeof(nano_pipe_db));
db = db_next;
}
return;
}

void
nano_msg_ubsub_free(nano_pipe_db *db)
{
nano_pipe_db *ptr, *tmp;
uint8_t len;

if (NULL == db) {
return;
}
if (db == db->root) {
ptr = db;
tmp = db->next;
while (ptr) {
ptr->root = tmp;
ptr = ptr->next;
}
} else {
tmp = db->prev;
tmp->next = db->next;
db->next->prev = tmp;
}

len = strlen(db->topic);
nng_free(db->topic, len);
nng_free(db, sizeof(nano_pipe_db));
return;
}
2 changes: 0 additions & 2 deletions src/core/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ extern void nni_msg_set_payload_ptr(nni_msg *m, uint8_t *ptr);
extern void nni_msg_set_remaining_len(nni_msg *m, size_t len);
extern void nni_msg_set_cmd_type(nni_msg *m, uint8_t cmd);
extern void nni_msg_set_conn_param(nni_msg *m, void *ptr);
extern void nano_msg_free_pipedb(nano_pipe_db *db);
extern void nano_msg_ubsub_free(nano_pipe_db *db);
extern uint8_t nni_msg_get_preset_qos(nni_msg *m);
extern uint16_t nni_msg_get_pub_pid(nni_msg *m);

Expand Down
53 changes: 51 additions & 2 deletions src/sp/protocol/mqtt/mqtt_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ ws_fixed_header_adaptor(uint8_t *packet, nng_msg *dst)
nni_msg_set_cmd_type(m, *packet & 0xf0);
nni_msg_set_remaining_len(m, len);
rv = nni_msg_header_append(m, packet, pos);
printf("len !!!! 2 %d\n", len);

if (len > 0) {
nni_msg_append(m, packet + pos, len);
}
Expand Down Expand Up @@ -945,4 +945,53 @@ nano_msg_get_subtopic(nni_msg *msg, nano_pipe_db *root, conn_param *cparam)
}

return root;
}
}

void
nano_msg_free_pipedb(nano_pipe_db *db)
{
uint8_t len;
nano_pipe_db *db_next;

if (NULL == db) {
return;
}
db = db->root;

while (db) {
len = strlen(db->topic);
nng_free(db->topic, len);
db_next = db->next;
nng_free(db, sizeof(nano_pipe_db));
db = db_next;
}
return;
}

void
nano_msg_ubsub_free(nano_pipe_db *db)
{
nano_pipe_db *ptr, *tmp;
uint8_t len;

if (NULL == db) {
return;
}
if (db == db->root) {
ptr = db;
tmp = db->next;
while (ptr) {
ptr->root = tmp;
ptr = ptr->next;
}
} else {
tmp = db->prev;
tmp->next = db->next;
db->next->prev = tmp;
}

len = strlen(db->topic);
nng_free(db->topic, len);
nng_free(db, sizeof(nano_pipe_db));
return;
}
19 changes: 11 additions & 8 deletions src/sp/transport/ws/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ wstran_pipe_recv_cb(void *arg)
ws_pipe *p = arg;
uint32_t len = 0, rv, pos = 1;
uint8_t *ptr;
nni_msg *smsg, *msg;
nni_msg *smsg, *msg;
nni_aio *raio = p->rxaio;
nni_aio *uaio = NULL;

nni_mtx_lock(&p->mtx);
//only sets uaio at first time
// only sets uaio at first time
if (p->user_rxaio != NULL) {
uaio = p->user_rxaio;
}
Expand All @@ -117,7 +117,7 @@ wstran_pipe_recv_cb(void *arg)
goto reset;
}
}
//TODO use IOV instead of appending msg
// TODO use IOV instead of appending msg
nni_msg_append(p->tmp_msg, ptr, nni_msg_len(msg));
ptr = nni_msg_body(p->tmp_msg); // packet might be sticky?

Expand All @@ -126,7 +126,8 @@ wstran_pipe_recv_cb(void *arg)
goto recv;
}
len = get_var_integer(ptr, &pos);
if (*(ptr + pos - 1) > 0x7f) { // continue to next byte of remaining length
if (*(ptr + pos - 1) >
0x7f) { // continue to next byte of remaining length
if (p->gotrxhead >= NNI_NANO_MAX_HEADER_SIZE) {
// length error
rv = NNG_EMSGSIZE;
Expand All @@ -151,16 +152,18 @@ wstran_pipe_recv_cb(void *arg)
if (uaio == NULL) {
uaio = p->ep_aio;
}
if (uaio != NULL) {
if (uaio != NULL) {
p->gotrxhead = 0;
p->wantrxhead = 0;
nni_msg_free(msg);
if (nni_msg_cmd_type(p->tmp_msg) == CMD_CONNECT) {
// end of nego
if (p->ws_param == NULL) {
p->ws_param = nng_alloc(sizeof(struct conn_param));
p->ws_param =
nng_alloc(sizeof(struct conn_param));
}
if (conn_handler(nni_msg_body(p->tmp_msg), p->ws_param) != 0) {
if (conn_handler(
nni_msg_body(p->tmp_msg), p->ws_param) != 0) {
goto reset;
}
nni_msg_free(p->tmp_msg);
Expand All @@ -169,7 +172,7 @@ wstran_pipe_recv_cb(void *arg)
nni_aio_set_output(uaio, 0, p);
nni_aio_finish(uaio, 0, 0);
nni_mtx_unlock(&p->mtx);
return;
return;
} else {
if (nni_msg_alloc(&smsg, 0) != 0) {
goto reset;
Expand Down

0 comments on commit 0e4fe9a

Please sign in to comment.