Skip to content

Commit

Permalink
MQTT5 support.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Sep 22, 2019
1 parent 2ba5912 commit f80dae7
Show file tree
Hide file tree
Showing 8 changed files with 844 additions and 132 deletions.
25 changes: 14 additions & 11 deletions src/MQTTClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, siz
}

static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_string_t topic,
lwmqtt_message_t message) {
lwmqtt_message_t message, lwmqtt_serialized_properties_t props) {
// get callback
auto cb = (MQTTClientCallback *)ref;

Expand All @@ -72,7 +72,7 @@ static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_st

// call the advanced callback and return if available
if (cb->advanced != nullptr) {
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len);
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len, props);
return;
}

Expand Down Expand Up @@ -118,7 +118,7 @@ MQTTClient::~MQTTClient() {
free(this->writeBuf);
}

void MQTTClient::begin(const char hostname[], int port, Client &client) {
void MQTTClient::begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol) {
// set hostname and port
this->setHost(hostname, port);

Expand All @@ -127,6 +127,7 @@ void MQTTClient::begin(const char hostname[], int port, Client &client) {

// initialize client
lwmqtt_init(&this->client, this->writeBuf, this->bufSize, this->readBuf, this->bufSize);
lwmqtt_set_protocol(&this->client, protocol);

// set timers
lwmqtt_set_timers(&this->client, &this->timer1, &this->timer2, lwmqtt_arduino_timer_set, lwmqtt_arduino_timer_get);
Expand Down Expand Up @@ -222,7 +223,8 @@ void MQTTClient::setOptions(int keepAlive, bool cleanSession, int timeout) {
this->timeout = (uint32_t)timeout;
}

bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos) {
bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos,
lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
Expand All @@ -236,7 +238,7 @@ bool MQTTClient::publish(const char topic[], const char payload[], int length, b
message.qos = lwmqtt_qos_t(qos);

// publish message
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, this->timeout);
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down Expand Up @@ -294,14 +296,15 @@ bool MQTTClient::connect(const char clientId[], const char username[], const cha
return true;
}

bool MQTTClient::subscribe(const char topic[], int qos) {
bool MQTTClient::subscribe(const char topic[], int qos, lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
}

// subscribe to topic
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), (lwmqtt_qos_t)qos, this->timeout);
lwmqtt_sub_options_t subopts = {.qos = (lwmqtt_qos_t)qos};
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), subopts, props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand All @@ -312,14 +315,14 @@ bool MQTTClient::subscribe(const char topic[], int qos) {
return true;
}

bool MQTTClient::unsubscribe(const char topic[]) {
bool MQTTClient::unsubscribe(const char topic[], lwmqtt_properties_t props) {
// return immediately if not connected
if (!this->connected()) {
return false;
}

// unsubscribe from topic
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), this->timeout);
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), props, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down Expand Up @@ -368,14 +371,14 @@ bool MQTTClient::connected() {
return this->netClient != nullptr && this->netClient->connected() == 1 && this->_connected;
}

bool MQTTClient::disconnect() {
bool MQTTClient::disconnect(uint8_t reason, lwmqtt_properties_t props) {
// return immediately if not connected anymore
if (!this->connected()) {
return false;
}

// cleanly disconnect
this->_lastError = lwmqtt_disconnect(&this->client, this->timeout);
this->_lastError = lwmqtt_disconnect(&this->client, reason, props, this->timeout);

// close
this->close();
Expand Down
40 changes: 32 additions & 8 deletions src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ typedef struct {
class MQTTClient;

typedef void (*MQTTClientCallbackSimple)(String &topic, String &payload);
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length);
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length,
lwmqtt_serialized_properties_t props);

typedef struct {
MQTTClient *client = nullptr;
Expand All @@ -50,7 +51,7 @@ class MQTTClient {
lwmqtt_arduino_network_t network = {nullptr};
lwmqtt_arduino_timer_t timer1 = {0, nullptr};
lwmqtt_arduino_timer_t timer2 = {0, nullptr};
lwmqtt_client_t client = {0};
lwmqtt_client_t client = {LWMQTT_MQTT311, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

bool _connected = false;
lwmqtt_return_code_t _returnCode = (lwmqtt_return_code_t)0;
Expand All @@ -61,8 +62,9 @@ class MQTTClient {

~MQTTClient();

void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client); }
void begin(const char hostname[], int port, Client &client);
void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client, LWMQTT_MQTT311); }
void begin(const char hostname[], int port, Client &client) { this->begin(hostname, port, client, LWMQTT_MQTT311); }
void begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol);

void onMessage(MQTTClientCallbackSimple cb);
void onMessageAdvanced(MQTTClientCallbackAdvanced cb);
Expand Down Expand Up @@ -91,6 +93,10 @@ class MQTTClient {
bool publish(const String &topic, const String &payload, bool retained, int qos) {
return this->publish(topic.c_str(), payload.c_str(), retained, qos);
}
bool publish(const String &topic, const String &payload, bool retained, int qos, lwmqtt_properties_t props) {
return this->publish(topic.c_str(), payload.c_str(), retained, qos, props);
}

bool publish(const char topic[], const String &payload) { return this->publish(topic, payload.c_str()); }
bool publish(const char topic[], const String &payload, bool retained, int qos) {
return this->publish(topic, payload.c_str(), retained, qos);
Expand All @@ -104,23 +110,41 @@ class MQTTClient {
bool publish(const char topic[], const char payload[], int length) {
return this->publish(topic, payload, length, false, 0);
}
bool publish(const char topic[], const char payload[], int length, bool retained, int qos);
bool publish(const char topic[], const char payload[], int length, bool retained, int qos) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->publish(topic, payload, length, retained, qos, props);
}

bool publish(const char topic[], const char payload[], int length, bool retained, int qos, lwmqtt_properties_t props);

bool subscribe(const String &topic) { return this->subscribe(topic.c_str()); }
bool subscribe(const String &topic, int qos) { return this->subscribe(topic.c_str(), qos); }
bool subscribe(const char topic[]) { return this->subscribe(topic, 0); }
bool subscribe(const char topic[], int qos);
bool subscribe(const char topic[], int qos) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->subscribe(topic, qos, props);
}
bool subscribe(const char topic[], int qos, lwmqtt_properties_t props);

bool unsubscribe(const String &topic) { return this->unsubscribe(topic.c_str()); }
bool unsubscribe(const char topic[]);
bool unsubscribe(const char topic[]) {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->unsubscribe(topic, props);
}
bool unsubscribe(const char topic[], lwmqtt_properties_t props);

bool loop();
bool connected();

lwmqtt_err_t lastError() { return this->_lastError; }
lwmqtt_return_code_t returnCode() { return this->_returnCode; }

bool disconnect();
bool disconnect() {
lwmqtt_properties_t props = lwmqtt_empty_props;
return this->disconnect(0, props);
}

bool disconnect(uint8_t reason, lwmqtt_properties_t props);

private:
void close();
Expand Down
Loading

0 comments on commit f80dae7

Please sign in to comment.