diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md
index 3c4566ad8..5997a82b7 100644
--- a/docs/doxygen/include/size_table.md
+++ b/docs/doxygen/include/size_table.md
@@ -9,8 +9,8 @@
core_mqtt.c |
- 3.1K |
- 2.7K |
+ 3.8K |
+ 3.2K |
core_mqtt_state.c |
@@ -19,12 +19,12 @@
core_mqtt_serializer.c |
- 2.7K |
- 2.1K |
+ 2.8K |
+ 2.2K |
Total estimates |
- 7.3K |
- 5.9K |
+ 8.1K |
+ 6.5K |
diff --git a/lexicon.txt b/lexicon.txt
index 86cd3b913..255889b2b 100644
--- a/lexicon.txt
+++ b/lexicon.txt
@@ -112,7 +112,10 @@ initializeconnectinfo
initializesubscribeinfo
initializewillinfo
int
+io
iot
+ioveccount
+ip
isn
iso
keepaliveintervalsec
@@ -240,11 +243,13 @@ pingreqs
pingreqsendtimems
pingresp
pingresps
+piovec
pismatch
plaintext
pmatch
pmessage
pmqttcontext
+pmqttheader
pnameindex
pnetworkbuffer
pnetworkcontext
@@ -279,10 +284,12 @@ psuback
psubackpacket
psubscribeinfo
psubscribes
+psubscription
psubscriptionlist
ptopic
ptopicfilter
ptopicname
+ptotalmessagelength
ptr
ptransport
ptransportinterface
@@ -330,6 +337,11 @@ serializeack
serializeconnect
serializeconnectpacket
serializedisconnect
+serailizedlength
+serializedpasswordlength
+serializedpayloadlength
+serializedtopiclength
+serializedusernamelength
serializepayload
serializepingreq
serializepublish
@@ -376,6 +388,7 @@ tlssend
toolchain
topicfilterlength
topicnamelength
+totalmessagelength
tr
transportcallback
transportinterface
@@ -392,6 +405,7 @@ uint
un
unsuback
unsubscribelist
+updatedlength
updatestateack
updatestatepublish
updatestatestatus
@@ -402,6 +416,7 @@ validatesubscribeunsubscribeparams
validator
waitingforpingresp
willinfo
+writev
xa
xb
xc
diff --git a/source/core_mqtt.c b/source/core_mqtt.c
index 6f794a645..887194fd9 100644
--- a/source/core_mqtt.c
+++ b/source/core_mqtt.c
@@ -51,9 +51,141 @@
*
* @return Total number of bytes sent, or negative value on network error.
*/
-static int32_t sendPacket( MQTTContext_t * pContext,
+static int32_t sendBuffer( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
- size_t bytesToSend );
+ size_t bytesToSend,
+ uint32_t timeout );
+
+/**
+ * @brief Sends MQTT connect without copying the users data into any buffer.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pConnectInfo MQTT CONNECT packet information.
+ * @brief param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
+ * Testament is not used.
+ * @brief param[in] remainingLength the length of the connect packet.
+ *
+ * @note This operation may call the transport send function
+ * repeatedly to send bytes over the network until either:
+ * 1. The requested number of bytes @a remainingLength have been sent.
+ * OR
+ * 2. No byte cannot be sent over the network for the MQTT_SEND_RETRY_TIMEOUT_MS
+ * duration.
+ * OR
+ * 3. There is an error in sending data over the network.
+ *
+ * @return #MQTTSendFailed or #MQTTSuccess.
+ */
+static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength );
+
+/**
+ * @brief Sends the vector array passed through the parameters over the network.
+ *
+ * @note The preference is given to 'write' function if it is present in the
+ * transport interface. Otherwise, a send call is made repeatedly to achieve the
+ * result.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pIoVec The vector array to be sent.
+ * @param[in] ioVecCount The number of elements in the array.
+ *
+ * @return The total number of bytes sent or the error code as received from the
+ * transport interface.
+ */
+static int32_t sendMessageVector( MQTTContext_t * pContext,
+ TransportOutVector_t * pIoVec,
+ size_t ioVecCount );
+
+/**
+ * @brief Add a string and its length after serializing it in a manner outlined by
+ * the MQTT specification.
+ *
+ * @param[in] serailizedLength Array of two bytes to which the vector will point.
+ * The array must remain in scope until the message has been sent.
+ * @param[in] string The string to be serialized.
+ * @param[in] length The length of the string to be serialized.
+ * @param[in] iterator The iterator pointing to the first element in the
+ * transport interface IO array.
+ * @param[out] updatedLength This parameter will be added to with the number of
+ * bytes added to the vector.
+ *
+ * @return The updated pointer to the vector array.
+ */
+static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength[ 2 ],
+ const char * const string,
+ uint16_t length,
+ TransportOutVector_t * iterator,
+ size_t * updatedLength );
+
+/**
+ * @brief Add the will and testament information along with the connection information
+ * to the given vector.
+ *
+ * @param[in] pConnectInfo Connection information of MQTT.
+ * @param[in] pWillInfo The last will and testament information.
+ * @param[out] pTotalMessageLength This parameter will be added to with the number of
+ * bytes added to the vector.
+ * @param[in] iterator The iterator pointing to the first element in the
+ * transport interface IO array.
+ * @param[out] serializedTopicLength This array will be updated with the topic field
+ * length in serialized MQTT format.
+ * @param[out] serializedPayloadLength This array will be updated with the payload
+ * field length in serialized MQTT format.
+ * @param[out] serializedUsernameLength This array will be updated with the user name
+ * field length in serialized MQTT format.
+ * @param[out] serializedPasswordLength This array will be updated with the password
+ * field length in serialized MQTT format.
+ *
+ * @note All the arrays must stay in scope until the message contained in the vector has
+ * been sent.
+ */
+static void addWillAndConnectInfo( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t * pTotalMessageLength,
+ TransportOutVector_t * iterator,
+ uint8_t serializedTopicLength[ 2 ],
+ uint8_t serializedPayloadLength[ 2 ],
+ uint8_t serializedUsernameLength[ 2 ],
+ uint8_t serializedPasswordLength[ 2 ] );
+
+/**
+ * @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and
+ * directly sending it.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The count of elements in the list.
+ * @param[in] packetId The packet ID of the subscribe packet
+ * @param[in] remainingLength The remaining length of the subscribe packet.
+ *
+ * @return #MQTTSuccess or #MQTTSendFailed.
+ */
+static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength );
+
+/**
+ * @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and
+ * directly sending it.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pSubscriptionList MQTT subscription info.
+ * @param[in] subscriptionCount The count of elements in the list.
+ * @param[in] packetId The packet ID of the unsubscribe packet.
+ * @param[in] remainingLength The remaining length of the unsubscribe packet.
+ *
+ * @return #MQTTSuccess or #MQTTSendFailed.
+ */
+static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength );
/**
* @brief Calculate the interval between two millisecond timestamps, including
@@ -241,20 +373,6 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC
size_t subscriptionCount,
uint16_t packetId );
-/**
- * @brief Send serialized publish packet using transport send.
- *
- * @brief param[in] pContext Initialized MQTT context.
- * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
- * @brief param[in] headerSize Header size of the PUBLISH packet.
- *
- * @return #MQTTSendFailed if transport write failed;
- * #MQTTSuccess otherwise.
- */
-static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- size_t headerSize );
-
/**
* @brief Receives a CONNACK MQTT packet.
*
@@ -289,6 +407,27 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
bool sessionPresent );
+
+/**
+ * @brief Send the publish packet without copying the topic string and payload in
+ * the buffer.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @brief param[in] pMqttHeader the serialized MQTT header with the header byte;
+ * the encoded length of the packet; and the encoded length of the topic string.
+ * @brief param[in] headerSize Size of the serialized PUBLISH header.
+ * @brief param[in] packetId Packet Id of the publish packet.
+ *
+ * @return #MQTTSendFailed if transport send during resend failed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ const uint8_t * pMqttHeader,
+ size_t headerSize,
+ uint16_t packetId );
+
/**
* @brief Serializes a PUBLISH message.
*
@@ -603,14 +742,99 @@ static bool matchTopicFilter( const char * pTopicName,
/*-----------------------------------------------------------*/
-static int32_t sendPacket( MQTTContext_t * pContext,
+static int32_t sendMessageVector( MQTTContext_t * pContext,
+ TransportOutVector_t * pIoVec,
+ size_t ioVecCount )
+{
+ uint32_t timeoutTime;
+ uint32_t bytesToSend = 0U;
+ int32_t bytesSentOrError = 0U;
+ uint64_t temp = 0;
+ TransportOutVector_t * pIoVectIterator;
+ size_t vectorsToBeSent = ioVecCount;
+
+ assert( pContext != NULL );
+ assert( pIoVec != NULL );
+ assert( pContext->getTime != NULL );
+ /* Send must always be defined */
+ assert( pContext->transportInterface.send );
+
+ timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS;
+
+ /* Count the total number of bytes to be sent as outlined in the vector. */
+ for( pIoVectIterator = pIoVec; pIoVectIterator < &( pIoVec[ ioVecCount ] ); pIoVectIterator++ )
+ {
+ temp += pIoVectIterator->iov_len;
+ bytesToSend += pIoVectIterator->iov_len;
+ }
+
+ /* Reset the iterator to point to the first entry in the array. */
+ pIoVectIterator = pIoVec;
+
+ while( ( pContext->getTime() < timeoutTime ) &&
+ ( bytesSentOrError < ( int32_t ) bytesToSend ) )
+ {
+ int32_t sendResult;
+ uint32_t bytesSentThisVector = 0U;
+
+ if( pContext->transportInterface.writev != NULL )
+ {
+ sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
+ pIoVectIterator,
+ vectorsToBeSent );
+ }
+ else
+ {
+ sendResult = sendBuffer( pContext,
+ pIoVectIterator->iov_base,
+ pIoVectIterator->iov_len,
+ ( timeoutTime - pContext->getTime() ) );
+ }
+
+ if( sendResult >= 0 )
+ {
+ bytesSentOrError += sendResult;
+ bytesSentThisVector += sendResult;
+ }
+ else
+ {
+ bytesSentOrError = sendResult;
+ break;
+ }
+
+ while( ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) &&
+ ( bytesSentThisVector >= pIoVectIterator->iov_len ) )
+ {
+ bytesSentThisVector -= pIoVectIterator->iov_len;
+ pIoVectIterator++;
+
+ /* Update the number of vector which are yet to be sent. */
+ vectorsToBeSent--;
+ }
+
+ /* Some of the bytes from this vector were sent as well, update the length
+ * and the pointer to data in this vector. */
+ if( ( bytesSentThisVector > 0U ) &&
+ ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) )
+ {
+ ( *( ( uint8_t * ) pIoVectIterator->iov_base ) ) += bytesSentThisVector;
+ pIoVectIterator->iov_len -= bytesSentThisVector;
+ }
+ }
+
+ return bytesSentOrError;
+}
+
+static int32_t sendBuffer( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
- size_t bytesToSend )
+ size_t bytesToSend,
+ uint32_t timeout )
{
const uint8_t * pIndex = pBufferToSend;
size_t bytesRemaining = bytesToSend;
int32_t totalBytesSent = 0, bytesSent;
uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U;
+ uint32_t timeoutTime;
bool sendError = false;
assert( pContext != NULL );
@@ -619,9 +843,7 @@ static int32_t sendPacket( MQTTContext_t * pContext,
assert( pIndex != NULL );
bytesRemaining = bytesToSend;
-
- /* Record the most recent time of successful transmission. */
- lastSendTimeMs = pContext->getTime();
+ timeoutTime = pContext->getTime() + timeout;
/* Loop until the entire packet is sent. */
while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
@@ -1028,6 +1250,11 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
int32_t bytesSent = 0;
uint8_t packetTypeByte = 0U;
MQTTPubAckType_t packetType;
+ MQTTFixedBuffer_t localBuffer;
+ uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
+
+ localBuffer.pBuffer = pubAckPacket;
+ localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE;
assert( pContext != NULL );
@@ -1037,15 +1264,18 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
{
packetType = getAckFromPacketType( packetTypeByte );
- status = MQTT_SerializeAck( &( pContext->networkBuffer ),
+ status = MQTT_SerializeAck( &localBuffer,
packetTypeByte,
packetId );
if( status == MQTTSuccess )
{
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- MQTT_PUBLISH_ACK_PACKET_SIZE );
+ /* Here, we are not using the vector approach for efficiency. There is just one buffer
+ * to be sent which can be achieved with a normal send call. */
+ bytesSent = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ MQTT_PUBLISH_ACK_PACKET_SIZE,
+ MQTT_SEND_RETRY_TIMEOUT_MS );
}
if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
@@ -1532,56 +1762,373 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC
/*-----------------------------------------------------------*/
-static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- size_t headerSize )
+static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength[ 2 ],
+ const char * const string,
+ uint16_t length,
+ TransportOutVector_t * iterator,
+ size_t * updatedLength )
+{
+ size_t packetLength = 0U;
+ const size_t seralizedLengthFieldSize = 2U;
+
+ serailizedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) );
+ serailizedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) );
+
+ iterator->iov_base = serailizedLength;
+ iterator->iov_len = seralizedLengthFieldSize;
+ iterator++;
+
+ iterator->iov_base = string;
+ iterator->iov_len = length;
+ iterator++;
+
+ packetLength = length + seralizedLengthFieldSize;
+
+ ( *updatedLength ) = ( *updatedLength ) + packetLength;
+
+ return iterator;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength )
{
MQTTStatus_t status = MQTTSuccess;
- int32_t bytesSent = 0;
+ uint8_t subscribeheader[ 7 ];
+ uint8_t * pIndex;
+ TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
+ TransportOutVector_t * pIterator;
+ uint8_t serializedTopicFieldLength[ 2 ];
+ size_t totalPacketLength = 0U;
+ size_t ioVectorLength = 0U;
+ size_t subscriptionsSent = 0U;
+ /* For subscribe, only three vector slots are required per topic string. */
+ const size_t subscriptionStringVectorSlots = 3U;
+
+ /* The vector array should be at least three element long as the topic
+ * string needs these many vector elements to be stored. */
+ assert( MQTT_SUB_UNSUB_MAX_VECTORS >= subscriptionStringVectorSlots );
+
+ pIndex = subscribeheader;
+ pIterator = pIoVector;
+
+ pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
+ pIndex,
+ packetId );
+
+ /* The header is to be sent first. */
+ pIterator->iov_base = subscribeheader;
+ pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader );
+ pIterator++;
+ ioVectorLength++;
+ totalPacketLength += ( size_t ) ( pIndex - subscribeheader );
+
+ while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) )
+ {
+ /* Check whether the subscription topic (with QoS) will fit in the
+ * given vector. */
+ while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - subscriptionStringVectorSlots ) ) &&
+ ( subscriptionsSent < subscriptionCount ) )
+ {
+ /* The topic filter gets sent next. */
+ pIterator = addEncodedStringToVector( serializedTopicFieldLength,
+ pSubscriptionList[ subscriptionsSent ].pTopicFilter,
+ pSubscriptionList[ subscriptionsSent ].topicFilterLength,
+ pIterator,
+ &totalPacketLength );
+
+ /* Lastly, the QoS gets sent. */
+ pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos );
+ pIterator->iov_len = 1U;
+
+ /* Two slots get used by the topic string length and topic string. And
+ * one slot gets used by the quality of service. */
+ ioVectorLength += subscriptionStringVectorSlots;
+
+ subscriptionsSent++;
+ }
+
+ if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
+ {
+ status = MQTTSendFailed;
+ }
- assert( pContext != NULL );
- assert( pPublishInfo != NULL );
- assert( headerSize > 0 );
- assert( pContext->networkBuffer.pBuffer != NULL );
- assert( !( pPublishInfo->payloadLength > 0 ) || ( pPublishInfo->pPayload != NULL ) );
+ /* Update the iterator for the next potential loop iteration. */
+ pIterator = pIoVector;
+ /* Reset the vector length for the next potential loop iteration. */
+ ioVectorLength = 0U;
+ /* Reset the packet length for the next potential loop iteration. */
+ totalPacketLength = 0U;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ uint8_t unsubscribeheader[ 7 ];
+ uint8_t * pIndex;
+ TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
+ TransportOutVector_t * pIterator;
+ uint8_t serializedTopicFieldLength[ 2 ];
+ size_t totalPacketLength = 0U;
+ size_t unsubscriptionsSent = 0U;
+ size_t ioVectorLength = 0U;
+ /* For unsubscribe, only two vector slots are required per topic string. */
+ const size_t subscriptionStringVectorSlots = 2U;
+
+ /* The vector array should be at least three element long as the topic
+ * string needs these many vector elements to be stored. */
+ assert( MQTT_SUB_UNSUB_MAX_VECTORS >= subscriptionStringVectorSlots );
+
+ pIndex = unsubscribeheader;
+ pIterator = pIoVector;
- /* Send header first. */
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- headerSize );
+ pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
+ pIndex,
+ packetId );
- if( bytesSent < ( int32_t ) headerSize )
+ /* The header is to be sent first. */
+ pIterator->iov_base = unsubscribeheader;
+ pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
+ totalPacketLength += ( size_t ) ( pIndex - unsubscribeheader );
+ pIterator++;
+ ioVectorLength++;
+
+ while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
+ {
+ /* Check whether the subscription topic will fit in the given vector. */
+ while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - subscriptionStringVectorSlots ) ) &&
+ ( unsubscriptionsSent < subscriptionCount ) )
+ {
+ /* The topic filter gets sent next. */
+ pIterator = addEncodedStringToVector( serializedTopicFieldLength,
+ pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
+ pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
+ pIterator,
+ &totalPacketLength );
+
+ /* Two slots get used by the topic string length and topic string. And
+ * one slot gets used by the quality of service. */
+ ioVectorLength += subscriptionStringVectorSlots;
+
+ unsubscriptionsSent++;
+ }
+
+ if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
+ {
+ status = MQTTSendFailed;
+ }
+
+ /* Update the iterator for the next potential loop iteration. */
+ pIterator = pIoVector;
+ /* Reset the vector length for the next potential loop iteration. */
+ ioVectorLength = 0U;
+ /* Reset the packet length for the next potential loop iteration. */
+ totalPacketLength = 0U;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ const uint8_t * pMqttHeader,
+ size_t headerSize,
+ uint16_t packetId )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ uint8_t serializedPacketID[ 2 ];
+ TransportOutVector_t pIoVector[ 4 ];
+ size_t ioVectorLength;
+ size_t totalMessageLength;
+ const size_t packetIDLength = 2U;
+
+ /* The header is sent first. */
+ pIoVector[ 0U ].iov_base = pMqttHeader;
+ pIoVector[ 0U ].iov_len = headerSize;
+ totalMessageLength = headerSize;
+
+ /* Then the topic name has to be sent. */
+ pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName;
+ pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength;
+ totalMessageLength += pPublishInfo->topicNameLength;
+
+ /* The next field's index should be 2 as the first two fields
+ * have been filled in. */
+ ioVectorLength = 2U;
+
+ if( pPublishInfo->qos > MQTTQoS0 )
+ {
+ /* Encode the packet ID. */
+ serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) );
+ serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) );
+
+ pIoVector[ ioVectorLength ].iov_base = serializedPacketID;
+ pIoVector[ ioVectorLength ].iov_len = packetIDLength;
+
+ ioVectorLength++;
+ totalMessageLength += packetIDLength;
+ }
+
+ /* Publish packets are allowed to contain no payload. */
+ if( pPublishInfo->payloadLength > 0U )
+ {
+ pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;
+ pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength;
+
+ ioVectorLength++;
+ totalMessageLength += pPublishInfo->payloadLength;
+ }
+
+ if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength )
{
- LogError( ( "Transport send failed for PUBLISH header." ) );
status = MQTTSendFailed;
}
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static void addWillAndConnectInfo( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t * pTotalMessageLength,
+ TransportOutVector_t * iterator,
+ uint8_t serializedTopicLength[ 2 ],
+ uint8_t serializedPayloadLength[ 2 ],
+ uint8_t serializedUsernameLength[ 2 ],
+ uint8_t serializedPasswordLength[ 2 ] )
+{
+ if( pWillInfo != NULL )
+ {
+ /* Serialize the topic. */
+ iterator = addEncodedStringToVector( serializedTopicLength,
+ pWillInfo->pTopicName,
+ pWillInfo->topicNameLength,
+ iterator,
+ pTotalMessageLength );
+
+ /* Serialize the payload. */
+ iterator = addEncodedStringToVector( serializedPayloadLength,
+ pWillInfo->pPayload,
+ ( uint16_t ) pWillInfo->payloadLength,
+ iterator,
+ pTotalMessageLength );
+ }
+
+ /* Encode the user name if provided. */
+ if( pConnectInfo->pUserName != NULL )
+ {
+ /* Serialize the user name string. */
+ iterator = addEncodedStringToVector( serializedUsernameLength,
+ pConnectInfo->pUserName,
+ pConnectInfo->userNameLength,
+ iterator,
+ pTotalMessageLength );
+ }
+
+ /* Encode the password if provided. */
+ if( pConnectInfo->pPassword != NULL )
+ {
+ /* Serialize the user name string. */
+ iterator = addEncodedStringToVector( serializedPasswordLength,
+ pConnectInfo->pPassword,
+ pConnectInfo->passwordLength,
+ iterator,
+ pTotalMessageLength );
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ TransportOutVector_t * iterator;
+ size_t ioVectorLength = 0U;
+ size_t totalMessageLength = 0U;
+ int32_t bytesSentOrError;
+
+ /* Connect packet header can be of maximum 15 bytes. */
+ uint8_t connectPacketHeader[ 15 ];
+ uint8_t * pIndex = connectPacketHeader;
+ TransportOutVector_t pIoVector[ 11 ];
+ uint8_t serializedClientIDLength[ 2 ];
+ uint8_t serializedTopicLength[ 2 ];
+ uint8_t serializedPayloadLength[ 2 ];
+ uint8_t serializedUsernameLength[ 2 ];
+ uint8_t serializedPasswordLength[ 2 ];
+
+ iterator = pIoVector;
+
+ /* Validate arguments. */
+ if( pConnectInfo == NULL )
+ {
+ LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
+ "pFixedBuffer=%p.",
+ ( void * ) pConnectInfo ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
+ {
+ LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
+ status = MQTTBadParameter;
+ }
else
{
- LogDebug( ( "Sent %ld bytes of PUBLISH header.",
- ( long int ) bytesSent ) );
+ pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
+ pConnectInfo,
+ pWillInfo,
+ remainingLength );
- /* Send Payload if there is one to send. It is valid for a PUBLISH
- * Packet to contain a zero length payload.*/
- if( pPublishInfo->payloadLength > 0U )
- {
- bytesSent = sendPacket( pContext,
- pPublishInfo->pPayload,
- pPublishInfo->payloadLength );
+ assert( ( pIndex - connectPacketHeader ) <= 15 );
- if( bytesSent < ( int32_t ) pPublishInfo->payloadLength )
- {
- LogError( ( "Transport send failed for PUBLISH payload." ) );
- status = MQTTSendFailed;
- }
- else
- {
- LogDebug( ( "Sent %ld bytes of PUBLISH payload.",
- ( long int ) bytesSent ) );
- }
- }
- else
+ /* The header gets sent first. */
+ iterator->iov_base = connectPacketHeader;
+ iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader );
+ totalMessageLength += iterator->iov_len;
+ iterator++;
+
+ /* Serialize the client ID. */
+ iterator = addEncodedStringToVector( serializedClientIDLength,
+ pConnectInfo->pClientIdentifier,
+ pConnectInfo->clientIdentifierLength,
+ iterator,
+ &totalMessageLength );
+
+ addWillAndConnectInfo( pConnectInfo,
+ pWillInfo,
+ &totalMessageLength,
+ iterator,
+ serializedTopicLength,
+ serializedPayloadLength,
+ serializedUsernameLength,
+ serializedPasswordLength );
+
+ ioVectorLength = ( size_t ) ( iterator - pIoVector );
+
+ bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength );
+
+ if( bytesSentOrError != ( int32_t ) totalMessageLength )
{
- LogDebug( ( "PUBLISH payload was not sent. Payload length was zero." ) );
+ status = MQTTSendFailed;
}
}
@@ -1908,7 +2455,6 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
bool * pSessionPresent )
{
size_t remainingLength = 0UL, packetSize = 0UL;
- int32_t bytesSent;
MQTTStatus_t status = MQTTSuccess;
MQTTPacketInfo_t incomingPacket = { 0 };
@@ -1938,28 +2484,10 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- status = MQTT_SerializeConnect( pConnectInfo,
- pWillInfo,
- remainingLength,
- &( pContext->networkBuffer ) );
- }
-
- if( status == MQTTSuccess )
- {
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- packetSize );
-
- if( bytesSent < ( int32_t ) packetSize )
- {
- LogError( ( "Transport send failed for CONNECT packet." ) );
- status = MQTTSendFailed;
- }
- else
- {
- LogDebug( ( "Sent %ld bytes of CONNECT packet.",
- ( long int ) bytesSent ) );
- }
+ status = sendConnectWithoutCopy( pContext,
+ pConnectInfo,
+ pWillInfo,
+ remainingLength );
}
/* Read CONNACK from transport layer. */
@@ -2004,7 +2532,6 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
uint16_t packetId )
{
size_t remainingLength = 0UL, packetSize = 0UL;
- int32_t bytesSent = 0;
/* Validate arguments. */
MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
@@ -2026,31 +2553,12 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- /* Serialize MQTT SUBSCRIBE packet. */
- status = MQTT_SerializeSubscribe( pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength,
- &( pContext->networkBuffer ) );
- }
-
- if( status == MQTTSuccess )
- {
- /* Send serialized MQTT SUBSCRIBE packet to transport layer. */
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- packetSize );
-
- if( bytesSent < ( int32_t ) packetSize )
- {
- LogError( ( "Transport send failed for SUBSCRIBE packet." ) );
- status = MQTTSendFailed;
- }
- else
- {
- LogDebug( ( "Sent %ld bytes of SUBSCRIBE packet.",
- ( long int ) bytesSent ) );
- }
+ /* Send MQTT SUBSCRIBE packet. */
+ status = sendSubscribeWithoutCopy( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength );
}
return status;
@@ -2062,19 +2570,30 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId )
{
- size_t headerSize = 0UL;
+ size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
+ /* 1 header byte + 4 bytes (maximum) required for encoding the length +
+ * 2 bytes for topic string. */
+ uint8_t mqttHeader[ 7 ];
+
/* Validate arguments. */
MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
if( status == MQTTSuccess )
{
- /* Serialize PUBLISH packet. */
- status = serializePublish( pContext,
- pPublishInfo,
- packetId,
- &headerSize );
+ /* Get the remaining length and packet size.*/
+ status = MQTT_GetPublishPacketSize( pPublishInfo,
+ &remainingLength,
+ &packetSize );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo,
+ remainingLength,
+ mqttHeader,
+ &headerSize );
}
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
@@ -2095,10 +2614,11 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- /* Sends the serialized publish packet over network. */
- status = sendPublish( pContext,
- pPublishInfo,
- headerSize );
+ status = sendPublishWithoutCopy( pContext,
+ pPublishInfo,
+ mqttHeader,
+ headerSize,
+ packetId );
}
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
@@ -2137,6 +2657,12 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
int32_t bytesSent = 0;
MQTTStatus_t status = MQTTSuccess;
size_t packetSize = 0U;
+ /* MQTT ping packets are of fixed length. */
+ uint8_t pingreqPacket[ 2U ];
+ MQTTFixedBuffer_t localBuffer;
+
+ localBuffer.pBuffer = pingreqPacket;
+ localBuffer.size = 2U;
if( pContext == NULL )
{
@@ -2163,15 +2689,19 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
/* Serialize MQTT PINGREQ. */
- status = MQTT_SerializePingreq( &( pContext->networkBuffer ) );
+ status = MQTT_SerializePingreq( &localBuffer );
}
if( status == MQTTSuccess )
{
- /* Send the serialized PINGREQ packet to transport layer. */
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- packetSize );
+ /* Send the serialized PINGREQ packet to transport layer.
+ * Here, we do not use the vectored IO approach for efficiency as the
+ * Ping packet does not have numerous fields which need to be copied
+ * from the user provided buffers. Thus it can be sent directly. */
+ bytesSent = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ 2U,
+ MQTT_SEND_RETRY_TIMEOUT_MS );
/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
@@ -2199,7 +2729,6 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
uint16_t packetId )
{
size_t remainingLength = 0UL, packetSize = 0UL;
- int32_t bytesSent = 0;
/* Validate arguments. */
MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
@@ -2221,31 +2750,11 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
if( status == MQTTSuccess )
{
- /* Serialize MQTT UNSUBSCRIBE packet. */
- status = MQTT_SerializeUnsubscribe( pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength,
- &( pContext->networkBuffer ) );
- }
-
- if( status == MQTTSuccess )
- {
- /* Send serialized MQTT UNSUBSCRIBE packet to transport layer. */
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- packetSize );
-
- if( bytesSent < ( int32_t ) packetSize )
- {
- LogError( ( "Transport send failed for UNSUBSCRIBE packet." ) );
- status = MQTTSendFailed;
- }
- else
- {
- LogDebug( ( "Sent %ld bytes of UNSUBSCRIBE packet.",
- ( long int ) bytesSent ) );
- }
+ status = sendUnsubscribeWithoutCopy( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength );
}
return status;
@@ -2258,6 +2767,11 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
size_t packetSize = 0U;
int32_t bytesSent = 0;
MQTTStatus_t status = MQTTSuccess;
+ MQTTFixedBuffer_t localBuffer;
+ uint8_t disconnectPacket[ 2U ];
+
+ localBuffer.pBuffer = disconnectPacket;
+ localBuffer.size = 2U;
/* Validate arguments. */
if( pContext == NULL )
@@ -2277,14 +2791,18 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
if( status == MQTTSuccess )
{
/* Serialize MQTT DISCONNECT packet. */
- status = MQTT_SerializeDisconnect( &( pContext->networkBuffer ) );
+ status = MQTT_SerializeDisconnect( &localBuffer );
}
if( status == MQTTSuccess )
{
- bytesSent = sendPacket( pContext,
- pContext->networkBuffer.pBuffer,
- packetSize );
+ /* Here we do not use vectors as the disconnect packet has fixed fields
+ * which do not reside in user provided buffers. Thus, it can be sent
+ * using a simple send call. */
+ bytesSent = sendBuffer( pContext,
+ localBuffer.pBuffer,
+ packetSize,
+ MQTT_SEND_RETRY_TIMEOUT_MS );
if( bytesSent < ( int32_t ) packetSize )
{
diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c
index c8d02475c..e2db56668 100644
--- a/source/core_mqtt_serializer.c
+++ b/source/core_mqtt_serializer.c
@@ -140,6 +140,7 @@
/*-----------------------------------------------------------*/
+
/**
* @brief MQTT Subscription packet types.
*/
@@ -630,6 +631,79 @@ static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo,
/*-----------------------------------------------------------*/
+MQTTStatus_t MQTT_SerializePublishHeaderWithoutTopic( const MQTTPublishInfo_t * pPublishInfo,
+ size_t remainingLength,
+ uint8_t * pBuffer,
+ size_t * headerSize )
+{
+ size_t headerLength;
+ uint8_t * pIndex;
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* The first byte of a PUBLISH packet contains the packet type and flags. */
+ uint8_t publishFlags = MQTT_PACKET_TYPE_PUBLISH;
+
+ /* Get the start address of the buffer. */
+ pIndex = pBuffer;
+
+ /* Length of serialized packet = First byte
+ * + Length of encoded remaining length
+ * + Encoded topic length. */
+ headerLength = 1U + remainingLengthEncodedSize( remainingLength ) + 2U;
+
+ if( pPublishInfo->qos == MQTTQoS1 )
+ {
+ LogDebug( ( "Adding QoS as QoS1 in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 );
+ }
+ else if( pPublishInfo->qos == MQTTQoS2 )
+ {
+ LogDebug( ( "Adding QoS as QoS2 in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 );
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ if( pPublishInfo->retain == true )
+ {
+ LogDebug( ( "Adding retain bit in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN );
+ }
+
+ if( pPublishInfo->dup == true )
+ {
+ LogDebug( ( "Adding dup bit in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP );
+ }
+
+ *pIndex = publishFlags;
+ pIndex++;
+
+ /* The "Remaining length" is encoded from the second byte. */
+ pIndex = encodeRemainingLength( pIndex, remainingLength );
+
+ /* The first byte of a UTF-8 string is the high byte of the string length. */
+ *pIndex = UINT16_HIGH_BYTE( pPublishInfo->topicNameLength );
+ pIndex++;
+
+ /* The second byte of a UTF-8 string is the low byte of the string length. */
+ *pIndex = UINT16_LOW_BYTE( pPublishInfo->topicNameLength );
+ pIndex++;
+
+ *headerSize = headerLength;
+
+ if( headerLength > 7 )
+ {
+ status = MQTTBadParameter;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo,
size_t remainingLength,
uint16_t packetIdentifier,
@@ -1475,37 +1549,30 @@ static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp )
return status;
}
-/*-----------------------------------------------------------*/
-
-static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
- const MQTTPublishInfo_t * pWillInfo,
- size_t remainingLength,
- const MQTTFixedBuffer_t * pFixedBuffer )
+uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength )
{
+ uint8_t * pIndexLocal = pIndex;
uint8_t connectFlags = 0U;
- uint8_t * pIndex = NULL;
- assert( pConnectInfo != NULL );
- assert( pFixedBuffer != NULL );
- assert( pFixedBuffer->pBuffer != NULL );
-
- pIndex = pFixedBuffer->pBuffer;
/* The first byte in the CONNECT packet is the control packet type. */
- *pIndex = MQTT_PACKET_TYPE_CONNECT;
- pIndex++;
+ *pIndexLocal = MQTT_PACKET_TYPE_CONNECT;
+ pIndexLocal++;
/* The remaining length of the CONNECT packet is encoded starting from the
* second byte. The remaining length does not include the length of the fixed
* header or the encoding of the remaining length. */
- pIndex = encodeRemainingLength( pIndex, remainingLength );
+ pIndexLocal = encodeRemainingLength( pIndex, remainingLength );
/* The string "MQTT" is placed at the beginning of the CONNECT packet's variable
* header. This string is 4 bytes long. */
- pIndex = encodeString( pIndex, "MQTT", 4 );
+ pIndexLocal = encodeString( pIndex, "MQTT", 4 );
/* The MQTT protocol version is the second field of the variable header. */
- *pIndex = MQTT_VERSION_3_1_1;
- pIndex++;
+ *pIndexLocal = MQTT_VERSION_3_1_1;
+ pIndexLocal++;
/* Set the clean session flag if needed. */
if( pConnectInfo->cleanSession == true )
@@ -1549,13 +1616,36 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
}
}
- *pIndex = connectFlags;
- pIndex++;
+ *pIndexLocal = connectFlags;
+ pIndexLocal++;
/* Write the 2 bytes of the keep alive interval into the CONNECT packet. */
- *pIndex = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds );
- *( pIndex + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds );
- pIndex += 2;
+ *pIndexLocal = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds );
+ *( pIndexLocal + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds );
+ pIndexLocal += 2;
+
+ return pIndexLocal;
+}
+/*-----------------------------------------------------------*/
+
+static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pFixedBuffer )
+{
+ uint8_t * pIndex = NULL;
+
+ assert( pConnectInfo != NULL );
+ assert( pFixedBuffer != NULL );
+ assert( pFixedBuffer->pBuffer != NULL );
+
+ pIndex = pFixedBuffer->pBuffer;
+
+ /* Serialize the header. */
+ pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
+ pConnectInfo,
+ pWillInfo,
+ remainingLength );
/* Write the client identifier into the CONNECT packet. */
pIndex = encodeString( pIndex,
@@ -1792,6 +1882,50 @@ MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscript
/*-----------------------------------------------------------*/
+uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength,
+ uint8_t * pIndex,
+ uint16_t packetId )
+{
+ uint8_t * pIterator = pIndex;
+
+ /* The first byte in SUBSCRIBE is the packet type. */
+ *pIterator = MQTT_PACKET_TYPE_SUBSCRIBE;
+ pIterator++;
+
+ /* Encode the "Remaining length" starting from the second byte. */
+ pIterator = encodeRemainingLength( pIterator, remainingLength );
+
+ /* Place the packet identifier into the SUBSCRIBE packet. */
+ *pIterator = UINT16_HIGH_BYTE( packetId );
+ *( pIterator + 1 ) = UINT16_LOW_BYTE( packetId );
+ pIterator += 2;
+
+ return pIterator;
+}
+
+/*-----------------------------------------------------------*/
+
+uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength,
+ uint8_t * pIndex,
+ uint16_t packetId )
+{
+ uint8_t * pIterator = pIndex;
+
+ /* The first byte in UNSUBSCRIBE is the packet type. */
+ *pIterator = MQTT_PACKET_TYPE_UNSUBSCRIBE;
+ pIterator++;
+
+ /* Encode the "Remaining length" starting from the second byte. */
+ pIterator = encodeRemainingLength( pIterator, remainingLength );
+
+ /* Place the packet identifier into the SUBSCRIBE packet. */
+ *pIterator = UINT16_HIGH_BYTE( packetId );
+ *( pIterator + 1 ) = UINT16_LOW_BYTE( packetId );
+ pIterator += 2;
+
+ return pIterator;
+}
+
MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList,
size_t subscriptionCount,
uint16_t packetId,
@@ -1813,17 +1947,9 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL
{
pIndex = pFixedBuffer->pBuffer;
- /* The first byte in SUBSCRIBE is the packet type. */
- *pIndex = MQTT_PACKET_TYPE_SUBSCRIBE;
- pIndex++;
-
- /* Encode the "Remaining length" starting from the second byte. */
- pIndex = encodeRemainingLength( pIndex, remainingLength );
-
- /* Place the packet identifier into the SUBSCRIBE packet. */
- *pIndex = UINT16_HIGH_BYTE( packetId );
- *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId );
- pIndex += 2;
+ pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
+ pIndex,
+ packetId );
/* Serialize each subscription topic filter and QoS. */
for( i = 0; i < subscriptionCount; i++ )
@@ -1906,17 +2032,7 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio
/* Get the start of the buffer to the iterator variable. */
pIndex = pFixedBuffer->pBuffer;
- /* The first byte in UNSUBSCRIBE is the packet type. */
- *pIndex = MQTT_PACKET_TYPE_UNSUBSCRIBE;
- pIndex++;
-
- /* Encode the "Remaining length" starting from the second byte. */
- pIndex = encodeRemainingLength( pIndex, remainingLength );
-
- /* Place the packet identifier into the UNSUBSCRIBE packet. */
- *pIndex = UINT16_HIGH_BYTE( packetId );
- *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId );
- pIndex += 2;
+ pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength, pIndex, packetId );
/* Serialize each subscription topic filter. */
for( i = 0; i < subscriptionCount; i++ )
diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h
index d13f75c0d..09b0e4c18 100644
--- a/source/include/core_mqtt.h
+++ b/source/include/core_mqtt.h
@@ -58,13 +58,19 @@
#define MQTT_LIBRARY_VERSION "v1.2.0"
/** @endcond */
+/**
+ * @ingroup mqtt_constants
+ * @brief Maximum number of vectors in subscribe and unsubscribe packet.
+ */
+#define MQTT_SUB_UNSUB_MAX_VECTORS ( 4U )
+
/**
* @ingroup mqtt_constants
* @brief Invalid packet identifier.
*
* Zero is an invalid packet identifier as per MQTT v3.1.1 spec.
*/
-#define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U )
+#define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U )
/* Structures defined in this file. */
struct MQTTPubAckInfo;
diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h
index b62ae2853..de849eeea 100644
--- a/source/include/core_mqtt_serializer.h
+++ b/source/include/core_mqtt_serializer.h
@@ -744,6 +744,24 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo,
const MQTTFixedBuffer_t * pFixedBuffer );
/* @[declare_mqtt_serializepublish] */
+/**
+ * @brief Serialize an MQTT PUBLISH packet header without the topic string in the
+ * given buffer. This function will add the topic string length to the provided
+ * buffer. This helps reduce an unnecessary copy of the topic string into the
+ * buffer.
+ *
+ * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetPublishPacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ * @param[out] headerSize Size of the serialized MQTT PUBLISH header.
+ *
+ * @return #MQTTSuccess if the serialization is successful. Otherwise, #MQTTBadParameter.
+ */
+MQTTStatus_t MQTT_SerializePublishHeaderWithoutTopic( const MQTTPublishInfo_t * pPublishInfo,
+ size_t remainingLength,
+ uint8_t * pBuffer,
+ size_t * headerSize );
+
/**
* @brief Serialize an MQTT PUBLISH packet header in the given buffer.
*
@@ -1191,20 +1209,89 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc,
* #MQTTPacketInfo_t is not valid until this routine has been invoked.
*
* @param[in] pBuffer The buffer holding the raw data to be processed
- * @param[in] pIndex Pointer to the index within the buffer to marking the end of raw data
- * available.
- * @param[in] pIncomingPacket Structure used to hold the fields of the
+ * @param[in] pIndex Pointer to the index within the buffer to marking the end
+ * of raw data available.
+ * @param[out] pIncomingPacket Structure used to hold the fields of the
* incoming packet.
*
* @return #MQTTSuccess on successful extraction of type and length,
* #MQTTBadParameter if @p pIncomingPacket is invalid,
- * #MQTTRecvFailed on transport receive failure,
* #MQTTBadResponse if an invalid packet is read, and
* #MQTTNoDataAvailable if there is nothing to read.
*/
+ /* @[declare_mqtt_processincomingpackettypeandlength] */
MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( uint8_t * pBuffer,
size_t * pIndex,
MQTTPacketInfo_t * pIncomingPacket );
+/* @[declare_mqtt_processincomingpackettypeandlength] */
+
+/**
+ * @fn uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength );
+ * @brief Serialize the fixed part of the connect packet header.
+ *
+ * @param[out] pIndex Pointer to the buffer where the header is to
+ * be serialized.
+ * @param[in] pConnectInfo The connect information.
+ * @param[in] pWillInfo The last will and testament information.
+ * @param[in] remainingLength The remaining length of the packet to be
+ * serialized.
+ *
+ * @return A pointer to the end of the encoded string.
+ */
+
+/**
+ * @cond DOXYGEN_IGNORE
+ * Doxygen should ignore this definition, this function is private.
+ */
+uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength );
+/** @endcond */
+
+/**
+ * @fn uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId );
+ * @brief Serialize the fixed part of the subscribe packet header.
+ *
+ * @param[in] remainingLength The remaining length of the packet to be
+ * serialized.
+ * @param[in] pIndex Pointer to the buffer where the header is to
+ * be serialized.
+ * @param[in] packetId The packet ID to be serialized.
+ *
+ * @return A pointer to the end of the encoded string.
+ */
+
+/**
+ * @cond DOXYGEN_IGNORE
+ * Doxygen should ignore this definition, this function is private.
+ */
+uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength,
+ uint8_t * pIndex,
+ uint16_t packetId );
+/** @endcond */
+
+/**
+ * @fn uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId );
+ * @brief Serialize the fixed part of the unsubscribe packet header.
+ *
+ * @param[in] remainingLength The remaining length of the packet to be
+ * serialized.
+ * @param[in] pIndex Pointer to the buffer where the header is to
+ * be serialized.
+ * @param[in] packetId The packet ID to be serialized.
+ *
+ * @return A pointer to the end of the encoded string.
+ */
+
+/**
+ * @cond DOXYGEN_IGNORE
+ * Doxygen should ignore this definition, this function is private.
+ */
+uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength,
+ uint8_t * pIndex,
+ uint16_t packetId );
+/** @endcond */
/* *INDENT-OFF* */
#ifdef __cplusplus
diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h
index e476945c6..d97495bde 100644
--- a/source/interface/transport_interface.h
+++ b/source/interface/transport_interface.h
@@ -243,6 +243,54 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext,
size_t bytesToSend );
/* @[define_transportsend] */
+/**
+ * @brief Transport vector structure for sending multiple messages.
+ */
+typedef struct TransportOutVector
+{
+ /**
+ * @brief Base address of data.
+ */
+ const void * iov_base;
+
+ /**
+ * @brief Length of data in buffer.
+ */
+ size_t iov_len;
+} TransportOutVector_t;
+
+/**
+ * @transportcallback
+ * @brief Transport interface function for "vectored" / scatter-gather based
+ * writes. This function is expected to iterate over the list of vectors pIoVec
+ * having ioVecCount entries containing portions of one MQTT message at a maximum.
+ * If the proper functionality is available, then the data in the list should be
+ * copied to the underlying TCP buffer before flushing the buffer. Implementing it
+ * in this fashion will lead to sending of fewer TCP packets for all the values
+ * in the list.
+ *
+ * @note If the proper write functionality is not present for a given device/IP-stack,
+ * then there is no strict requirement to implement write. Only the send and recv
+ * interfaces must be defined for the application to work properly.
+ *
+ * @param[in] pNetworkContext Implementation-defined network context.
+ * @param[in] pIoVec An array of TransportIoVector_t structs.
+ * @param[in] ioVecCount Number of TransportIoVector_t in pIoVec.
+ *
+ * @return The number of bytes written or a negative value to indicate error.
+ *
+ * @note If no data is written to the buffer due to the buffer being full this MUST
+ * return zero as the return value.
+ * A zero return value SHOULD represent that the write operation can be retried
+ * by calling the API function. Zero MUST NOT be returned if a network disconnection
+ * has occurred.
+ */
+/* @[define_transportwritev] */
+typedef int32_t ( * TransportWritev_t )( NetworkContext_t * pNetworkContext,
+ TransportOutVector_t * pIoVec,
+ size_t ioVecCount );
+/* @[define_transportwritev] */
+
/**
* @transportstruct
* @brief The transport layer interface.
@@ -250,8 +298,9 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext,
/* @[define_transportinterface] */
typedef struct TransportInterface
{
- TransportRecv_t recv; /**< Transport receive interface. */
- TransportSend_t send; /**< Transport send interface. */
+ TransportRecv_t recv; /**< Transport receive function pointer. */
+ TransportSend_t send; /**< Transport send function pointer. */
+ TransportWritev_t writev; /**< Transport writev function pointer. */
NetworkContext_t * pNetworkContext; /**< Implementation-defined network context. */
} TransportInterface_t;
/* @[define_transportinterface] */
diff --git a/test/cbmc/proofs/MQTT_Connect/MQTT_Connect_harness.c b/test/cbmc/proofs/MQTT_Connect/MQTT_Connect_harness.c
index 12873a787..c4bcd3493 100644
--- a/test/cbmc/proofs/MQTT_Connect/MQTT_Connect_harness.c
+++ b/test/cbmc/proofs/MQTT_Connect/MQTT_Connect_harness.c
@@ -33,17 +33,45 @@ void harness()
MQTTConnectInfo_t * pConnectInfo;
MQTTPublishInfo_t * pWillInfo;
uint32_t timeoutMs;
+ size_t totalMessageLength = 0U;
bool * pSessionPresent;
pContext = allocateMqttContext( NULL );
__CPROVER_assume( isValidMqttContext( pContext ) );
+ __CPROVER_assume( pContext != NULL );
+ __CPROVER_assume( pContext->networkBuffer.pBuffer != NULL );
pConnectInfo = allocateMqttConnectInfo( NULL );
__CPROVER_assume( isValidMqttConnectInfo( pConnectInfo ) );
+ if( pConnectInfo != NULL )
+ {
+ /* 128^4 is the length imposed by the MQTT spec. */
+ __CPROVER_assume( pConnectInfo->passwordLength < 268435456 );
+ __CPROVER_assume( pConnectInfo->userNameLength < 268435456 );
+ __CPROVER_assume( pConnectInfo->clientIdentifierLength < 268435456 );
+
+ totalMessageLength += pConnectInfo->passwordLength;
+ totalMessageLength += pConnectInfo->userNameLength;
+ totalMessageLength += pConnectInfo->clientIdentifierLength;
+ }
+
pWillInfo = allocateMqttPublishInfo( NULL );
__CPROVER_assume( isValidMqttPublishInfo( pWillInfo ) );
+ if( pWillInfo != NULL )
+ {
+ /* 128^4 is the length imposed by the MQTT spec. */
+ __CPROVER_assume( pWillInfo->topicNameLength < 268435456 );
+ __CPROVER_assume( pWillInfo->payloadLength < 268435456 );
+
+ totalMessageLength += pWillInfo->topicNameLength;
+ totalMessageLength += pWillInfo->payloadLength;
+ }
+
+ /* 128^4 is the length imposed by the MQTT spec. */
+ __CPROVER_assume( totalMessageLength <= 268435456 );
+
pSessionPresent = malloc( sizeof( bool ) );
/* The MQTT_RECEIVE_TIMEOUT is used here to control the number of loops
diff --git a/test/cbmc/proofs/MQTT_Connect/Makefile b/test/cbmc/proofs/MQTT_Connect/Makefile
index 66e0df047..13ae71f3e 100644
--- a/test/cbmc/proofs/MQTT_Connect/Makefile
+++ b/test/cbmc/proofs/MQTT_Connect/Makefile
@@ -40,6 +40,7 @@ MAX_NETWORK_RECV_TRIES=4
# information on these defines.
MQTT_STATE_ARRAY_MAX_COUNT=11
MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT=3
+CONNECT_PACKET_VECTORS = 12
DEFINES += -DMQTT_RECEIVE_TIMEOUT=$(MQTT_RECEIVE_TIMEOUT)
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
DEFINES += -DMAX_NETWORK_RECV_TRIES=$(MAX_NETWORK_RECV_TRIES)
@@ -52,6 +53,8 @@ REMOVE_FUNCTION_BODY += MQTT_ProcessLoop
REMOVE_FUNCTION_BODY += MQTT_ReceiveLoop
REMOVE_FUNCTION_BODY += __CPROVER_file_local_core_mqtt_c_handleIncomingPublish
REMOVE_FUNCTION_BODY += __CPROVER_file_local_core_mqtt_c_handleKeepAlive
+#REMOVE_FUNCTION_BODY += __CPROVER_file_local_core_mqtt_c_sendMessageVector
+#REMOVE_FUNCTION_BODY += __CPROVER_file_local_core_mqtt_c_sendBuffer
# The loop below is unwound once more than the timeout. The loop below uses
# the user passed in timeout to break the loop.
@@ -60,12 +63,12 @@ UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRI
# If the user passed in timeout is zero, then the loop will run until the
# MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT is reached.
UNWINDSET += __CPROVER_file_local_core_mqtt_c_receiveConnack.0:$(MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT)
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
# The loops are unwound 5 times because these functions divides a size_t
# variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
@@ -75,6 +78,9 @@ UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_getRemainingLength.0:5
# acknowledgements plus one. This value is set in
# test/cbmc/include/core_mqtt_config.h.
UNWINDSET += __CPROVER_file_local_core_mqtt_state_c_stateSelect.0:$(MQTT_STATE_ARRAY_MAX_COUNT)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.0:${CONNECT_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.1:${CONNECT_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.2:${CONNECT_PACKET_VECTORS}
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/proofs/MQTT_Disconnect/Makefile b/test/cbmc/proofs/MQTT_Disconnect/Makefile
index a63b90b09..351ad5b60 100644
--- a/test/cbmc/proofs/MQTT_Disconnect/Makefile
+++ b/test/cbmc/proofs/MQTT_Disconnect/Makefile
@@ -30,12 +30,12 @@ DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
INCLUDES +=
REMOVE_FUNCTION_BODY +=
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/proofs/MQTT_Ping/Makefile b/test/cbmc/proofs/MQTT_Ping/Makefile
index edcee27bf..ea36f2c74 100644
--- a/test/cbmc/proofs/MQTT_Ping/Makefile
+++ b/test/cbmc/proofs/MQTT_Ping/Makefile
@@ -29,12 +29,12 @@ MAX_NETWORK_SEND_TRIES=3
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
INCLUDES +=
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# test/cbmc/stubs/network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/proofs/MQTT_ProcessLoop/Makefile b/test/cbmc/proofs/MQTT_ProcessLoop/Makefile
index bfb14f7e4..b1f72d17d 100644
--- a/test/cbmc/proofs/MQTT_ProcessLoop/Makefile
+++ b/test/cbmc/proofs/MQTT_ProcessLoop/Makefile
@@ -52,12 +52,12 @@ REMOVE_FUNCTION_BODY += memmove # Use stub
UNWINDSET += __CPROVER_file_local_core_mqtt_c_discardStoredPacket.0:$(MAX_NETWORK_RECV_TRIES)
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRIES)
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
# The getRemainingLength loop is unwound 5 times because getRemainingLength()
# divides a size_t variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
diff --git a/test/cbmc/proofs/MQTT_Publish/Makefile b/test/cbmc/proofs/MQTT_Publish/Makefile
index 64486a5e1..ddbf5042c 100644
--- a/test/cbmc/proofs/MQTT_Publish/Makefile
+++ b/test/cbmc/proofs/MQTT_Publish/Makefile
@@ -29,17 +29,19 @@ MAX_NETWORK_SEND_TRIES=3
# Please see test/cbmc/include/core_mqtt_config.h for more
# information.
MQTT_STATE_ARRAY_MAX_COUNT=11
+PUBLISH_PACKET_VECTORS = 5
+
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
INCLUDES +=
REMOVE_FUNCTION_BODY +=
REMOVE_FUNCTION_BODY +=
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
# These loops will run for the maximum number of publishes pending acknowledgement.
# This is set in test/cbmc/include/core_mqtt_config.h.
UNWINDSET += __CPROVER_file_local_core_mqtt_state_c_addRecord.0:$(MQTT_STATE_ARRAY_MAX_COUNT)
@@ -48,6 +50,9 @@ UNWINDSET += __CPROVER_file_local_core_mqtt_state_c_findInRecord.0:$(MQTT_STATE_
# divides a size_t variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_encodeRemainingLength.0:5
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.0:${PUBLISH_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.1:${PUBLISH_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.2:${PUBLISH_PACKET_VECTORS}
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/proofs/MQTT_ReceiveLoop/Makefile b/test/cbmc/proofs/MQTT_ReceiveLoop/Makefile
index d8778405d..0a161df95 100644
--- a/test/cbmc/proofs/MQTT_ReceiveLoop/Makefile
+++ b/test/cbmc/proofs/MQTT_ReceiveLoop/Makefile
@@ -34,12 +34,12 @@ REMOVE_FUNCTION_BODY += memmove # Use stub
# The loops below are unwound once more than the exclusive timeout bound.
UNWINDSET += __CPROVER_file_local_core_mqtt_c_discardStoredPacket.0:$(MAX_NETWORK_RECV_TRIES)
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRIES)
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
# The getRemainingLength loop is unwound 5 times because getRemainingLength()
# divides a size_t variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
diff --git a/test/cbmc/proofs/MQTT_Subscribe/MQTT_Subscribe_harness.c b/test/cbmc/proofs/MQTT_Subscribe/MQTT_Subscribe_harness.c
index d6bf4a374..d3a0b2323 100644
--- a/test/cbmc/proofs/MQTT_Subscribe/MQTT_Subscribe_harness.c
+++ b/test/cbmc/proofs/MQTT_Subscribe/MQTT_Subscribe_harness.c
@@ -41,8 +41,8 @@ void harness()
* mqtt_cbmc_state.c for more information. */
__CPROVER_assume( subscriptionCount < SUBSCRIPTION_COUNT_MAX );
- pSubscriptionList = allocateMqttSubscriptionList( NULL, subscriptionCount );
- __CPROVER_assume( isValidMqttSubscriptionList( pSubscriptionList, subscriptionCount ) );
+ pSubscriptionList = allocateMqttSubscriptionList( NULL, 1U );
+ __CPROVER_assume( isValidMqttSubscriptionList( pSubscriptionList, 1U ) );
MQTT_Subscribe( pContext, pSubscriptionList, subscriptionCount, packetId );
}
diff --git a/test/cbmc/proofs/MQTT_Subscribe/Makefile b/test/cbmc/proofs/MQTT_Subscribe/Makefile
index c6d2f9140..55b7afb4a 100644
--- a/test/cbmc/proofs/MQTT_Subscribe/Makefile
+++ b/test/cbmc/proofs/MQTT_Subscribe/Makefile
@@ -30,26 +30,30 @@ MAX_NETWORK_SEND_TRIES=3
# mqtt_cbmc_state.c for more information on this bound. This is set to 2
# currently to have the proof run quickly.
SUBSCRIPTION_COUNT_MAX=2
+SUBSCRIBE_PACKET_VECTORS = 5
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
DEFINES += -DSUBSCRIPTION_COUNT_MAX=$(SUBSCRIPTION_COUNT_MAX)
INCLUDES +=
REMOVE_FUNCTION_BODY +=
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
UNWINDSET += allocateMqttSubscriptionList.0:$(SUBSCRIPTION_COUNT_MAX)
-UNWINDSET += isValidMqttSubscriptionList.0:$(SUBSCRIPTION_COUNT_MAX)
UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_calculateSubscriptionPacketSize.0:$(SUBSCRIPTION_COUNT_MAX)
-UNWINDSET += MQTT_SerializeSubscribe.0:$(SUBSCRIPTION_COUNT_MAX)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.0:${SUBSCRIBE_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.1:${SUBSCRIBE_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.2:${SUBSCRIBE_PACKET_VECTORS}
# The encodeRemainingLength loop is unwound 5 times because encodeRemainingLength()
# divides a size_t variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_encodeRemainingLength.0:5
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendSubscribeWithoutCopy.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendSubscribeWithoutCopy.1:$(MAX_NETWORK_SEND_TRIES)
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/proofs/MQTT_Unsubscribe/MQTT_Unsubscribe_harness.c b/test/cbmc/proofs/MQTT_Unsubscribe/MQTT_Unsubscribe_harness.c
index dc595df92..6a7f5671c 100644
--- a/test/cbmc/proofs/MQTT_Unsubscribe/MQTT_Unsubscribe_harness.c
+++ b/test/cbmc/proofs/MQTT_Unsubscribe/MQTT_Unsubscribe_harness.c
@@ -41,8 +41,8 @@ void harness()
* mqtt_cbmc_state.c for more information. */
__CPROVER_assume( subscriptionCount < SUBSCRIPTION_COUNT_MAX );
- pSubscriptionList = allocateMqttSubscriptionList( NULL, subscriptionCount );
- __CPROVER_assume( isValidMqttSubscriptionList( pSubscriptionList, subscriptionCount ) );
+ pSubscriptionList = allocateMqttSubscriptionList( NULL, 1U );
+ __CPROVER_assume( isValidMqttSubscriptionList( pSubscriptionList, 1U ) );
MQTT_Unsubscribe( pContext, pSubscriptionList, subscriptionCount, packetId );
}
diff --git a/test/cbmc/proofs/MQTT_Unsubscribe/Makefile b/test/cbmc/proofs/MQTT_Unsubscribe/Makefile
index 468468c20..5a3613fde 100644
--- a/test/cbmc/proofs/MQTT_Unsubscribe/Makefile
+++ b/test/cbmc/proofs/MQTT_Unsubscribe/Makefile
@@ -30,26 +30,31 @@ MAX_NETWORK_SEND_TRIES=3
# mqtt_cbmc_state.c for more information on this bound. This is set to 2
# currently to have the proof run quickly.
SUBSCRIPTION_COUNT_MAX=2
+UNSUBSCRIBE_PACKET_VECTORS = 5
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
DEFINES += -DSUBSCRIPTION_COUNT_MAX=$(SUBSCRIPTION_COUNT_MAX)
INCLUDES +=
REMOVE_FUNCTION_BODY +=
-# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
-# sendPacket will continue until all the bytes are sent or a network error
+# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
+# sendBuffer will continue until all the bytes are sent or a network error
# occurs. Please see NetworkInterfaceReceiveStub in
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
# information.
-UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendPacket.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
UNWINDSET += allocateMqttSubscriptionList.0:$(SUBSCRIPTION_COUNT_MAX)
-UNWINDSET += isValidMqttSubscriptionList.0:$(SUBSCRIPTION_COUNT_MAX)
UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_calculateSubscriptionPacketSize.0:$(SUBSCRIPTION_COUNT_MAX)
UNWINDSET += MQTT_SerializeUnsubscribe.0:$(SUBSCRIPTION_COUNT_MAX)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.0:${UNSUBSCRIBE_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.1:${UNSUBSCRIBE_PACKET_VECTORS}
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendMessageVector.2:${UNSUBSCRIBE_PACKET_VECTORS}
# The encodeRemainingLength loop is unwound 5 times because encodeRemainingLength()
# divides a size_t variable by 128 until it reaches zero to stop the loop.
# log128(SIZE_MAX) = 4.571...
UNWINDSET += __CPROVER_file_local_core_mqtt_serializer_c_encodeRemainingLength.0:5
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendUnsubscribeWithoutCopy.0:$(MAX_NETWORK_SEND_TRIES)
+UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendUnsubscribeWithoutCopy.1:$(MAX_NETWORK_SEND_TRIES)
PROOF_SOURCES += $(PROOFDIR)/$(HARNESS_FILE).c
PROOF_SOURCES += $(SRCDIR)/test/cbmc/sources/mqtt_cbmc_state.c
diff --git a/test/cbmc/sources/mqtt_cbmc_state.c b/test/cbmc/sources/mqtt_cbmc_state.c
index 026b363b2..3bb2676ad 100644
--- a/test/cbmc/sources/mqtt_cbmc_state.c
+++ b/test/cbmc/sources/mqtt_cbmc_state.c
@@ -199,6 +199,7 @@ MQTTContext_t * allocateMqttContext( MQTTContext_t * pContext )
* function in core_mqtt.h. */
pTransportInterface->recv = NetworkInterfaceReceiveStub;
pTransportInterface->send = NetworkInterfaceSendStub;
+ pTransportInterface->writev = NULL;
}
pNetworkBuffer = allocateMqttFixedBuffer( NULL );