Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase internal buffer size #2

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "git",
"url": "https://github.com/thingsboard/pubsubclient.git"
},
"version": "2.9.1",
"version": "2.9.2",
"exclude": "tests",
"examples": "examples/*/*.ino",
"frameworks": "arduino",
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=TBPubSubClient
version=2.9.1
version=2.9.2
author=Nick O'Leary <nick.oleary@gmail.com>
maintainer=ThingsBoard Team
sentence=A client library for MQTT messaging.
Expand Down
48 changes: 24 additions & 24 deletions src/TBPubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j;

#if MQTT_VERSION == MQTT_VERSION_3_1
Expand Down Expand Up @@ -299,8 +299,8 @@ boolean PubSubClient::readByte(uint8_t * result) {
}

// reads a byte into result[*index] and increments index
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
uint16_t current_index = *index;
boolean PubSubClient::readByte(uint8_t * result, size_t * index){
size_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if(readByte(write_address)){
*index = current_index + 1;
Expand All @@ -310,7 +310,7 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
}

uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
size_t len = 0;
if(!readByte(this->buffer, &len)) return 0;
bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
Expand Down Expand Up @@ -344,9 +344,9 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
skip += 2;
}
}
uint32_t idx = len;
size_t idx = len;

for (uint32_t i = start;i<length;i++) {
for (size_t i = start;i<length;i++) {
if(!readByte(&digit)) return 0;
if (this->stream) {
if (isPublish && idx-*lengthLength-2>skip) {
Expand Down Expand Up @@ -386,7 +386,7 @@ boolean PubSubClient::loop() {
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
size_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
Expand Down Expand Up @@ -452,11 +452,11 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
return false;
}
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);

// Add payload
uint16_t i;
size_t i;
for (i=0;i<plength;i++) {
this->buffer[length++] = payload[i];
}
Expand All @@ -479,7 +479,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
uint8_t llen = 0;
uint8_t digit;
unsigned int rc = 0;
uint16_t tlen;
size_t tlen;
unsigned int pos = 0;
unsigned int i;
uint8_t header;
Expand Down Expand Up @@ -526,14 +526,14 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
size_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
Expand All @@ -554,12 +554,12 @@ size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
return _client->write(buffer,size);
}

size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint16_t len = length;
size_t len = length;
do {

digit = len & 127; //digit = len %128
Expand All @@ -578,13 +578,13 @@ size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length)
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}

boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(header, buf, length);
boolean PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) {
size_t rc;
size_t hlen = buildHeader(header, buf, length);

#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
uint16_t bytesRemaining = length+hlen; //Match the length type
size_t bytesRemaining = length+hlen; //Match the length type
uint8_t bytesToWrite;
boolean result = true;
while((bytesRemaining > 0) && result) {
Expand Down Expand Up @@ -620,7 +620,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
Expand All @@ -644,7 +644,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
return false;
}
if (connected()) {
uint16_t length = MQTT_MAX_HEADER_SIZE;
size_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
Expand All @@ -667,9 +667,9 @@ void PubSubClient::disconnect() {
lastInActivity = lastOutActivity = millis();
}

uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos) {
const char* idp = string;
uint16_t i = 0;
size_t i = 0;
pos += 2;
while (*idp) {
buf[pos++] = *idp++;
Expand Down Expand Up @@ -737,7 +737,7 @@ int PubSubClient::state() {
return this->_state;
}

boolean PubSubClient::setBufferSize(uint16_t size) {
boolean PubSubClient::setBufferSize(size_t size) {
if (size == 0) {
// Cannot set it back to 0
return false;
Expand All @@ -756,7 +756,7 @@ boolean PubSubClient::setBufferSize(uint16_t size) {
return (this->buffer != NULL);
}

uint16_t PubSubClient::getBufferSize() {
size_t PubSubClient::getBufferSize() {
return this->bufferSize;
}
PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
Expand Down
15 changes: 7 additions & 8 deletions src/TBPubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PubSubClient : public Print {
private:
Client* _client;
uint8_t* buffer;
uint16_t bufferSize;
size_t bufferSize;
uint16_t keepAlive;
uint16_t socketTimeout;
uint16_t nextMsgId;
Expand All @@ -103,14 +103,14 @@ class PubSubClient : public Print {
MQTT_CALLBACK_SIGNATURE;
uint32_t readPacket(uint8_t*);
boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
boolean readByte(uint8_t * result, size_t * index);
boolean write(uint8_t header, uint8_t* buf, size_t length);
size_t writeString(const char* string, uint8_t* buf, size_t pos);
// Build up the header ready to send
// Returns the size of the header
// Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start
// (MQTT_MAX_HEADER_SIZE - <returned size>) bytes into the buffer
size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length);
size_t buildHeader(uint8_t header, uint8_t* buf, size_t length);
IPAddress ip;
const char* domain;
uint16_t port;
Expand Down Expand Up @@ -143,8 +143,8 @@ class PubSubClient : public Print {
PubSubClient& setKeepAlive(uint16_t keepAlive);
PubSubClient& setSocketTimeout(uint16_t timeout);

boolean setBufferSize(uint16_t size);
uint16_t getBufferSize();
boolean setBufferSize(size_t size);
size_t getBufferSize();

boolean connect(const char* id);
boolean connect(const char* id, const char* user, const char* pass);
Expand Down Expand Up @@ -184,5 +184,4 @@ class PubSubClient : public Print {

};


#endif