Skip to content

Commit

Permalink
Merge pull request #2 from MathewHDYT/master
Browse files Browse the repository at this point in the history
Increase internal buffer size
  • Loading branch information
imbeacon authored Jun 28, 2023
2 parents a4d4245 + f8dfd97 commit 0bcddc9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 34 deletions.
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "git",
"url": "https://github.com/thingsboard/pubsubclient.git"
},
"version": "2.9.1",
"version": "2.9.2",
"exclude": "tests",
"examples": "examples/*/*.ino",
"frameworks": "arduino",
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=TBPubSubClient
version=2.9.1
version=2.9.2
author=Nick O'Leary <nick.oleary@gmail.com>
maintainer=ThingsBoard Team
sentence=A client library for MQTT messaging.
Expand Down
48 changes: 24 additions & 24 deletions src/TBPubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j;

#if MQTT_VERSION == MQTT_VERSION_3_1
Expand Down Expand Up @@ -299,8 +299,8 @@ boolean PubSubClient::readByte(uint8_t * result) {
}

// reads a byte into result[*index] and increments index
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
uint16_t current_index = *index;
boolean PubSubClient::readByte(uint8_t * result, size_t * index){
size_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if(readByte(write_address)){
*index = current_index + 1;
Expand All @@ -310,7 +310,7 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
}

uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
size_t len = 0;
if(!readByte(this->buffer, &len)) return 0;
bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
Expand Down Expand Up @@ -344,9 +344,9 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
skip += 2;
}
}
uint32_t idx = len;
size_t idx = len;

for (uint32_t i = start;i<length;i++) {
for (size_t i = start;i<length;i++) {
if(!readByte(&digit)) return 0;
if (this->stream) {
if (isPublish && idx-*lengthLength-2>skip) {
Expand Down Expand Up @@ -386,7 +386,7 @@ boolean PubSubClient::loop() {
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
size_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
Expand Down Expand Up @@ -452,11 +452,11 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
return false;
}
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);

// Add payload
uint16_t i;
size_t i;
for (i=0;i<plength;i++) {
this->buffer[length++] = payload[i];
}
Expand All @@ -479,7 +479,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
uint8_t llen = 0;
uint8_t digit;
unsigned int rc = 0;
uint16_t tlen;
size_t tlen;
unsigned int pos = 0;
unsigned int i;
uint8_t header;
Expand Down Expand Up @@ -526,14 +526,14 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
size_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
Expand All @@ -554,12 +554,12 @@ size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
return _client->write(buffer,size);
}

size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint16_t len = length;
size_t len = length;
do {

digit = len & 127; //digit = len %128
Expand All @@ -578,13 +578,13 @@ size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length)
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}

boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(header, buf, length);
boolean PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) {
size_t rc;
size_t hlen = buildHeader(header, buf, length);

#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
uint16_t bytesRemaining = length+hlen; //Match the length type
size_t bytesRemaining = length+hlen; //Match the length type
uint8_t bytesToWrite;
boolean result = true;
while((bytesRemaining > 0) && result) {
Expand Down Expand Up @@ -620,7 +620,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
Expand All @@ -644,7 +644,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
return false;
}
if (connected()) {
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
Expand All @@ -667,9 +667,9 @@ void PubSubClient::disconnect() {
lastInActivity = lastOutActivity = millis();
}

uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos) {
const char* idp = string;
uint16_t i = 0;
size_t i = 0;
pos += 2;
while (*idp) {
buf[pos++] = *idp++;
Expand Down Expand Up @@ -737,7 +737,7 @@ int PubSubClient::state() {
return this->_state;
}

boolean PubSubClient::setBufferSize(uint16_t size) {
boolean PubSubClient::setBufferSize(size_t size) {
if (size == 0) {
// Cannot set it back to 0
return false;
Expand All @@ -756,7 +756,7 @@ boolean PubSubClient::setBufferSize(uint16_t size) {
return (this->buffer != NULL);
}

uint16_t PubSubClient::getBufferSize() {
size_t PubSubClient::getBufferSize() {
return this->bufferSize;
}
PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
Expand Down
15 changes: 7 additions & 8 deletions src/TBPubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PubSubClient : public Print {
private:
Client* _client;
uint8_t* buffer;
uint16_t bufferSize;
size_t bufferSize;
uint16_t keepAlive;
uint16_t socketTimeout;
uint16_t nextMsgId;
Expand All @@ -103,14 +103,14 @@ class PubSubClient : public Print {
MQTT_CALLBACK_SIGNATURE;
uint32_t readPacket(uint8_t*);
boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
boolean readByte(uint8_t * result, size_t * index);
boolean write(uint8_t header, uint8_t* buf, size_t length);
size_t writeString(const char* string, uint8_t* buf, size_t pos);
// Build up the header ready to send
// Returns the size of the header
// Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start
// (MQTT_MAX_HEADER_SIZE - <returned size>) bytes into the buffer
size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length);
size_t buildHeader(uint8_t header, uint8_t* buf, size_t length);
IPAddress ip;
const char* domain;
uint16_t port;
Expand Down Expand Up @@ -143,8 +143,8 @@ class PubSubClient : public Print {
PubSubClient& setKeepAlive(uint16_t keepAlive);
PubSubClient& setSocketTimeout(uint16_t timeout);

boolean setBufferSize(uint16_t size);
uint16_t getBufferSize();
boolean setBufferSize(size_t size);
size_t getBufferSize();

boolean connect(const char* id);
boolean connect(const char* id, const char* user, const char* pass);
Expand Down Expand Up @@ -184,5 +184,4 @@ class PubSubClient : public Print {

};


#endif

0 comments on commit 0bcddc9

Please sign in to comment.