Skip to content

Commit

Permalink
Merge pull request #2 from nanomq/alvin/develop
Browse files Browse the repository at this point in the history
Resolved compile warning
  • Loading branch information
alvin1221 authored Sep 2, 2021
2 parents 0e4fe9a + 0452cf0 commit a262313
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
11 changes: 7 additions & 4 deletions include/nng/protocol/mqtt/mqtt_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@

#define DISCONNECT_MSG \
"{\"username\":\"%s\"," \
"\"ts\":%llu,\"reason_code\":\"%x\",\"client_id\":\"%s\"}"
#define CONNECT_MSG \
"{\"username\":\"%s\", " \
"\"ts\":%llu,\"proto_name\":\"%s\",\"keepalive\":%d,\"return_code\":" \
"\"ts\":%lu,\"reason_code\":\"%x\",\"client_id\":\"%s\"}"

#define CONNECT_MSG \
"{\"username\":\"%s\", " \
"\"ts\":%lu,\"proto_name\":\"%s\",\"keepalive\":%d,\"return_code\":" \
"\"%x\",\"proto_ver\":%d,\"client_id\":\"%s\", \"clean_start\":%d}"

#define DISCONNECT_TOPIC "$SYS/brokers/disconnected"

#define CONNECT_TOPIC "$SYS/brokers/connected"

// Variables & Structs
Expand Down
23 changes: 11 additions & 12 deletions src/sp/protocol/mqtt/mqtt_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ ws_fixed_header_adaptor(uint8_t *packet, nng_msg *dst)
nni_msg *m;
int rv;
uint32_t len;
uint8_t *ptr;
size_t pos = 1;

m = (nni_msg *) dst;
Expand Down Expand Up @@ -344,17 +343,17 @@ int32_t
conn_handler(uint8_t *packet, conn_param *cparam)
{

uint32_t len, tmp, pos = 0, len_of_properties = 0;
int len_of_str = 0, len_of_var = 0;
int32_t rv = 0;
uint32_t len, tmp, pos = 0, len_of_properties = 0, len_of_var = 0;
int len_of_str = 0;
int32_t rv = 0;
uint8_t property_id;

if (packet[pos] != CMD_CONNECT) {
rv = -1;
return rv;
} else {
pos++;
}
}

init_conn_param(cparam);
// remaining length
Expand Down Expand Up @@ -789,8 +788,8 @@ uint8_t
verify_connect(conn_param *cparam, conf *conf)
{
int i, n = conf->auths.count;
char *username = cparam->username.body;
char *password = cparam->password.body;
char *username = (char *) cparam->username.body;
char *password = (char *) cparam->password.body;

if (conf->auths.count == 0 || conf->allow_anonymous == true) {
debug_msg("WARNING: no valid entry in "
Expand Down Expand Up @@ -824,9 +823,9 @@ nano_msg_notify_disconnect(conn_param *cparam, uint8_t code)
{
nni_msg * msg;
mqtt_string string, topic;
uint8_t buff[256];
snprintf(buff, 256, DISCONNECT_MSG, cparam->username.body, nni_clock(),
code, cparam->clientid.body);
char buff[256];
snprintf(buff, 256, DISCONNECT_MSG, (char *) cparam->username.body,
nni_clock(), code, (char *) cparam->clientid.body);
string.body = buff;
string.len = strlen(string.body);
topic.body = DISCONNECT_TOPIC;
Expand All @@ -840,7 +839,7 @@ nano_msg_notify_connect(conn_param *cparam, uint8_t code)
{
nni_msg * msg;
mqtt_string string, topic;
uint8_t buff[256];
char buff[256];
snprintf(buff, 256, CONNECT_MSG, cparam->username.body, nni_clock(),
cparam->pro_name.body, cparam->keepalive_mqtt, code,
cparam->pro_ver, cparam->clientid.body, cparam->clean_start);
Expand Down Expand Up @@ -891,7 +890,7 @@ nano_msg_get_subtopic(nni_msg *msg, nano_pipe_db *root, conn_param *cparam)
iter = root;
while (iter) {
if (strlen(iter->topic) == len_of_topic &&
!strncmp(payload_ptr + bpos + 2,
!strncmp((char *)(payload_ptr + bpos + 2),
iter->topic, len_of_topic)) {
repeat = true;
bpos += (2 + len_of_topic);
Expand Down
48 changes: 27 additions & 21 deletions src/sp/protocol/reqrep0/nano_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "core/sockimpl.h"
#include "nng/protocol/mqtt/mqtt_parser.h"
#include "nng/protocol/mqtt/nano_tcp.h"

#include "nng/protocol/mqtt/mqtt.h"

#include <sub_handler.h>
Expand All @@ -33,8 +32,8 @@ typedef struct cs_msg_list cs_msg_list;
static void nano_pipe_send_cb(void *);
static void nano_pipe_recv_cb(void *);
static void nano_pipe_fini(void *);
static void nano_pipe_close(void *, uint8_t reason_code);
static inline void close_pipe(nano_pipe *p, uint8_t reason_code);
static void nano_pipe_close(void *);
static inline void close_pipe(nano_pipe *p);
// static void nano_period_check(nano_sock *s, nni_list *sent_list, void *arg);
// static void nano_keepalive(nano_pipe *p, void *arg);

Expand Down Expand Up @@ -120,7 +119,8 @@ nano_pipe_timer_cb(void *arg)
"timeout!");
// TODO check keepalived timer interval
nni_mtx_unlock(&p->lk);
nano_pipe_close(p,0x8D);
p->reason_code = 0x8D;
nano_pipe_close(p);
return;
}
p->ka_refresh++;
Expand Down Expand Up @@ -301,7 +301,7 @@ nano_ctx_send(void *arg, nni_aio *aio)
debug_msg("WARNING: pipe %d occupied! resending in cb!", pipe);
if (nni_lmq_full(&p->rlmq)) {
// Make space for the new message. TODO add max limit of msgq len in conf
if (rv = nni_lmq_resize(&p->rlmq, nni_lmq_cap(&p->rlmq) * 2) != 0) {
if ((rv = nni_lmq_resize(&p->rlmq, nni_lmq_cap(&p->rlmq) * 2)) != 0) {
debug_syslog("warning msg dropped!");
nni_msg *old;
(void) nni_lmq_getq(&p->rlmq, &old);
Expand Down Expand Up @@ -822,7 +822,7 @@ nano_pipe_start(void *arg)
}

static inline void
close_pipe(nano_pipe *p, uint8_t reason_code)
close_pipe(nano_pipe *p)
{
nano_sock *s = p->rep;
nano_ctx * ctx;
Expand Down Expand Up @@ -855,21 +855,21 @@ close_pipe(nano_pipe *p, uint8_t reason_code)
}

static void
nano_pipe_close(void *arg, uint8_t reason_code)
nano_pipe_close(void *arg)
{
nano_pipe * p = arg;
nano_sock * s = p->rep;
nano_ctx * ctx;
conn_param *cparam;
// conn_param *cparam;
nni_aio * aio = NULL;
nni_msg * msg;

debug_msg("################# nano_pipe_close ##############");
nni_mtx_lock(&s->lk);
close_pipe(p, reason_code);
close_pipe(p);
// pub disconnect event
if ((ctx = nni_list_first(&s->recvq)) != NULL) {
msg = nano_msg_notify_disconnect(p->conn_param, reason_code);
msg = nano_msg_notify_disconnect(p->conn_param, p->reason_code);
if (msg == NULL) {
nni_mtx_unlock(&s->lk);
return;
Expand Down Expand Up @@ -1025,32 +1025,38 @@ nano_pipe_recv_cb(void *arg)
nni_msg_set_pipe(msg, p->id);
ptr = nni_msg_body(msg);

conn_param *cparam;
uint32_t len, len_of_varint = 0;
uint8_t qos_pac;
size_t tlen;

// TODO HOOK
switch (nng_msg_cmd_type(msg)) {
conn_param *cparam;
uint32_t len, len_of_varint = 0;
uint8_t qos_pac;
size_t tlen;
case CMD_UNSUBSCRIBE:
goto unsub;
case CMD_SUBSCRIBE:
// TODO only cache topic hash when it is above qos 1/2
nni_mtx_lock(&p->lk);
cparam = p->conn_param;
pipe_db = nano_msg_get_subtopic(msg, p->pipedb_root, cparam); // TODO potential memleak when sub failed
cparam = p->conn_param;
pipe_db = nano_msg_get_subtopic(msg, p->pipedb_root,
cparam); // TODO potential memleak when sub failed
p->pipedb_root = pipe_db;
while (pipe_db) {
nni_id_set(&npipe->nano_db, DJBHash(pipe_db->topic), pipe_db);
pipe_db = pipe_db->next;
for (; pipe_db != NULL; pipe_db = pipe_db->next) {
nni_id_set(
&npipe->nano_db, DJBHash(pipe_db->topic), pipe_db);
}
nni_mtx_unlock(&p->lk);
case CMD_UNSUBSCRIBE:

// __attribute__((fallthrough))
unsub:
if (cparam->pro_ver == PROTOCOL_VERSION_v5) {
len = get_var_integer(ptr + 2, &len_of_varint);
nni_msg_set_payload_ptr(
msg, ptr + 2 + len + len_of_varint);
} else {
nni_msg_set_payload_ptr(msg, ptr + 2);
}
//TODO remove topic from pipe_db
// TODO remove topic from pipe_db
break;
case CMD_PUBLISH:
NNI_GET16(ptr, tlen);
Expand Down
5 changes: 1 addition & 4 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ tcptran_pipe_send_cb(void *arg)
static void
tcptran_pipe_recv_cb(void *arg)
{
uint8_t *variable_ptr = NULL, *payload_ptr = NULL;
uint8_t *payload_ptr = NULL;
// uint8_t * header_ptr;
nni_aio *aio;
nni_iov iov;
Expand All @@ -438,7 +438,6 @@ tcptran_pipe_recv_cb(void *arg)
tcptran_pipe *p = arg;
nni_aio * rxaio = p->rxaio;
conn_param * cparam;
uint32_t len_of_varint = 0;

debug_msg("tcptran_pipe_recv_cb %p\n", p);
nni_mtx_lock(&p->mtx);
Expand Down Expand Up @@ -549,14 +548,12 @@ tcptran_pipe_recv_cb(void *arg)
debug_msg("remain_len %d cparam %p clientid %s username %s proto %d\n",
len, cparam, cparam->clientid.body, cparam->username.body,
cparam->pro_ver);
variable_ptr = nni_msg_body(msg);

// set the payload pointer of msg according to packet_type
debug_msg("The type of msg is %x", type);
if (type == CMD_PUBLISH) {
uint8_t qos_pac;
uint16_t pid;
size_t tlen;

qos_pac = nni_msg_get_pub_qos(msg);
if (qos_pac > 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/sp/transport/ws/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ wstran_listener_close(void *arg)
static void
ws_pipe_start(ws_pipe *pipe, nng_stream *conn)
{
NNI_ARG_UNUSED(conn);
ws_pipe *p = pipe;
int rv;
debug_msg("ws_pipe_start!");

nng_stream_recv(p->ws, p->rxaio);
Expand Down

0 comments on commit a262313

Please sign in to comment.