Skip to content

Commit

Permalink
MQTT memory / connection issues fix (#1445)
Browse files Browse the repository at this point in the history
  • Loading branch information
djphoenix authored and devsaurus committed Aug 14, 2016
1 parent 9a062fc commit 3c55e8c
Showing 1 changed file with 48 additions and 36 deletions.
84 changes: 48 additions & 36 deletions app/modules/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
mud->pesp_conn = NULL;
}

mud->connected = false;
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
Expand Down Expand Up @@ -156,6 +157,14 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
pesp_conn->proto.tcp->local_port = espconn_port();
socket_connect(pesp_conn);
} else {
#ifdef CLIENT_SSL_ENABLE
if (mud->secure) {
espconn_secure_disconnect(pesp_conn);
} else
#endif
{
espconn_disconnect(pesp_conn);
}
mqtt_socket_disconnected(arg);
}
NODE_DBG("leave mqtt_socket_reconnected.\n");
Expand Down Expand Up @@ -804,6 +813,9 @@ static int mqtt_delete( lua_State* L )
c_free(mud->pesp_conn);
mud->pesp_conn = NULL; // for socket, it will free this when disconnected
}
while(mud->mqtt_state.pending_msg_q) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
}

// ---- alloc-ed in mqtt_socket_lwt()
if(mud->connect_info.will_topic){
Expand Down Expand Up @@ -873,8 +885,7 @@ static sint8 socket_connect(struct espconn *pesp_conn)
#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
espconn_secure_set_size(ESPCONN_CLIENT, 5120); /* set SSL buffer size */
espconn_status = espconn_secure_connect(pesp_conn);
espconn_status = espconn_secure_connect(pesp_conn);
}
else
#endif
Expand All @@ -884,7 +895,7 @@ static sint8 socket_connect(struct espconn *pesp_conn)

os_timer_arm(&mud->mqttTimer, 1000, 1);

NODE_DBG("leave socket_connect.\n");
NODE_DBG("leave socket_connect, heap = %u.\n", system_get_free_heap_size());

return espconn_status;
}
Expand Down Expand Up @@ -971,22 +982,17 @@ static int mqtt_socket_connect( lua_State* L )
return luaL_error(L, "already connected");
}

if(mud->pesp_conn){ //TODO: should I free tcp struct directly or ask user to call close()???
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
struct espconn *pesp_conn = mud->pesp_conn;
if(!pesp_conn) {
pesp_conn = mud->pesp_conn = (struct espconn *)c_zalloc(sizeof(struct espconn));
} else {
espconn_delete(pesp_conn);
}

struct espconn *pesp_conn = NULL;
pesp_conn = mud->pesp_conn = (struct espconn *)c_zalloc(sizeof(struct espconn));
if(!pesp_conn)
return luaL_error(L, "not enough memory");

pesp_conn->proto.udp = NULL;
pesp_conn->proto.tcp = (esp_tcp *)c_zalloc(sizeof(esp_tcp));
if (!pesp_conn->proto.tcp)
pesp_conn->proto.tcp = (esp_tcp *)c_zalloc(sizeof(esp_tcp));
if(!pesp_conn->proto.tcp){
c_free(pesp_conn);
pesp_conn = mud->pesp_conn = NULL;
Expand Down Expand Up @@ -1021,7 +1027,8 @@ static int mqtt_socket_connect( lua_State* L )
NODE_DBG("TCP port is set: %d.\n", port);
}
pesp_conn->proto.tcp->remote_port = port;
pesp_conn->proto.tcp->local_port = espconn_port();
if (pesp_conn->proto.tcp->local_port == 0)
pesp_conn->proto.tcp->local_port = espconn_port();
mud->mqtt_state.port = port;

if ( (stack<=top) && lua_isnumber(L, stack) )
Expand Down Expand Up @@ -1121,31 +1128,33 @@ static int mqtt_socket_close( lua_State* L )
return 1;
}

// Send disconnect message
mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection);
NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);

sint8 espconn_status;
#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
espconn_status = espconn_secure_send(mud->pesp_conn, temp_msg->data, temp_msg->length);
else
#endif
espconn_status = espconn_send(mud->pesp_conn, temp_msg->data, temp_msg->length);

mud->mqtt_state.auto_reconnect = 0; // stop auto reconnect.

sint8 espconn_status = ESPCONN_CONN;
if (mud->connected) {
// Send disconnect message
mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection);
NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);

#ifdef CLIENT_SSL_ENABLE
if(mud->secure){
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
espconn_status |= espconn_secure_disconnect(mud->pesp_conn);
}
else
if(mud->secure) {
espconn_status = espconn_secure_send(mud->pesp_conn, temp_msg->data, temp_msg->length);
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
espconn_status |= espconn_secure_disconnect(mud->pesp_conn);
} else
#endif
{
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
espconn_status |= espconn_disconnect(mud->pesp_conn);
{
espconn_status = espconn_send(mud->pesp_conn, temp_msg->data, temp_msg->length);
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
espconn_status |= espconn_disconnect(mud->pesp_conn);
}
}
mud->connected = 0;

while (mud->mqtt_state.pending_msg_q) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
}

NODE_DBG("leave mqtt_socket_close.\n");

if (espconn_status == ESPCONN_OK) {
Expand Down Expand Up @@ -1609,6 +1618,9 @@ static const LUA_REG_TYPE mqtt_map[] = {
int luaopen_mqtt( lua_State *L )
{
luaL_rometatable(L, "mqtt.socket", (void *)mqtt_socket_map); // create metatable for mqtt.socket
#ifdef CLIENT_SSL_ENABLE
espconn_secure_set_size(ESPCONN_CLIENT, 4096);
#endif
return 0;
}

Expand Down

0 comments on commit 3c55e8c

Please sign in to comment.