From 12a0facff6a8215d73b9a297d4ac44d52ccb5ed8 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:32:30 -0700 Subject: [PATCH 01/35] Make publish use internal buffers --- source/core_mqtt.c | 148 ++++++++++++++++---------- source/core_mqtt_serializer.c | 73 +++++++++++++ source/include/core_mqtt_serializer.h | 28 +++++ 3 files changed, 190 insertions(+), 59 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 4226a7372..981e9f31e 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -232,20 +232,6 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC size_t subscriptionCount, uint16_t packetId ); -/** - * @brief Send serialized publish packet using transport send. - * - * @brief param[in] pContext Initialized MQTT context. - * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. - * @brief param[in] headerSize Header size of the PUBLISH packet. - * - * @return #MQTTSendFailed if transport write failed; - * #MQTTSuccess otherwise. - */ -static MQTTStatus_t sendPublish( MQTTContext_t * pContext, - const MQTTPublishInfo_t * pPublishInfo, - size_t headerSize ); - /** * @brief Receives a CONNACK MQTT packet. * @@ -280,6 +266,26 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, bool sessionPresent ); + +/** + * @brief Send the publish packet without copying the topic string and payload in + * the buffer. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @brief param[in] pMqttHeader the serialized MQTT header with the header byte; + * the encoded length of the packet; and the encoded length of the topic string. + * @brief param[in] headerSize Size of the serialized PUBLISH header. + * @brief param[in] packetId Packet Id of the publish packet. + * + * @return #MQTTSendFailed if transport send during resend failed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + const uint8_t * pMqttHeader, + size_t headerSize, + uint16_t packetId ); /** * @brief Serializes a PUBLISH message. * @@ -1412,56 +1418,69 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC /*-----------------------------------------------------------*/ -static MQTTStatus_t sendPublish( MQTTContext_t * pContext, - const MQTTPublishInfo_t * pPublishInfo, - size_t headerSize ) +static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + const uint8_t * pMqttHeader, + size_t headerSize, + uint16_t packetId ) { MQTTStatus_t status = MQTTSuccess; - int32_t bytesSent = 0; - - assert( pContext != NULL ); - assert( pPublishInfo != NULL ); - assert( headerSize > 0 ); - assert( pContext->networkBuffer.pBuffer != NULL ); - assert( !( pPublishInfo->payloadLength > 0 ) || ( pPublishInfo->pPayload != NULL ) ); + int32_t bytesSent; + uint8_t serializedPacketID[ 2 ]; + const size_t packetIdLength = 2UL; - /* Send header first. */ + /* Send the serialized publish packet header over network. */ bytesSent = sendPacket( pContext, - pContext->networkBuffer.pBuffer, + pMqttHeader, headerSize ); - if( bytesSent < ( int32_t ) headerSize ) + if( bytesSent != ( int32_t ) headerSize ) { - LogError( ( "Transport send failed for PUBLISH header." ) ); status = MQTTSendFailed; } - else + + if( status == MQTTSuccess ) { - LogDebug( ( "Sent %ld bytes of PUBLISH header.", - ( long int ) bytesSent ) ); + /* Send the topic string over the network. */ + bytesSent = sendPacket( pContext, + ( const uint8_t * ) pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ); - /* Send Payload if there is one to send. It is valid for a PUBLISH - * Packet to contain a zero length payload.*/ - if( pPublishInfo->payloadLength > 0U ) + if( bytesSent != ( ( int32_t ) pPublishInfo->topicNameLength ) ) { - bytesSent = sendPacket( pContext, - pPublishInfo->pPayload, - pPublishInfo->payloadLength ); + status = MQTTSendFailed; + } + } - if( bytesSent < ( int32_t ) pPublishInfo->payloadLength ) - { - LogError( ( "Transport send failed for PUBLISH payload." ) ); - status = MQTTSendFailed; - } - else - { - LogDebug( ( "Sent %ld bytes of PUBLISH payload.", - ( long int ) bytesSent ) ); - } + if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) + { + /* Encode and send the packet ID. */ + serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); + serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); + + /* Sends the topic string over the network. */ + bytesSent = sendPacket( pContext, + serializedPacketID, + packetIdLength ); + + if( bytesSent != ( int32_t ) packetIdLength ) + { + status = MQTTSendFailed; } - else + } + + if( ( status == MQTTSuccess ) && + ( pPublishInfo->payloadLength > 0U ) ) + { + /* Send Payload if there is one to send. It is valid for a PUBLISH + * Packet to contain a zero length payload.*/ + bytesSent = sendPacket( pContext, + pPublishInfo->pPayload, + pPublishInfo->payloadLength ); + + if( bytesSent != ( ( int32_t ) pPublishInfo->payloadLength ) ) { - LogDebug( ( "PUBLISH payload was not sent. Payload length was zero." ) ); + status = MQTTSendFailed; } } @@ -1929,19 +1948,29 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, uint16_t packetId ) { - size_t headerSize = 0UL; + size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL; MQTTPublishState_t publishStatus = MQTTStateNull; + /* 1 header byte + 4 bytes (maximum) required for encoding the length + + * 2 bytes for topic string. */ + uint8_t mqttHeader[ 7 ]; /* Validate arguments. */ MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId ); if( status == MQTTSuccess ) { - /* Serialize PUBLISH packet. */ - status = serializePublish( pContext, - pPublishInfo, - packetId, - &headerSize ); + /* Get the remaining length and packet size.*/ + status = MQTT_GetPublishPacketSize( pPublishInfo, + &remainingLength, + &packetSize ); + } + + if( status == MQTTSuccess ) + { + status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo, + remainingLength, + mqttHeader, + &headerSize ); } if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) @@ -1962,10 +1991,11 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - /* Sends the serialized publish packet over network. */ - status = sendPublish( pContext, - pPublishInfo, - headerSize ); + status = sendPublishWithoutCopy( pContext, + pPublishInfo, + mqttHeader, + headerSize, + packetId ); } if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index cfc1f00d3..000e6e2a5 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -612,6 +612,79 @@ static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_SerializePublishHeaderWithoutTopic( const MQTTPublishInfo_t * pPublishInfo, + size_t remainingLength, + uint8_t * pBuffer, + size_t * headerSize ) +{ + size_t headerLength; + uint8_t * pIndex; + MQTTStatus_t status = MQTTSuccess; + + /* The first byte of a PUBLISH packet contains the packet type and flags. */ + uint8_t publishFlags = MQTT_PACKET_TYPE_PUBLISH; + + /* Get the start address of the buffer. */ + pIndex = pBuffer; + + /* Length of serialized packet = First byte + * + Length of encoded remaining length + * + Encoded topic length. */ + headerLength = 1U + remainingLengthEncodedSize( remainingLength ) + 2U; + + if( pPublishInfo->qos == MQTTQoS1 ) + { + LogDebug( ( "Adding QoS as QoS1 in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ); + } + else if( pPublishInfo->qos == MQTTQoS2 ) + { + LogDebug( ( "Adding QoS as QoS2 in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 ); + } + else + { + /* Empty else MISRA 15.7 */ + } + + if( pPublishInfo->retain == true ) + { + LogDebug( ( "Adding retain bit in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN ); + } + + if( pPublishInfo->dup == true ) + { + LogDebug( ( "Adding dup bit in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP ); + } + + *pIndex = publishFlags; + pIndex++; + + /* The "Remaining length" is encoded from the second byte. */ + pIndex = encodeRemainingLength( pIndex, remainingLength ); + + /* The first byte of a UTF-8 string is the high byte of the string length. */ + *pIndex = UINT16_HIGH_BYTE( pPublishInfo->topicNameLength ); + pIndex++; + + /* The second byte of a UTF-8 string is the low byte of the string length. */ + *pIndex = UINT16_LOW_BYTE( pPublishInfo->topicNameLength ); + pIndex++; + + *headerSize = headerLength; + + if( headerLength > 7 ) + { + status = MQTTBadParameter; + } + + return status; +} + +/*-----------------------------------------------------------*/ + static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo, size_t remainingLength, uint16_t packetIdentifier, diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index bf3d974f4..2a6c8e9f5 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -738,6 +738,34 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, const MQTTFixedBuffer_t * pFixedBuffer ); /* @[declare_mqtt_serializepublish] */ +/* TODO: Re-write the brief comments below. */ +/** + * @brief Serialize an MQTT PUBLISH packet header without the topic string in the + * given buffer. This function will add the topic string length to the provided + * buffer. This helps reduce an unnecessary copy of the topic string into the + * buffer. + * + * #MQTT_GetPublishPacketSize should be called with @p pPublishInfo before + * invoking this function to get the size of the required buffer and + * @p remainingLength. The @p remainingLength must be the same as returned by + * #MQTT_GetPublishPacketSize. The buffer must be at least as large + * as the size returned by #MQTT_GetPublishPacketSize. + * + * @param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetPublishPacketSize. + * @param[out] pFixedBuffer Buffer for packet serialization. + * @param[out] pHeaderSize Size of the serialized MQTT PUBLISH header. + * + * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializePublishHeaderWithoutTopic( const MQTTPublishInfo_t * pPublishInfo, + size_t remainingLength, + uint8_t * pBuffer, + size_t * headerSize ); + /** * @brief Serialize an MQTT PUBLISH packet header in the given buffer. * From 00f0b5889e15ff7bcdfe349d82e80e2f25e29c0d Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:39:09 -0700 Subject: [PATCH 02/35] Fix comment about packet ID --- source/core_mqtt.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 981e9f31e..becdba59a 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -1454,11 +1454,11 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) { - /* Encode and send the packet ID. */ + /* Encode the packet ID. */ serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); - /* Sends the topic string over the network. */ + /* Send the packet ID over the network. */ bytesSent = sendPacket( pContext, serializedPacketID, packetIdLength ); From 24a1a3ea72877ad563949a2b487505af31888a86 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 18 Aug 2022 15:52:12 -0700 Subject: [PATCH 03/35] Try a different approach for ping and disconnect --- source/core_mqtt.c | 19 ++++- source/core_mqtt_serializer.c | 110 -------------------------- source/include/core_mqtt_serializer.h | 110 ++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 114 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index becdba59a..bba708b01 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -2034,6 +2034,12 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) int32_t bytesSent = 0; MQTTStatus_t status = MQTTSuccess; size_t packetSize = 0U; + /* MQTT ping packets are of fixed length. */ + uint8_t pingreqPacket[ MQTT_PACKET_PINGREQ_SIZE ]; + MQTTFixedBuffer_t localBuffer; + + localBuffer.pBuffer = pingreqPacket; + localBuffer.size = MQTT_PACKET_PINGREQ_SIZE; if( pContext == NULL ) { @@ -2060,14 +2066,14 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { /* Serialize MQTT PINGREQ. */ - status = MQTT_SerializePingreq( &( pContext->networkBuffer ) ); + status = MQTT_SerializePingreq( &localBuffer ); } if( status == MQTTSuccess ) { /* Send the serialized PINGREQ packet to transport layer. */ bytesSent = sendPacket( pContext, - pContext->networkBuffer.pBuffer, + localBuffer.pBuffer, packetSize ); /* It is an error to not send the entire PINGREQ packet. */ @@ -2155,6 +2161,11 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) size_t packetSize = 0U; int32_t bytesSent = 0; MQTTStatus_t status = MQTTSuccess; + MQTTFixedBuffer_t localBuffer; + uint8_t disconnectPacket[ MQTT_DISCONNECT_PACKET_SIZE ]; + + localBuffer.pBuffer = disconnectPacket; + localBuffer.size = MQTT_DISCONNECT_PACKET_SIZE; /* Validate arguments. */ if( pContext == NULL ) @@ -2174,13 +2185,13 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { /* Serialize MQTT DISCONNECT packet. */ - status = MQTT_SerializeDisconnect( &( pContext->networkBuffer ) ); + status = MQTT_SerializeDisconnect( &localBuffer ); } if( status == MQTTSuccess ) { bytesSent = sendPacket( pContext, - pContext->networkBuffer.pBuffer, + localBuffer.pBuffer, packetSize ); if( bytesSent < ( int32_t ) packetSize ) diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index 000e6e2a5..f228460bd 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -30,116 +30,6 @@ #include "core_mqtt_serializer.h" #include "core_mqtt_default_logging.h" -/** - * @brief MQTT protocol version 3.1.1. - */ -#define MQTT_VERSION_3_1_1 ( ( uint8_t ) 4U ) - -/** - * @brief Size of the fixed and variable header of a CONNECT packet. - */ -#define MQTT_PACKET_CONNECT_HEADER_SIZE ( 10UL ) - -/* MQTT CONNECT flags. */ -#define MQTT_CONNECT_FLAG_CLEAN ( 1 ) /**< @brief Clean session. */ -#define MQTT_CONNECT_FLAG_WILL ( 2 ) /**< @brief Will present. */ -#define MQTT_CONNECT_FLAG_WILL_QOS1 ( 3 ) /**< @brief Will QoS 1. */ -#define MQTT_CONNECT_FLAG_WILL_QOS2 ( 4 ) /**< @brief Will QoS 2. */ -#define MQTT_CONNECT_FLAG_WILL_RETAIN ( 5 ) /**< @brief Will retain. */ -#define MQTT_CONNECT_FLAG_PASSWORD ( 6 ) /**< @brief Password present. */ -#define MQTT_CONNECT_FLAG_USERNAME ( 7 ) /**< @brief User name present. */ - -/* - * Positions of each flag in the first byte of an MQTT PUBLISH packet's - * fixed header. - */ -#define MQTT_PUBLISH_FLAG_RETAIN ( 0 ) /**< @brief MQTT PUBLISH retain flag. */ -#define MQTT_PUBLISH_FLAG_QOS1 ( 1 ) /**< @brief MQTT PUBLISH QoS1 flag. */ -#define MQTT_PUBLISH_FLAG_QOS2 ( 2 ) /**< @brief MQTT PUBLISH QoS2 flag. */ -#define MQTT_PUBLISH_FLAG_DUP ( 3 ) /**< @brief MQTT PUBLISH duplicate flag. */ - -/** - * @brief The size of MQTT DISCONNECT packets, per MQTT spec. - */ -#define MQTT_DISCONNECT_PACKET_SIZE ( 2UL ) - -/** - * @brief A PINGREQ packet is always 2 bytes in size, defined by MQTT 3.1.1 spec. - */ -#define MQTT_PACKET_PINGREQ_SIZE ( 2UL ) - -/** - * @brief The Remaining Length field of MQTT disconnect packets, per MQTT spec. - */ -#define MQTT_DISCONNECT_REMAINING_LENGTH ( ( uint8_t ) 0 ) - -/* - * Constants relating to CONNACK packets, defined by MQTT 3.1.1 spec. - */ -#define MQTT_PACKET_CONNACK_REMAINING_LENGTH ( ( uint8_t ) 2U ) /**< @brief A CONNACK packet always has a "Remaining length" of 2. */ -#define MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ( ( uint8_t ) 0x01U ) /**< @brief The "Session Present" bit is always the lowest bit. */ - -/* - * UNSUBACK, PUBACK, PUBREC, PUBREL, and PUBCOMP always have a remaining length - * of 2. - */ -#define MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ( ( uint8_t ) 2 ) /**< @brief PUBACK, PUBREC, PUBREl, PUBCOMP, UNSUBACK Remaining length. */ -#define MQTT_PACKET_PINGRESP_REMAINING_LENGTH ( 0U ) /**< @brief A PINGRESP packet always has a "Remaining length" of 0. */ - -/** - * @brief Per the MQTT 3.1.1 spec, the largest "Remaining Length" of an MQTT - * packet is this value, 256 MB. - */ -#define MQTT_MAX_REMAINING_LENGTH ( 268435455UL ) - -/** - * @brief Set a bit in an 8-bit unsigned integer. - */ -#define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) ) - -/** - * @brief Macro for checking if a bit is set in a 1-byte unsigned int. - * - * @param[in] x The unsigned int to check. - * @param[in] position Which bit to check. - */ -#define UINT8_CHECK_BIT( x, position ) ( ( ( x ) & ( 0x01U << ( position ) ) ) == ( 0x01U << ( position ) ) ) - -/** - * @brief Get the high byte of a 16-bit unsigned integer. - */ -#define UINT16_HIGH_BYTE( x ) ( ( uint8_t ) ( ( x ) >> 8 ) ) - -/** - * @brief Get the low byte of a 16-bit unsigned integer. - */ -#define UINT16_LOW_BYTE( x ) ( ( uint8_t ) ( ( x ) & 0x00ffU ) ) - -/** - * @brief Macro for decoding a 2-byte unsigned int from a sequence of bytes. - * - * @param[in] ptr A uint8_t* that points to the high byte. - */ -#define UINT16_DECODE( ptr ) \ - ( uint16_t ) ( ( ( ( uint16_t ) ( *( ptr ) ) ) << 8 ) | \ - ( ( uint16_t ) ( *( ( ptr ) + 1 ) ) ) ) - -/** - * @brief A value that represents an invalid remaining length. - * - * This value is greater than what is allowed by the MQTT specification. - */ -#define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 ) - -/** - * @brief The minimum remaining length for a QoS 0 PUBLISH. - * - * Includes two bytes for topic name length and one byte for topic name. - */ -#define MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 ( 3U ) - -/*-----------------------------------------------------------*/ - /** * @brief MQTT Subscription packet types. */ diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 2a6c8e9f5..c6088e37e 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -76,6 +76,116 @@ #define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xE0U ) /**< @brief DISCONNECT (client-to-server). */ /** @} */ +/** + * @brief MQTT protocol version 3.1.1. + */ +#define MQTT_VERSION_3_1_1 ( ( uint8_t ) 4U ) + +/** + * @brief Size of the fixed and variable header of a CONNECT packet. + */ +#define MQTT_PACKET_CONNECT_HEADER_SIZE ( 10UL ) + +/* MQTT CONNECT flags. */ +#define MQTT_CONNECT_FLAG_CLEAN ( 1 ) /**< @brief Clean session. */ +#define MQTT_CONNECT_FLAG_WILL ( 2 ) /**< @brief Will present. */ +#define MQTT_CONNECT_FLAG_WILL_QOS1 ( 3 ) /**< @brief Will QoS 1. */ +#define MQTT_CONNECT_FLAG_WILL_QOS2 ( 4 ) /**< @brief Will QoS 2. */ +#define MQTT_CONNECT_FLAG_WILL_RETAIN ( 5 ) /**< @brief Will retain. */ +#define MQTT_CONNECT_FLAG_PASSWORD ( 6 ) /**< @brief Password present. */ +#define MQTT_CONNECT_FLAG_USERNAME ( 7 ) /**< @brief User name present. */ + +/* + * Positions of each flag in the first byte of an MQTT PUBLISH packet's + * fixed header. + */ +#define MQTT_PUBLISH_FLAG_RETAIN ( 0 ) /**< @brief MQTT PUBLISH retain flag. */ +#define MQTT_PUBLISH_FLAG_QOS1 ( 1 ) /**< @brief MQTT PUBLISH QoS1 flag. */ +#define MQTT_PUBLISH_FLAG_QOS2 ( 2 ) /**< @brief MQTT PUBLISH QoS2 flag. */ +#define MQTT_PUBLISH_FLAG_DUP ( 3 ) /**< @brief MQTT PUBLISH duplicate flag. */ + +/** + * @brief The size of MQTT DISCONNECT packets, per MQTT spec. + */ +#define MQTT_DISCONNECT_PACKET_SIZE ( 2UL ) + +/** + * @brief A PINGREQ packet is always 2 bytes in size, defined by MQTT 3.1.1 spec. + */ +#define MQTT_PACKET_PINGREQ_SIZE ( 2UL ) + +/** + * @brief The Remaining Length field of MQTT disconnect packets, per MQTT spec. + */ +#define MQTT_DISCONNECT_REMAINING_LENGTH ( ( uint8_t ) 0 ) + +/* + * Constants relating to CONNACK packets, defined by MQTT 3.1.1 spec. + */ +#define MQTT_PACKET_CONNACK_REMAINING_LENGTH ( ( uint8_t ) 2U ) /**< @brief A CONNACK packet always has a "Remaining length" of 2. */ +#define MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ( ( uint8_t ) 0x01U ) /**< @brief The "Session Present" bit is always the lowest bit. */ + +/* + * UNSUBACK, PUBACK, PUBREC, PUBREL, and PUBCOMP always have a remaining length + * of 2. + */ +#define MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ( ( uint8_t ) 2 ) /**< @brief PUBACK, PUBREC, PUBREl, PUBCOMP, UNSUBACK Remaining length. */ +#define MQTT_PACKET_PINGRESP_REMAINING_LENGTH ( 0U ) /**< @brief A PINGRESP packet always has a "Remaining length" of 0. */ + +/** + * @brief Per the MQTT 3.1.1 spec, the largest "Remaining Length" of an MQTT + * packet is this value, 256 MB. + */ +#define MQTT_MAX_REMAINING_LENGTH ( 268435455UL ) + +/** + * @brief Set a bit in an 8-bit unsigned integer. + */ +#define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) ) + +/** + * @brief Macro for checking if a bit is set in a 1-byte unsigned int. + * + * @param[in] x The unsigned int to check. + * @param[in] position Which bit to check. + */ +#define UINT8_CHECK_BIT( x, position ) ( ( ( x ) & ( 0x01U << ( position ) ) ) == ( 0x01U << ( position ) ) ) + +/** + * @brief Get the high byte of a 16-bit unsigned integer. + */ +#define UINT16_HIGH_BYTE( x ) ( ( uint8_t ) ( ( x ) >> 8 ) ) + +/** + * @brief Get the low byte of a 16-bit unsigned integer. + */ +#define UINT16_LOW_BYTE( x ) ( ( uint8_t ) ( ( x ) & 0x00ffU ) ) + +/** + * @brief Macro for decoding a 2-byte unsigned int from a sequence of bytes. + * + * @param[in] ptr A uint8_t* that points to the high byte. + */ +#define UINT16_DECODE( ptr ) \ + ( uint16_t ) ( ( ( ( uint16_t ) ( *( ptr ) ) ) << 8 ) | \ + ( ( uint16_t ) ( *( ( ptr ) + 1 ) ) ) ) + +/** + * @brief A value that represents an invalid remaining length. + * + * This value is greater than what is allowed by the MQTT specification. + */ +#define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 ) + +/** + * @brief The minimum remaining length for a QoS 0 PUBLISH. + * + * Includes two bytes for topic name length and one byte for topic name. + */ +#define MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 ( 3U ) + +/*-----------------------------------------------------------*/ + /** * @ingroup mqtt_constants * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec. From a40d76837fd9125bc7c5c3b2f7d9e30ee7456a62 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Mon, 22 Aug 2022 13:05:48 -0700 Subject: [PATCH 04/35] Use writev and flush in conjuction with send --- source/core_mqtt.c | 82 ++++++++++++++++++++------ source/interface/transport_interface.h | 45 ++++++++++++++ 2 files changed, 109 insertions(+), 18 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index bba708b01..e01b5f1b9 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -1428,23 +1428,48 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, int32_t bytesSent; uint8_t serializedPacketID[ 2 ]; const size_t packetIdLength = 2UL; + bool writeAndFlushAvailable; + + writeAndFlushAvailable = ( ( pContext->transportInterface.write != NULL ) && + ( pContext->transportInterface.flush != NULL ) ); + + /* Encode the packet ID. */ + serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); + serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); /* Send the serialized publish packet header over network. */ - bytesSent = sendPacket( pContext, - pMqttHeader, - headerSize ); + if( writeAndFlushAvailable == true ) + { + bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, + pMqttHeader, + headerSize ); + } + else + { + bytesSent = sendPacket( pContext, + pMqttHeader, + headerSize ); + } if( bytesSent != ( int32_t ) headerSize ) { status = MQTTSendFailed; } - - if( status == MQTTSuccess ) + else { /* Send the topic string over the network. */ - bytesSent = sendPacket( pContext, - ( const uint8_t * ) pPublishInfo->pTopicName, - pPublishInfo->topicNameLength ); + if( writeAndFlushAvailable == true ) + { + bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, + ( const uint8_t* )pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ); + } + else + { + bytesSent = sendPacket( pContext, + ( const uint8_t * ) pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ); + } if( bytesSent != ( ( int32_t ) pPublishInfo->topicNameLength ) ) { @@ -1454,14 +1479,19 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) { - /* Encode the packet ID. */ - serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); - serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); - /* Send the packet ID over the network. */ - bytesSent = sendPacket( pContext, - serializedPacketID, - packetIdLength ); + if( writeAndFlushAvailable == true ) + { + bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, + serializedPacketID, + packetIdLength ); + } + else + { + bytesSent = sendPacket( pContext, + serializedPacketID, + packetIdLength ); + } if( bytesSent != ( int32_t ) packetIdLength ) { @@ -1474,9 +1504,18 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, { /* Send Payload if there is one to send. It is valid for a PUBLISH * Packet to contain a zero length payload.*/ - bytesSent = sendPacket( pContext, - pPublishInfo->pPayload, - pPublishInfo->payloadLength ); + if( writeAndFlushAvailable == true ) + { + bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, + pPublishInfo->pPayload, + pPublishInfo->payloadLength ); + } + else + { + bytesSent = sendPacket( pContext, + pPublishInfo->pPayload, + pPublishInfo->payloadLength ); + } if( bytesSent != ( ( int32_t ) pPublishInfo->payloadLength ) ) { @@ -1484,6 +1523,13 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, } } + if( ( status == MQTTSuccess ) && + ( writeAndFlushAvailable == true ) ) + { + /* Flush the buffers to send the message over the network. */ + ( void ) pContext->transportInterface.flush( pContext->transportInterface.pNetworkContext ); + } + return status; } diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h index e476945c6..3ad52f17a 100644 --- a/source/interface/transport_interface.h +++ b/source/interface/transport_interface.h @@ -243,6 +243,49 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext, size_t bytesToSend ); /* @[define_transportsend] */ +/** + * @transportcallback + * @brief Transport interface for writing data into the IP stack's buffers. This + * data will not be sent immediately as the stack will wait for the application to + * write more data. + * + * @param[in] pNetworkContext Implementation-defined network context. + * @param[in] pBuffer Buffer containing the bytes to send over the network stack. + * @param[in] bytesToWrite Number of bytes to write to the stack's buffers. + * + * @return The number of bytes written or a negative value to indicate error. + * + * @note If no data is written to the buffer due to the buffer being full this MUST + * return zero as the return value. + * A zero return value SHOULD represent that the send operation can be retried + * by calling the API function. Zero MUST NOT be returned if a network disconnection + * has occurred. + */ +/* @[define_transportwrite] */ +typedef int32_t ( * TransportWrite_t )( NetworkContext_t * pNetworkContext, + const void * pBuffer, + size_t bytesToWrite ); +/* @[define_transportwrite] */ + +/** + * @transportcallback + * @brief Transport interface for sending all the data present in the IP-stacks buffers + * added by the call to transport interface write function. + * + * @param[in] pNetworkContext Implementation-defined network context. + * + * @return The number of bytes sent or a negative value to indicate error. + * + * @note If no data is written to the buffer due to the buffer being full this MUST + * return zero as the return value. + * A zero return value SHOULD represent that the send operation can be retried + * by calling the API function. Zero MUST NOT be returned if a network disconnection + * has occurred. + */ +/* @[define_transportflush] */ +typedef int32_t ( * TransportFlush_t )( NetworkContext_t * pNetworkContext ); +/* @[define_transportflush] */ + /** * @transportstruct * @brief The transport layer interface. @@ -252,6 +295,8 @@ typedef struct TransportInterface { TransportRecv_t recv; /**< Transport receive interface. */ TransportSend_t send; /**< Transport send interface. */ + TransportWrite_t write; /**< Transport write interface. */ + TransportFlush_t flush; /**< Transport flush interface. */ NetworkContext_t * pNetworkContext; /**< Implementation-defined network context. */ } TransportInterface_t; /* @[define_transportinterface] */ From e1af7cb5095627916091dbf154ab9d93eaffc534 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Tue, 23 Aug 2022 11:59:03 -0700 Subject: [PATCH 05/35] Update the publish method to use vectors --- source/core_mqtt.c | 242 ++++++++++++++----------- source/interface/transport_interface.h | 50 ++--- 2 files changed, 153 insertions(+), 139 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index e01b5f1b9..e1d8ec7c9 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -51,7 +51,7 @@ * * @return Total number of bytes sent, or negative value on network error. */ -static int32_t sendPacket( MQTTContext_t * pContext, +static int32_t sendBuffer( MQTTContext_t * pContext, const uint8_t * pBufferToSend, size_t bytesToSend ); @@ -600,14 +600,97 @@ static bool matchTopicFilter( const char * pTopicName, /*-----------------------------------------------------------*/ -static int32_t sendPacket( MQTTContext_t * pContext, +static int32_t sendMessageVector( MQTTContext_t * pContext, + TransportOutVector_t * pIoVec, + size_t ioVecCount ) +{ + uint32_t timeoutTime; + uint32_t bytesToSend = 0U; + int32_t bytesSentOrError = 0U; + TransportOutVector_t * pIoVectIterator; + size_t vectorsToBeSent = ioVecCount; + + assert( pContext != NULL ); + assert( pIoVec != NULL ); + assert( pContext->getTime != NULL ); + /* Send must alwaus be defined */ + assert( pContext->transportInterface.send ); + + timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS; + + /* Count the total number of bytes to be sent as outlined in the vector. */ + for( pIoVectIterator = pIoVec; pIoVectIterator < &( pIoVec[ ioVecCount ] ); pIoVectIterator++ ) + { + bytesToSend += pIoVectIterator->iov_len; + } + + /* Reset the iterator to point to the first entry in the array. */ + pIoVectIterator = pIoVec; + + while( ( pContext->getTime() < timeoutTime ) && + ( bytesSentOrError < bytesToSend ) ) + { + int32_t sendResult; + uint32_t bytesSentThisVector = 0U; + + if( pContext->transportInterface.writev != NULL ) + { + sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext, + pIoVectIterator, + vectorsToBeSent ); + } + else + { + sendResult = sendBuffer( pContext, + pIoVectIterator->iov_base, + pIoVectIterator->iov_len, + ( timeoutTime - pContext->getTime() ) ); + } + + if( sendResult >= 0 ) + { + bytesSentOrError += sendResult; + bytesSentThisVector += sendResult; + } + else + { + bytesSentOrError = sendResult; + break; + } + + while( ( bytesSentThisVector >= pIoVectIterator->iov_len ) && + ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) ) + { + bytesSentThisVector -= pIoVectIterator->iov_len; + pIoVectIterator++; + + /* Update the number of vector which are yet to be sent. */ + vectorsToBeSent--; + } + + /* Some of the bytes from this vector were sent as well, update the length + * and the pointer to data in this vector. */ + if( ( bytesSentThisVector > 0U ) && + ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) ) + { + ( ( uint8_t * ) pIoVectIterator->iov_base ) += bytesSentThisVector; + pIoVectIterator->iov_len -= bytesSentThisVector; + } + } + + return bytesSentOrError; +} + +static int32_t sendBuffer( MQTTContext_t * pContext, const uint8_t * pBufferToSend, - size_t bytesToSend ) + size_t bytesToSend, + uint32_t timeout ) { const uint8_t * pIndex = pBufferToSend; size_t bytesRemaining = bytesToSend; int32_t totalBytesSent = 0, bytesSent; uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U; + uint32_t timeoutTime; bool sendError = false; assert( pContext != NULL ); @@ -616,9 +699,7 @@ static int32_t sendPacket( MQTTContext_t * pContext, assert( pIndex != NULL ); bytesRemaining = bytesToSend; - - /* Record the most recent time of successful transmission. */ - lastSendTimeMs = pContext->getTime(); + timeoutTime = pContext->getTime() + timeout; /* Loop until the entire packet is sent. */ while( ( bytesRemaining > 0UL ) && ( sendError == false ) ) @@ -970,9 +1051,10 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, pContext->networkBuffer.pBuffer, - MQTT_PUBLISH_ACK_PACKET_SIZE ); + MQTT_PUBLISH_ACK_PACKET_SIZE, + MQTT_SEND_RETRY_TIMEOUT_MS ); } if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) @@ -1425,109 +1507,52 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, uint16_t packetId ) { MQTTStatus_t status = MQTTSuccess; - int32_t bytesSent; uint8_t serializedPacketID[ 2 ]; - const size_t packetIdLength = 2UL; - bool writeAndFlushAvailable; + TransportOutVector_t pIoVector[ 4 ]; + size_t ioVectorLength; + size_t totalMessageLength; + const size_t packetIDLength = 2U; - writeAndFlushAvailable = ( ( pContext->transportInterface.write != NULL ) && - ( pContext->transportInterface.flush != NULL ) ); + /* The header is sent first. */ + pIoVector[ 0U ].iov_base = pMqttHeader; + pIoVector[ 0U ].iov_len = headerSize; + totalMessageLength = headerSize; - /* Encode the packet ID. */ - serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); - serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); + /* Then the topic name has to be sent. */ + pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName; + pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength; + totalMessageLength += pPublishInfo->topicNameLength; - /* Send the serialized publish packet header over network. */ - if( writeAndFlushAvailable == true ) - { - bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, - pMqttHeader, - headerSize ); - } - else - { - bytesSent = sendPacket( pContext, - pMqttHeader, - headerSize ); - } + /* The next field's index should be 2 as the first two fields + * have been filled in. */ + ioVectorLength = 2U; - if( bytesSent != ( int32_t ) headerSize ) - { - status = MQTTSendFailed; - } - else + if( pPublishInfo->qos > MQTTQoS0 ) { - /* Send the topic string over the network. */ - if( writeAndFlushAvailable == true ) - { - bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, - ( const uint8_t* )pPublishInfo->pTopicName, - pPublishInfo->topicNameLength ); - } - else - { - bytesSent = sendPacket( pContext, - ( const uint8_t * ) pPublishInfo->pTopicName, - pPublishInfo->topicNameLength ); - } + /* Encode the packet ID. */ + serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) ); + serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) ); - if( bytesSent != ( ( int32_t ) pPublishInfo->topicNameLength ) ) - { - status = MQTTSendFailed; - } - } + pIoVector[ ioVectorLength ].iov_base = serializedPacketID; + pIoVector[ ioVectorLength ].iov_len = packetIDLength; - if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) - { - /* Send the packet ID over the network. */ - if( writeAndFlushAvailable == true ) - { - bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, - serializedPacketID, - packetIdLength ); - } - else - { - bytesSent = sendPacket( pContext, - serializedPacketID, - packetIdLength ); - } - - if( bytesSent != ( int32_t ) packetIdLength ) - { - status = MQTTSendFailed; - } + ioVectorLength++; + totalMessageLength += packetIDLength; } - if( ( status == MQTTSuccess ) && - ( pPublishInfo->payloadLength > 0U ) ) + /* Publish packets are allowed to contain no payload. */ + if( pPublishInfo->payloadLength > 0U ) { - /* Send Payload if there is one to send. It is valid for a PUBLISH - * Packet to contain a zero length payload.*/ - if( writeAndFlushAvailable == true ) - { - bytesSent = pContext->transportInterface.write( pContext->transportInterface.pNetworkContext, - pPublishInfo->pPayload, - pPublishInfo->payloadLength ); - } - else - { - bytesSent = sendPacket( pContext, - pPublishInfo->pPayload, - pPublishInfo->payloadLength ); - } + pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;; + pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength; - if( bytesSent != ( ( int32_t ) pPublishInfo->payloadLength ) ) - { - status = MQTTSendFailed; - } + ioVectorLength++; + totalMessageLength += pPublishInfo->payloadLength; } - if( ( status == MQTTSuccess ) && - ( writeAndFlushAvailable == true ) ) + if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) { - /* Flush the buffers to send the message over the network. */ - ( void ) pContext->transportInterface.flush( pContext->transportInterface.pNetworkContext ); + status = MQTTSendFailed; } return status; @@ -1878,9 +1903,10 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, pContext->networkBuffer.pBuffer, - packetSize ); + packetSize, + MQTT_SEND_RETRY_TIMEOUT_MS ); if( bytesSent < ( int32_t ) packetSize ) { @@ -1969,9 +1995,10 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { /* Send serialized MQTT SUBSCRIBE packet to transport layer. */ - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, pContext->networkBuffer.pBuffer, - packetSize ); + packetSize, + MQTT_SEND_RETRY_TIMEOUT_MS ); if( bytesSent < ( int32_t ) packetSize ) { @@ -2118,9 +2145,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { /* Send the serialized PINGREQ packet to transport layer. */ - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, localBuffer.pBuffer, - packetSize ); + MQTT_PACKET_PINGREQ_SIZE, + MQTT_SEND_RETRY_TIMEOUT_MS ); /* It is an error to not send the entire PINGREQ packet. */ if( bytesSent < ( int32_t ) packetSize ) @@ -2181,9 +2209,10 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { /* Send serialized MQTT UNSUBSCRIBE packet to transport layer. */ - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, pContext->networkBuffer.pBuffer, - packetSize ); + packetSize, + MQTT_SEND_RETRY_TIMEOUT_MS ); if( bytesSent < ( int32_t ) packetSize ) { @@ -2236,9 +2265,10 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { - bytesSent = sendPacket( pContext, + bytesSent = sendBuffer( pContext, localBuffer.pBuffer, - packetSize ); + packetSize, + MQTT_SEND_RETRY_TIMEOUT_MS ); if( bytesSent < ( int32_t ) packetSize ) { diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h index 3ad52f17a..a891c04a1 100644 --- a/source/interface/transport_interface.h +++ b/source/interface/transport_interface.h @@ -243,38 +243,21 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext, size_t bytesToSend ); /* @[define_transportsend] */ -/** - * @transportcallback - * @brief Transport interface for writing data into the IP stack's buffers. This - * data will not be sent immediately as the stack will wait for the application to - * write more data. - * - * @param[in] pNetworkContext Implementation-defined network context. - * @param[in] pBuffer Buffer containing the bytes to send over the network stack. - * @param[in] bytesToWrite Number of bytes to write to the stack's buffers. - * - * @return The number of bytes written or a negative value to indicate error. - * - * @note If no data is written to the buffer due to the buffer being full this MUST - * return zero as the return value. - * A zero return value SHOULD represent that the send operation can be retried - * by calling the API function. Zero MUST NOT be returned if a network disconnection - * has occurred. - */ -/* @[define_transportwrite] */ -typedef int32_t ( * TransportWrite_t )( NetworkContext_t * pNetworkContext, - const void * pBuffer, - size_t bytesToWrite ); -/* @[define_transportwrite] */ +typedef struct TransportOutVector +{ + const void * iov_base; /* Base address of data */ + size_t iov_len; /* Length of data in buffer */ +} TransportOutVector_t; /** * @transportcallback - * @brief Transport interface for sending all the data present in the IP-stacks buffers - * added by the call to transport interface write function. + * @brief Transport interface function for "vectored" / scatter-gather based writes. * * @param[in] pNetworkContext Implementation-defined network context. + * @param[in] pIoVec An array of TransportIoVector_t structs. + * @param[in] ioVecCount Number of TransportIoVector_t in pIoVec. * - * @return The number of bytes sent or a negative value to indicate error. + * @return The number of bytes written or a negative value to indicate error. * * @note If no data is written to the buffer due to the buffer being full this MUST * return zero as the return value. @@ -282,9 +265,11 @@ typedef int32_t ( * TransportWrite_t )( NetworkContext_t * pNetworkContext, * by calling the API function. Zero MUST NOT be returned if a network disconnection * has occurred. */ -/* @[define_transportflush] */ -typedef int32_t ( * TransportFlush_t )( NetworkContext_t * pNetworkContext ); -/* @[define_transportflush] */ +/* @[define_transportwritev] */ +typedef int32_t ( * TransportWritev_t )( NetworkContext_t * pNetworkContext, + TransportOutVector_t * pIoVec, + size_t ioVecCount ); +/* @[define_transportwritev] */ /** * @transportstruct @@ -293,10 +278,9 @@ typedef int32_t ( * TransportFlush_t )( NetworkContext_t * pNetworkContext ); /* @[define_transportinterface] */ typedef struct TransportInterface { - TransportRecv_t recv; /**< Transport receive interface. */ - TransportSend_t send; /**< Transport send interface. */ - TransportWrite_t write; /**< Transport write interface. */ - TransportFlush_t flush; /**< Transport flush interface. */ + TransportRecv_t recv; /**< Transport receive function pointer. */ + TransportSend_t send; /**< Transport send function pointer. */ + TransportWritev_t writev; /**< Transport writev function pointer. */ NetworkContext_t * pNetworkContext; /**< Implementation-defined network context. */ } TransportInterface_t; /* @[define_transportinterface] */ From d93e6effb376d06b03dda1a7af5b15a173ac3fa3 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Wed, 24 Aug 2022 15:21:18 -0700 Subject: [PATCH 06/35] Add vectored IO to all functions --- source/core_mqtt.c | 377 +++++++++++++++++++------- source/core_mqtt_serializer.c | 135 +++++---- source/include/core_mqtt.h | 6 +- source/include/core_mqtt_serializer.h | 13 + 4 files changed, 390 insertions(+), 141 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index e1d8ec7c9..d2b067d16 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -53,7 +53,13 @@ */ static int32_t sendBuffer( MQTTContext_t * pContext, const uint8_t * pBufferToSend, - size_t bytesToSend ); + size_t bytesToSend, + uint32_t timeout ); + +static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength ); /** * @brief Calculate the interval between two millisecond timestamps, including @@ -1036,6 +1042,11 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, int32_t bytesSent = 0; uint8_t packetTypeByte = 0U; MQTTPubAckType_t packetType; + MQTTFixedBuffer_t localBuffer; + uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ]; + + localBuffer.pBuffer = pubAckPacket; + localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE; assert( pContext != NULL ); @@ -1045,14 +1056,16 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, { packetType = getAckFromPacketType( packetTypeByte ); - status = MQTT_SerializeAck( &( pContext->networkBuffer ), + status = MQTT_SerializeAck( &localBuffer, packetTypeByte, packetId ); if( status == MQTTSuccess ) { + /* Here, we are not using the vector approach for efficiency. There is just one buffer + * to be sent which can be achieved with a normal send call. */ bytesSent = sendBuffer( pContext, - pContext->networkBuffer.pBuffer, + localBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE, MQTT_SEND_RETRY_TIMEOUT_MS ); } @@ -1500,6 +1513,135 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC /*-----------------------------------------------------------*/ +static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength[ 2 ], + uint8_t string, + uint8_t length, + TransportOutVector_t * iterator, + size_t * updatedLength ) +{ + size_t packetLength = 0U; + const size_t seralizedLengthFieldSize = 2U; + + serailizedLength[ 0 ] = UINT16_HIGH_BYTE( length ); + serailizedLength[ 1 ] = UINT16_LOW_BYTE( length ); + + iterator->iov_base = serailizedLength; + iterator->iov_len = seralizedLengthFieldSize; + iterator++; + + iterator->iov_base = string; + iterator->iov_len = length; + iterator++; + + packetLength = length + seralizedLengthFieldSize; + + ( * updatedLength ) = (*updatedLength) + packetLength; + + return iterator; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscription, + uint16_t packetId, + size_t remainingLength ) +{ + MQTTStatus_t status = MQTTSuccess; + uint8_t subscribeheader[ 5 ]; + uint8_t * pIndex; + TransportOutVector_t pIoVector[ 4 ]; + TransportOutVector_t* pIterator; + uint8_t serializedTopicFieldLength[ 2 ]; + size_t totalPacketLength = 0U; + /* Subscribe packet always has 4 vector fields. Namely: + * Header + Topic Filter length + Topic filter + QoS */ + const size_t ioVectorLength = 4U; + + pIndex = subscribeheader; + pIterator = pIoVector; + + pIndex = MQTT_SerializeSubscribeHeader( remainingLength, + pIndex, + packetId ); + + /* The header is to be sent first. */ + pIterator->iov_base = subscribeheader; + pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader ); + pIterator++; + totalPacketLength += ( size_t ) ( pIndex - subscribeheader ); + + /* The topic filter gets sent next. */ + pIterator = addEncodedStringToVector( serializedTopicFieldLength, + pSubscription->pTopicFilter, + pSubscription->topicFilterLength, + pIterator, + &totalPacketLength ); + + /* Lastly, the QoS gets sent. */ + pIterator->iov_base = &( pSubscription->qos ); + pIterator->iov_len = 1U; + + if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength ) + { + status = MQTTSendFailed; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscription, + uint16_t packetId, + size_t remainingLength ) +{ + MQTTStatus_t status = MQTTSuccess; + uint8_t unsubscribeheader[ 5 ]; + uint8_t * pIndex; + TransportOutVector_t pIoVector[ 4 ]; + TransportOutVector_t* pIterator; + uint8_t serializedTopicFieldLength[ 2 ]; + size_t totalPacketLength = 0U; + /* Subscribe packet always has 4 vector fields. Namely: + * Header + Topic Filter length + Topic filter + QoS */ + const size_t ioVectorLength = 4U; + + pIndex = unsubscribeheader; + pIterator = pIoVector; + + pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength, + pIndex, + packetId ); + + /* The header is to be sent first. */ + pIterator->iov_base = unsubscribeheader; + pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader ); + pIterator++; + totalPacketLength += ( size_t ) ( pIndex - unsubscribeheader ); + + /* The topic filter gets sent next. */ + pIterator = addEncodedStringToVector( serializedTopicFieldLength, + pSubscription->pTopicFilter, + pSubscription->topicFilterLength, + pIterator, + &totalPacketLength ); + + /* Lastly, the QoS gets sent. */ + pIterator->iov_base = &( pSubscription->qos ); + pIterator->iov_len = 1U; + + if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength ) + { + status = MQTTSendFailed; + } + + return status; +} + +/*-----------------------------------------------------------*/ + static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, const uint8_t * pMqttHeader, @@ -1560,6 +1702,117 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ +static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t connectPacketSize = 0; + TransportOutVector_t* iterator; + size_t ioVectorLength = 0U; + size_t totalMessageLength = 0U; + + /* Connect packet header can be of maximum 15 bytes. */ + uint8_t connectPacketHeader[ 15 ]; + uint8_t* pIndex = connectPacketHeader; + TransportOutVector_t pIoVector[ 11 ]; + uint8_t serializedClientIDLength[ 2 ]; + uint8_t serializedTopicLength[ 2 ]; + uint8_t serializedPayloadLength[ 2 ]; + uint8_t serializedUsernameLength[ 2 ]; + uint8_t serializedPasswordLength[ 2 ]; + + iterator = pIoVector; + + /* Validate arguments. */ + if( pConnectInfo == NULL ) + { + LogError( ( "Argument cannot be NULL: pConnectInfo=%p, " + "pFixedBuffer=%p.", + ( void * ) pConnectInfo ) ); + status = MQTTBadParameter; + } + else if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) ) + { + LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) ); + status = MQTTBadParameter; + } + else + { + pIndex = MQTT_SerializeConnectFixedHeader( pIndex, + pConnectInfo, + pWillInfo, + remainingLength ); + + assert( ( pIndex - connectPacketHeader ) <= 15 ); + + /* The header gets sent first. */ + iterator->iov_base = connectPacketHeader; + iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader ); + totalMessageLength += iterator->iov_len; + iterator++; + + + /* Serialize the client ID. */ + iterator = addEncodedStringToVector( serializedClientIDLength, + pConnectInfo->pClientIdentifier, + pConnectInfo->clientIdentifierLength, + iterator, + &totalMessageLength ); + + if( pWillInfo != NULL ) + { + /* Serialize the topic. */ + iterator = addEncodedStringToVector( serializedTopicLength, + pWillInfo->pTopicName, + pWillInfo->topicNameLength, + iterator, + &totalMessageLength ); + + /* Serialize the payload. */ + iterator = addEncodedStringToVector( serializedPayloadLength, + pWillInfo->pPayload, + pWillInfo->payloadLength, + iterator, + &totalMessageLength ); + } + + /* Encode the user name if provided. */ + if( pConnectInfo->pUserName != NULL ) + { + /* Serialize the user name string. */ + iterator = addEncodedStringToVector( serializedUsernameLength, + pConnectInfo->pUserName, + pConnectInfo->userNameLength, + iterator, + &totalMessageLength ); + } + + /* Encode the password if provided. */ + if( pConnectInfo->pPassword != NULL ) + { + /* Serialize the user name string. */ + iterator = addEncodedStringToVector( serializedPasswordLength, + pConnectInfo->pPassword, + pConnectInfo->passwordLength, + iterator, + &totalMessageLength ); + } + + ioVectorLength = ( size_t ) ( ( iterator - pIoVector ) + 1 ); + + if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) + { + status = MQTTSendFailed; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, uint32_t timeoutMs, bool cleanSession, @@ -1895,29 +2148,10 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - status = MQTT_SerializeConnect( pConnectInfo, - pWillInfo, - remainingLength, - &( pContext->networkBuffer ) ); - } - - if( status == MQTTSuccess ) - { - bytesSent = sendBuffer( pContext, - pContext->networkBuffer.pBuffer, - packetSize, - MQTT_SEND_RETRY_TIMEOUT_MS ); - - if( bytesSent < ( int32_t ) packetSize ) - { - LogError( ( "Transport send failed for CONNECT packet." ) ); - status = MQTTSendFailed; - } - else - { - LogDebug( ( "Sent %ld bytes of CONNECT packet.", - ( long int ) bytesSent ) ); - } + status = sendConnectWithoutCopy( pContext, + pConnectInfo, + pWillInfo, + remainingLength ); } /* Read CONNACK from transport layer. */ @@ -1957,8 +2191,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, - const MQTTSubscribeInfo_t * pSubscriptionList, - size_t subscriptionCount, + const MQTTSubscribeInfo_t * pSubscription, uint16_t packetId ) { size_t remainingLength = 0UL, packetSize = 0UL; @@ -1966,15 +2199,15 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, /* Validate arguments. */ MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, - pSubscriptionList, - subscriptionCount, + pSubscription, + 1U, packetId ); if( status == MQTTSuccess ) { /* Get the remaining length and packet size.*/ - status = MQTT_GetSubscribePacketSize( pSubscriptionList, - subscriptionCount, + status = MQTT_GetSubscribePacketSize( pSubscription, + 1U, &remainingLength, &packetSize ); LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.", @@ -1984,32 +2217,11 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - /* Serialize MQTT SUBSCRIBE packet. */ - status = MQTT_SerializeSubscribe( pSubscriptionList, - subscriptionCount, - packetId, - remainingLength, - &( pContext->networkBuffer ) ); - } - - if( status == MQTTSuccess ) - { - /* Send serialized MQTT SUBSCRIBE packet to transport layer. */ - bytesSent = sendBuffer( pContext, - pContext->networkBuffer.pBuffer, - packetSize, - MQTT_SEND_RETRY_TIMEOUT_MS ); - - if( bytesSent < ( int32_t ) packetSize ) - { - LogError( ( "Transport send failed for SUBSCRIBE packet." ) ); - status = MQTTSendFailed; - } - else - { - LogDebug( ( "Sent %ld bytes of SUBSCRIBE packet.", - ( long int ) bytesSent ) ); - } + /* Send MQTT SUBSCRIBE packet. */ + status = sendSubscribeWithoutCopy( pContext, + pSubscription, + packetId, + remainingLength ); } return status; @@ -2144,7 +2356,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { - /* Send the serialized PINGREQ packet to transport layer. */ + /* Send the serialized PINGREQ packet to transport layer. + * Here, we do not use the vectored IO approach for efficiency as the + * Ping packet does not have numerous fields which need to be copied + * from the user provided buffers. Thus it can be sent directly. */ bytesSent = sendBuffer( pContext, localBuffer.pBuffer, MQTT_PACKET_PINGREQ_SIZE, @@ -2171,8 +2386,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) /*-----------------------------------------------------------*/ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, - const MQTTSubscribeInfo_t * pSubscriptionList, - size_t subscriptionCount, + const MQTTSubscribeInfo_t * pSubscription, uint16_t packetId ) { size_t remainingLength = 0UL, packetSize = 0UL; @@ -2180,15 +2394,15 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, /* Validate arguments. */ MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, - pSubscriptionList, - subscriptionCount, + pSubscription, + 1U, packetId ); if( status == MQTTSuccess ) { /* Get the remaining length and packet size.*/ - status = MQTT_GetUnsubscribePacketSize( pSubscriptionList, - subscriptionCount, + status = MQTT_GetUnsubscribePacketSize( pSubscription, + 1U, &remainingLength, &packetSize ); LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.", @@ -2198,32 +2412,10 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - /* Serialize MQTT UNSUBSCRIBE packet. */ - status = MQTT_SerializeUnsubscribe( pSubscriptionList, - subscriptionCount, - packetId, - remainingLength, - &( pContext->networkBuffer ) ); - } - - if( status == MQTTSuccess ) - { - /* Send serialized MQTT UNSUBSCRIBE packet to transport layer. */ - bytesSent = sendBuffer( pContext, - pContext->networkBuffer.pBuffer, - packetSize, - MQTT_SEND_RETRY_TIMEOUT_MS ); - - if( bytesSent < ( int32_t ) packetSize ) - { - LogError( ( "Transport send failed for UNSUBSCRIBE packet." ) ); - status = MQTTSendFailed; - } - else - { - LogDebug( ( "Sent %ld bytes of UNSUBSCRIBE packet.", - ( long int ) bytesSent ) ); - } + status = sendUnsubscribeWithoutCopy( pContext, + pSubscription, + packetId, + remainingLength ); } return status; @@ -2265,6 +2457,9 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { + /* Here we do not use vectors as the disconnect packet has fixed fields + * which do not reside in user provided buffers. Thus, it can be sent + * using a simple send call. */ bytesSent = sendBuffer( pContext, localBuffer.pBuffer, packetSize, diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index f228460bd..09b620d70 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -1352,37 +1352,30 @@ static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp ) return status; } -/*-----------------------------------------------------------*/ - -static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, - const MQTTPublishInfo_t * pWillInfo, - size_t remainingLength, - const MQTTFixedBuffer_t * pFixedBuffer ) +uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength ) { + uint8_t * pIndexLocal = pIndex; uint8_t connectFlags = 0U; - uint8_t * pIndex = NULL; - - assert( pConnectInfo != NULL ); - assert( pFixedBuffer != NULL ); - assert( pFixedBuffer->pBuffer != NULL ); - - pIndex = pFixedBuffer->pBuffer; + /* The first byte in the CONNECT packet is the control packet type. */ - *pIndex = MQTT_PACKET_TYPE_CONNECT; - pIndex++; + *pIndexLocal = MQTT_PACKET_TYPE_CONNECT; + pIndexLocal++; /* The remaining length of the CONNECT packet is encoded starting from the * second byte. The remaining length does not include the length of the fixed * header or the encoding of the remaining length. */ - pIndex = encodeRemainingLength( pIndex, remainingLength ); + pIndexLocal = encodeRemainingLength( pIndex, remainingLength ); /* The string "MQTT" is placed at the beginning of the CONNECT packet's variable * header. This string is 4 bytes long. */ - pIndex = encodeString( pIndex, "MQTT", 4 ); + pIndexLocal = encodeString( pIndex, "MQTT", 4 ); /* The MQTT protocol version is the second field of the variable header. */ - *pIndex = MQTT_VERSION_3_1_1; - pIndex++; + *pIndexLocal = MQTT_VERSION_3_1_1; + pIndexLocal++; /* Set the clean session flag if needed. */ if( pConnectInfo->cleanSession == true ) @@ -1426,13 +1419,37 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, } } - *pIndex = connectFlags; - pIndex++; + *pIndexLocal = connectFlags; + pIndexLocal++; /* Write the 2 bytes of the keep alive interval into the CONNECT packet. */ - *pIndex = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds ); - *( pIndex + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds ); - pIndex += 2; + *pIndexLocal = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds ); + *( pIndexLocal + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds ); + pIndexLocal += 2; + + return pIndexLocal; +} +/*-----------------------------------------------------------*/ + +static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength, + const MQTTFixedBuffer_t * pFixedBuffer ) +{ + uint8_t connectFlags = 0U; + uint8_t * pIndex = NULL; + + assert( pConnectInfo != NULL ); + assert( pFixedBuffer != NULL ); + assert( pFixedBuffer->pBuffer != NULL ); + + pIndex = pFixedBuffer->pBuffer; + + /* Serialize the header. */ + pIndex = MQTT_SerializeConnectFixedHeader( pIndex, + pConnectInfo, + pWillInfo, + remainingLength ); /* Write the client identifier into the CONNECT packet. */ pIndex = encodeString( pIndex, @@ -1669,6 +1686,50 @@ MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscript /*-----------------------------------------------------------*/ +uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, + uint8_t * pIndex, + uint16_t packetId ) +{ + uint8_t * pIterator = pIndex; + + /* The first byte in SUBSCRIBE is the packet type. */ + *pIterator = MQTT_PACKET_TYPE_SUBSCRIBE; + pIterator++; + + /* Encode the "Remaining length" starting from the second byte. */ + pIterator = encodeRemainingLength( pIterator, remainingLength ); + + /* Place the packet identifier into the SUBSCRIBE packet. */ + *pIterator = UINT16_HIGH_BYTE( packetId ); + *( pIterator + 1 ) = UINT16_LOW_BYTE( packetId ); + pIterator += 2; + + return pIterator; +} + +/*-----------------------------------------------------------*/ + +uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, + uint8_t * pIndex, + uint16_t packetId ) +{ + uint8_t * pIterator = pIndex; + + /* The first byte in UNSUBSCRIBE is the packet type. */ + *pIterator = MQTT_PACKET_TYPE_UNSUBSCRIBE; + pIterator++; + + /* Encode the "Remaining length" starting from the second byte. */ + pIterator = encodeRemainingLength( pIterator, remainingLength ); + + /* Place the packet identifier into the SUBSCRIBE packet. */ + *pIterator = UINT16_HIGH_BYTE( packetId ); + *( pIterator + 1 ) = UINT16_LOW_BYTE( packetId ); + pIterator += 2; + + return pIterator; +} + MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, size_t subscriptionCount, uint16_t packetId, @@ -1690,17 +1751,9 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL { pIndex = pFixedBuffer->pBuffer; - /* The first byte in SUBSCRIBE is the packet type. */ - *pIndex = MQTT_PACKET_TYPE_SUBSCRIBE; - pIndex++; - - /* Encode the "Remaining length" starting from the second byte. */ - pIndex = encodeRemainingLength( pIndex, remainingLength ); - - /* Place the packet identifier into the SUBSCRIBE packet. */ - *pIndex = UINT16_HIGH_BYTE( packetId ); - *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId ); - pIndex += 2; + pIndex = MQTT_SerializeSubscribeHeader( remainingLength, + pIndex, + packetId ); /* Serialize each subscription topic filter and QoS. */ for( i = 0; i < subscriptionCount; i++ ) @@ -1783,17 +1836,7 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio /* Get the start of the buffer to the iterator variable. */ pIndex = pFixedBuffer->pBuffer; - /* The first byte in UNSUBSCRIBE is the packet type. */ - *pIndex = MQTT_PACKET_TYPE_UNSUBSCRIBE; - pIndex++; - - /* Encode the "Remaining length" starting from the second byte. */ - pIndex = encodeRemainingLength( pIndex, remainingLength ); - - /* Place the packet identifier into the UNSUBSCRIBE packet. */ - *pIndex = UINT16_HIGH_BYTE( packetId ); - *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId ); - pIndex += 2; + pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength, pIndex, packetId ); /* Serialize each subscription topic filter. */ for( i = 0; i < subscriptionCount; i++ ) diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 4feca99c5..361e3cf36 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -475,8 +475,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, */ /* @[declare_mqtt_subscribe] */ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, - const MQTTSubscribeInfo_t * pSubscriptionList, - size_t subscriptionCount, + const MQTTSubscribeInfo_t * pSubscription, uint16_t packetId ); /* @[declare_mqtt_subscribe] */ @@ -591,8 +590,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); */ /* @[declare_mqtt_unsubscribe] */ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, - const MQTTSubscribeInfo_t * pSubscriptionList, - size_t subscriptionCount, + const MQTTSubscribeInfo_t * pSubscription, uint16_t packetId ); /* @[declare_mqtt_unsubscribe] */ diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index c6088e37e..644f30c7d 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -1315,6 +1315,19 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, MQTTPacketInfo_t * pIncomingPacket ); /* @[declare_mqtt_getincomingpackettypeandlength] */ +uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength ); + +uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, + uint8_t * pIndex, + uint16_t packetId ); + +uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, + uint8_t * pIndex, + uint16_t packetId ); + /* *INDENT-OFF* */ #ifdef __cplusplus } From d01829b472370994eb44bdfe9274187c0128bdd4 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Wed, 24 Aug 2022 22:54:42 +0000 Subject: [PATCH 07/35] Fix formatting --- source/core_mqtt.c | 20 ++++++++++++-------- source/core_mqtt_serializer.c | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index d2b067d16..3b8bda0b3 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -292,6 +292,7 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const uint8_t * pMqttHeader, size_t headerSize, uint16_t packetId ); + /** * @brief Serializes a PUBLISH message. * @@ -1535,7 +1536,7 @@ static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength packetLength = length + seralizedLengthFieldSize; - ( * updatedLength ) = (*updatedLength) + packetLength; + ( *updatedLength ) = ( *updatedLength ) + packetLength; return iterator; } @@ -1551,9 +1552,10 @@ static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext, uint8_t subscribeheader[ 5 ]; uint8_t * pIndex; TransportOutVector_t pIoVector[ 4 ]; - TransportOutVector_t* pIterator; + TransportOutVector_t * pIterator; uint8_t serializedTopicFieldLength[ 2 ]; size_t totalPacketLength = 0U; + /* Subscribe packet always has 4 vector fields. Namely: * Header + Topic Filter length + Topic filter + QoS */ const size_t ioVectorLength = 4U; @@ -1601,9 +1603,10 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, uint8_t unsubscribeheader[ 5 ]; uint8_t * pIndex; TransportOutVector_t pIoVector[ 4 ]; - TransportOutVector_t* pIterator; + TransportOutVector_t * pIterator; uint8_t serializedTopicFieldLength[ 2 ]; size_t totalPacketLength = 0U; + /* Subscribe packet always has 4 vector fields. Namely: * Header + Topic Filter length + Topic filter + QoS */ const size_t ioVectorLength = 4U; @@ -1685,7 +1688,7 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, /* Publish packets are allowed to contain no payload. */ if( pPublishInfo->payloadLength > 0U ) { - pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;; + pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload; pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength; ioVectorLength++; @@ -1709,13 +1712,13 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, { MQTTStatus_t status = MQTTSuccess; size_t connectPacketSize = 0; - TransportOutVector_t* iterator; + TransportOutVector_t * iterator; size_t ioVectorLength = 0U; size_t totalMessageLength = 0U; /* Connect packet header can be of maximum 15 bytes. */ uint8_t connectPacketHeader[ 15 ]; - uint8_t* pIndex = connectPacketHeader; + uint8_t * pIndex = connectPacketHeader; TransportOutVector_t pIoVector[ 11 ]; uint8_t serializedClientIDLength[ 2 ]; uint8_t serializedTopicLength[ 2 ]; @@ -1750,9 +1753,9 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, /* The header gets sent first. */ iterator->iov_base = connectPacketHeader; iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader ); - totalMessageLength += iterator->iov_len; + totalMessageLength += iterator->iov_len; iterator++; - + /* Serialize the client ID. */ iterator = addEncodedStringToVector( serializedClientIDLength, @@ -2235,6 +2238,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, { size_t headerSize = 0UL, remainingLength = 0UL, packetSize = 0UL; MQTTPublishState_t publishStatus = MQTTStateNull; + /* 1 header byte + 4 bytes (maximum) required for encoding the length + * 2 bytes for topic string. */ uint8_t mqttHeader[ 7 ]; diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index 09b620d70..f132069dd 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -1359,7 +1359,7 @@ uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, { uint8_t * pIndexLocal = pIndex; uint8_t connectFlags = 0U; - + /* The first byte in the CONNECT packet is the control packet type. */ *pIndexLocal = MQTT_PACKET_TYPE_CONNECT; pIndexLocal++; @@ -1444,7 +1444,7 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, assert( pFixedBuffer->pBuffer != NULL ); pIndex = pFixedBuffer->pBuffer; - + /* Serialize the header. */ pIndex = MQTT_SerializeConnectFixedHeader( pIndex, pConnectInfo, From f3812f2b11a1595c23eac06abd4f0a107b4a913a Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Wed, 24 Aug 2022 23:14:09 +0000 Subject: [PATCH 08/35] Reduce complexity score --- source/core_mqtt.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 3b8bda0b3..5040c6d66 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -1715,6 +1715,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, TransportOutVector_t * iterator; size_t ioVectorLength = 0U; size_t totalMessageLength = 0U; + int32_t bytesSentOrError; /* Connect packet header can be of maximum 15 bytes. */ uint8_t connectPacketHeader[ 15 ]; @@ -1805,7 +1806,9 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, ioVectorLength = ( size_t ) ( ( iterator - pIoVector ) + 1 ); - if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) + bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength ); + + if( bytesSentOrError != ( int32_t ) totalMessageLength ) { status = MQTTSendFailed; } From 78623b84734a4b43a572e63551327b378624179f Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Wed, 24 Aug 2022 23:58:26 +0000 Subject: [PATCH 09/35] Fix spell check and complexity score --- lexicon.txt | 5 ++ source/core_mqtt.c | 100 +++++++++++++++---------- source/interface/transport_interface.h | 4 +- 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/lexicon.txt b/lexicon.txt index f3e782b6b..818ba164c 100644 --- a/lexicon.txt +++ b/lexicon.txt @@ -112,7 +112,9 @@ initializeconnectinfo initializesubscribeinfo initializewillinfo int +io iot +ioveccount isn iso keepaliveintervalsec @@ -238,11 +240,13 @@ pingreqs pingreqsendtimems pingresp pingresps +piovec pismatch plaintext pmatch pmessage pmqttcontext +pmqttheader pnameindex pnetworkbuffer pnetworkcontext @@ -400,6 +404,7 @@ validatesubscribeunsubscribeparams validator waitingforpingresp willinfo +writev xa xb xc diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 5040c6d66..84f82a824 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -620,7 +620,7 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, assert( pContext != NULL ); assert( pIoVec != NULL ); assert( pContext->getTime != NULL ); - /* Send must alwaus be defined */ + /* Send must always be defined */ assert( pContext->transportInterface.send ); timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS; @@ -1705,6 +1705,57 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ +static void addWillAndConnectInfo( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t * totalMessageLength, + TransportOutVector_t * iterator, + uint8_t serializedTopicLength[ 2 ], + uint8_t serializedPayloadLength[ 2 ], + uint8_t serializedUsernameLength[ 2 ], + uint8_t serializedPasswordLength[ 2 ] ) +{ + if( pWillInfo != NULL ) + { + /* Serialize the topic. */ + iterator = addEncodedStringToVector( serializedTopicLength, + pWillInfo->pTopicName, + pWillInfo->topicNameLength, + iterator, + &totalMessageLength ); + + /* Serialize the payload. */ + iterator = addEncodedStringToVector( serializedPayloadLength, + pWillInfo->pPayload, + pWillInfo->payloadLength, + iterator, + &totalMessageLength ); + } + + /* Encode the user name if provided. */ + if( pConnectInfo->pUserName != NULL ) + { + /* Serialize the user name string. */ + iterator = addEncodedStringToVector( serializedUsernameLength, + pConnectInfo->pUserName, + pConnectInfo->userNameLength, + iterator, + &totalMessageLength ); + } + + /* Encode the password if provided. */ + if( pConnectInfo->pPassword != NULL ) + { + /* Serialize the user name string. */ + iterator = addEncodedStringToVector( serializedPasswordLength, + pConnectInfo->pPassword, + pConnectInfo->passwordLength, + iterator, + &totalMessageLength ); + } +} + +/*-----------------------------------------------------------*/ + static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, @@ -1757,7 +1808,6 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, totalMessageLength += iterator->iov_len; iterator++; - /* Serialize the client ID. */ iterator = addEncodedStringToVector( serializedClientIDLength, pConnectInfo->pClientIdentifier, @@ -1765,44 +1815,14 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, iterator, &totalMessageLength ); - if( pWillInfo != NULL ) - { - /* Serialize the topic. */ - iterator = addEncodedStringToVector( serializedTopicLength, - pWillInfo->pTopicName, - pWillInfo->topicNameLength, - iterator, - &totalMessageLength ); - - /* Serialize the payload. */ - iterator = addEncodedStringToVector( serializedPayloadLength, - pWillInfo->pPayload, - pWillInfo->payloadLength, - iterator, - &totalMessageLength ); - } - - /* Encode the user name if provided. */ - if( pConnectInfo->pUserName != NULL ) - { - /* Serialize the user name string. */ - iterator = addEncodedStringToVector( serializedUsernameLength, - pConnectInfo->pUserName, - pConnectInfo->userNameLength, - iterator, - &totalMessageLength ); - } - - /* Encode the password if provided. */ - if( pConnectInfo->pPassword != NULL ) - { - /* Serialize the user name string. */ - iterator = addEncodedStringToVector( serializedPasswordLength, - pConnectInfo->pPassword, - pConnectInfo->passwordLength, - iterator, - &totalMessageLength ); - } + addWillAndConnectInfo( pWillInfo, + pConnectInfo, + &totalMessageLength, + iterator, + serializedTopicLength, + serializedPayloadLength, + serializedUsernameLength, + serializedPasswordLength ); ioVectorLength = ( size_t ) ( ( iterator - pIoVector ) + 1 ); diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h index a891c04a1..c7b6de6df 100644 --- a/source/interface/transport_interface.h +++ b/source/interface/transport_interface.h @@ -245,8 +245,8 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext, typedef struct TransportOutVector { - const void * iov_base; /* Base address of data */ - size_t iov_len; /* Length of data in buffer */ + const void * iov_base; /* Base address of data */ + size_t iov_len; /* Length of data in buffer */ } TransportOutVector_t; /** From 92886c58f4ae9f41c78a75199bc80cd94338fb62 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 00:02:55 +0000 Subject: [PATCH 10/35] Fix breaking build --- source/core_mqtt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 84f82a824..9a2b89aba 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -680,7 +680,7 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, if( ( bytesSentThisVector > 0U ) && ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) ) { - ( ( uint8_t * ) pIoVectIterator->iov_base ) += bytesSentThisVector; + ( * ( ( uint8_t * ) pIoVectIterator->iov_base ) ) += bytesSentThisVector; pIoVectIterator->iov_len -= bytesSentThisVector; } } From 21e7180e666c0081e7ba425533b26c1cbfd833ed Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 06:27:47 +0000 Subject: [PATCH 11/35] Add doxygen comments --- source/core_mqtt.c | 123 ++++++++++++++++++++++++- source/include/core_mqtt.h | 28 +++--- source/interface/transport_interface.h | 3 + 3 files changed, 138 insertions(+), 16 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 9a2b89aba..405b3ffb0 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -56,11 +56,132 @@ static int32_t sendBuffer( MQTTContext_t * pContext, size_t bytesToSend, uint32_t timeout ); +/** + * @brief Sends MQTT connect without copying the users data into any buffer. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pConnectInfo MQTT CONNECT packet information. + * @brief param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and + * Testament is not used. + * @brief param[in] remainingLength the length of the connect packet. + * + * @note This operation may call the transport send function + * repeatedly to send bytes over the network until either: + * 1. The requested number of bytes @a remainingLength have been sent. + * OR + * 2. No byte cannot be sent over the network for the MQTT_SEND_RETRY_TIMEOUT_MS + * duration. + * OR + * 3. There is an error in sending data over the network. + * + * @return #MQTTSendFailed or #MQTTSuccess. + */ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); +/** + * @brief Sends the vector array passed through the parameters over the network. + * + * @note The preference is given to 'write' function if it is present in the + * transport interface. Otherwise, a send call is made repeatedly to achieve the + * result. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pIoVec The vecotr array to be sent. + * @param[in] ioVecCount The number of elements in the array. + * + * @return The total number of bytes sent or the error code as received from the + * transport interface. + */ +static int32_t sendMessageVector( MQTTContext_t * pContext, + TransportOutVector_t * pIoVec, + size_t ioVecCount ); + +/** + * @brief Add a string and its length after serializing it in a manner outlined by + * the MQTT specification. + * + * @param[in] serailizedLength Array of two bytes to which the vecotr will point. + * The array must remain in scope until the message has been sent. + * @param[in] string The string to be serialized. + * @param[in] length The length of the string to be serialized. + * @param[in] iterator The iterator pointing to the first element in the + * transport interface IO array. + * @param[out] updatedLength This parameter will be added to with the number of + * bytes added to the vector. + * + * @return The updated pointer to the vector array. + */ +static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength[ 2 ], + uint8_t * string, + uint8_t length, + TransportOutVector_t * iterator, + size_t * updatedLength ); + +/** + * @brief Add the will and testament information along with the connection information + * to the given vector. + * + * @param[in] pConnectInfo Connection information of MQTT. + * @param[in] pWillInfo The last will and testament information. + * @param[out] totalMessageLength This parameter will be added to with the number of + * bytes added to the vector. + * @param[in] iterator The iterator pointing to the first element in the + * transport interface IO array. + * @param[out] serializedTopicLength This array will be updated with the topic field + * length in serialized MQTT format. + * @param[out] serializedPayloadLength This array will be updated with the payload + * field length in serialized MQTT format. + * @param[out] serializedUsernameLength This array will be updated with the user name + * field length in serialized MQTT format. + * @param[out] serializedPasswordLength This array will be updated with the password + * field length in serialized MQTT format. + * + * @note All the arrays must stay in scope until the message contained in the vector has + * been sent. + */ +static void addWillAndConnectInfo( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t * totalMessageLength, + TransportOutVector_t * iterator, + uint8_t serializedTopicLength[ 2 ], + uint8_t serializedPayloadLength[ 2 ], + uint8_t serializedUsernameLength[ 2 ], + uint8_t serializedPasswordLength[ 2 ] ); + +/** + * @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and + * directly sending it. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pSubscription MQTT subscription info. + * @param[in] packetId The packet ID of the subscribe packet + * @param[in] remainingLength The remaining length of the subscribe packet. + * + * @return #MQTTSuccess or #MQTTSendFailed. + */ +static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscription, + uint16_t packetId, + size_t remainingLength ); + +/** + * @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and + * directly sending it. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pSubscription MQTT subscription info. + * @param[in] packetId The packet ID of the unsubscribe packet. + * @param[in] remainingLength The remaining length of the unsubscribe packet. + * + * @return #MQTTSuccess or #MQTTSendFailed. + */ +static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscription, + uint16_t packetId, + size_t remainingLength ); /** * @brief Calculate the interval between two millisecond timestamps, including * when the later value has overflowed. @@ -1515,7 +1636,7 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC /*-----------------------------------------------------------*/ static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength[ 2 ], - uint8_t string, + uint8_t * string, uint8_t length, TransportOutVector_t * iterator, size_t * updatedLength ) diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 361e3cf36..2b611f56f 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -425,12 +425,10 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, /* @[declare_mqtt_connect] */ /** - * @brief Sends MQTT SUBSCRIBE for the given list of topic filters to - * the broker. + * @brief Sends MQTT SUBSCRIBE for the given topic filter to the broker. * * @param[in] pContext Initialized MQTT context. - * @param[in] pSubscriptionList List of MQTT subscription info. - * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] pSubscription MQTT subscription info. * @param[in] packetId Packet ID generated by #MQTT_GetPacketId. * * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to @@ -451,25 +449,25 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, * // This is assumed to be a list of filters we want to subscribe to. * const char * filters[ NUMBER_OF_SUBSCRIPTIONS ]; * - * // Set each subscription. + * // Subscribe to each subscription filter. * for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ ) * { * subscriptionList[ i ].qos = MQTTQoS0; * // Each subscription needs a topic filter. * subscriptionList[ i ].pTopicFilter = filters[ i ]; * subscriptionList[ i ].topicFilterLength = strlen( filters[ i ] ); - * } - * - * // Obtain a new packet id for the subscription. - * packetId = MQTT_GetPacketId( pContext ); + * + * // Obtain a new packet id for the subscription. + * packetId = MQTT_GetPacketId( pContext ); * - * status = MQTT_Subscribe( pContext, &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, packetId ); + * status = MQTT_Subscribe( pContext, &subscriptionList[ i ], packetId ); * - * if( status == MQTTSuccess ) - * { - * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the SUBACK. - * // If the broker accepts the subscription we can now receive publishes - * // on the requested topics. + * if( status == MQTTSuccess ) + * { + * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the SUBACK. + * // If the broker accepts the subscription we can now receive publishes + * // on the requested topics. + * } * } * @endcode */ diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h index c7b6de6df..25bd88091 100644 --- a/source/interface/transport_interface.h +++ b/source/interface/transport_interface.h @@ -243,6 +243,9 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext, size_t bytesToSend ); /* @[define_transportsend] */ +/** + * @brief Transport vector structure for sending multiple messages. + */ typedef struct TransportOutVector { const void * iov_base; /* Base address of data */ From 399803f91b4a51cf0b554724fa1f0dc9c7d64022 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 06:47:49 +0000 Subject: [PATCH 12/35] Fix doxygen part 2 --- source/include/core_mqtt.h | 24 ++++++------ source/include/core_mqtt_serializer.h | 54 +++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 2b611f56f..8a035b21d 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -539,12 +539,10 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); /* @[declare_mqtt_ping] */ /** - * @brief Sends MQTT UNSUBSCRIBE for the given list of topic filters to - * the broker. + * @brief Sends MQTT UNSUBSCRIBE for the topic filter to the broker. * * @param[in] pContext Initialized MQTT context. - * @param[in] pSubscriptionList List of MQTT subscription info. - * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] pSubscription MQTT subscription info. * @param[in] packetId packet ID generated by #MQTT_GetPacketId. * * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to @@ -572,17 +570,17 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); * unsubscribeList[ i ].topicFilterLength = strlen( filters[ i ] ); * * // The QoS field of MQTT_SubscribeInfo_t is unused for unsubscribing. - * } - * - * // Obtain a new packet id for the unsubscribe request. - * packetId = MQTT_GetPacketId( pContext ); + * + * // Obtain a new packet id for the unsubscribe request. + * packetId = MQTT_GetPacketId( pContext ); * - * status = MQTT_Unsubscribe( pContext, &unsubscribeList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, packetId ); + * status = MQTT_Unsubscribe( pContext, &unsubscribeList[ i ], packetId ); * - * if( status == MQTTSuccess ) - * { - * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the UNSUBACK. - * // After this the broker should no longer send publishes for these topics. + * if( status == MQTTSuccess ) + * { + * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the UNSUBACK. + * // After this the broker should no longer send publishes for these topics. + * } * } * @endcode */ diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 644f30c7d..7341a7ba5 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -862,10 +862,9 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, * as the size returned by #MQTT_GetPublishPacketSize. * * @param[in] pPublishInfo MQTT PUBLISH packet parameters. - * @param[in] packetId packet ID generated by #MQTT_GetPacketId. * @param[in] remainingLength Remaining Length provided by #MQTT_GetPublishPacketSize. - * @param[out] pFixedBuffer Buffer for packet serialization. - * @param[out] pHeaderSize Size of the serialized MQTT PUBLISH header. + * @param[out] pBuffer Buffer for packet serialization. + * @param[out] headerSize Size of the serialized MQTT PUBLISH header. * * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; @@ -1315,15 +1314,64 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, MQTTPacketInfo_t * pIncomingPacket ); /* @[declare_mqtt_getincomingpackettypeandlength] */ +/** + * @brief Serialize the fixed part of the connect packet header. + * + * @param[out] pIndex Pointer to the buffer where the header is to + * be serialized. + * @param[in] pConnectInfo The connect information. + * @param[in] pWillInfo The last will and testament information. + * @param[in] remainingLength The remaining length of the packet to be + * serialized. + * + * @return A pointer to the end of the encoded string. + */ + +/** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this definition, this function is private. + */ uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); +/** + * @brief Serialize the fixed part of the subscribe packet header. + * + * @param[in] remainingLength The remaining length of the packet to be + * serialized. + * @param[in] pIndex Pointer to the buffer where the header is to + * be serialized. + * @param[in] packetId The packet ID to be serialized. + * + * @return A pointer to the end of the encoded string. + */ + +/** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this definition, this function is private. + */ uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); +/** + * @brief Serialize the fixed part of the unsubscribe packet header. + * + * @param[in] remainingLength The remaining length of the packet to be + * serialized. + * @param[in] pIndex Pointer to the buffer where the header is to + * be serialized. + * @param[in] packetId The packet ID to be serialized. + * + * @return A pointer to the end of the encoded string. + */ + +/** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this definition, this function is private. + */ uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); From 2caf1966dd325be45b32e31f1573a52824e7877b Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 06:51:31 +0000 Subject: [PATCH 13/35] Doxygen fix part 3 --- source/include/core_mqtt_serializer.h | 3 +++ source/interface/transport_interface.h | 11 +++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 7341a7ba5..e36255013 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -1335,6 +1335,7 @@ uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); +/** @endcond */ /** * @brief Serialize the fixed part of the subscribe packet header. @@ -1355,6 +1356,7 @@ uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); +/** @endcond */ /** * @brief Serialize the fixed part of the unsubscribe packet header. @@ -1375,6 +1377,7 @@ uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); +/** @endcond */ /* *INDENT-OFF* */ #ifdef __cplusplus diff --git a/source/interface/transport_interface.h b/source/interface/transport_interface.h index 25bd88091..a884e7ee2 100644 --- a/source/interface/transport_interface.h +++ b/source/interface/transport_interface.h @@ -248,8 +248,15 @@ typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext, */ typedef struct TransportOutVector { - const void * iov_base; /* Base address of data */ - size_t iov_len; /* Length of data in buffer */ + /** + * @brief Base address of data. + */ + const void * iov_base; + + /** + * @brief Length of data in buffer. + */ + size_t iov_len; } TransportOutVector_t; /** From cb533e5e3ed05957e6582cf08e78cf17d84b04dc Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 07:04:18 +0000 Subject: [PATCH 14/35] Fix doxygen part 4 --- source/include/core_mqtt_serializer.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index e36255013..5887f3bba 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -1315,6 +1315,7 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, /* @[declare_mqtt_getincomingpackettypeandlength] */ /** + * @fn uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); * @brief Serialize the fixed part of the connect packet header. * * @param[out] pIndex Pointer to the buffer where the header is to @@ -1338,6 +1339,7 @@ uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, /** @endcond */ /** + * @fn uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); * @brief Serialize the fixed part of the subscribe packet header. * * @param[in] remainingLength The remaining length of the packet to be @@ -1359,6 +1361,7 @@ uint8_t * MQTT_SerializeSubscribeHeader( size_t remainingLength, /** @endcond */ /** + * @fn uint8_t * MQTT_SerializeUnsubscribeHeader( size_t remainingLength, uint8_t * pIndex, uint16_t packetId ); * @brief Serialize the fixed part of the unsubscribe packet header. * * @param[in] remainingLength The remaining length of the packet to be From c16927c84f88fbd9f404588156bfe1860f3f71b9 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 07:19:33 +0000 Subject: [PATCH 15/35] Fix some checks --- lexicon.txt | 9 +++++++++ source/core_mqtt.c | 11 ++++++----- source/include/core_mqtt.h | 4 ++-- source/include/core_mqtt_serializer.h | 11 +---------- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/lexicon.txt b/lexicon.txt index 818ba164c..1718dc35a 100644 --- a/lexicon.txt +++ b/lexicon.txt @@ -235,6 +235,7 @@ pfilterindex pfixedbuffer pheadersize pincomingpacket +pindex pingreq pingreqs pingreqsendtimems @@ -281,6 +282,7 @@ psuback psubackpacket psubscribeinfo psubscribes +psubscription psubscriptionlist ptopic ptopicfilter @@ -332,6 +334,11 @@ serializeack serializeconnect serializeconnectpacket serializedisconnect +serailizedlength +serializedpasswordlength +serializedpayloadlength +serializedtopiclength +serializedusernamelength serializepayload serializepingreq serializepublish @@ -378,6 +385,7 @@ tlssend toolchain topicfilterlength topicnamelength +totalmessagelength tr transportcallback transportinterface @@ -394,6 +402,7 @@ uint un unsuback unsubscribelist +updatedlength updatestateack updatestatepublish updatestatestatus diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 405b3ffb0..3240f4cdc 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -74,7 +74,7 @@ static int32_t sendBuffer( MQTTContext_t * pContext, * OR * 3. There is an error in sending data over the network. * - * @return #MQTTSendFailed or #MQTTSuccess. + * @return #MQTTSendFailed or #MQTTSuccess. */ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, const MQTTConnectInfo_t * pConnectInfo, @@ -89,7 +89,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, * result. * * @param[in] pContext Initialized MQTT context. - * @param[in] pIoVec The vecotr array to be sent. + * @param[in] pIoVec The vector array to be sent. * @param[in] ioVecCount The number of elements in the array. * * @return The total number of bytes sent or the error code as received from the @@ -103,7 +103,7 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, * @brief Add a string and its length after serializing it in a manner outlined by * the MQTT specification. * - * @param[in] serailizedLength Array of two bytes to which the vecotr will point. + * @param[in] serailizedLength Array of two bytes to which the vector will point. * The array must remain in scope until the message has been sent. * @param[in] string The string to be serialized. * @param[in] length The length of the string to be serialized. @@ -138,7 +138,7 @@ static TransportOutVector_t * addEncodedStringToVector( uint8_t serailizedLength * field length in serialized MQTT format. * @param[out] serializedPasswordLength This array will be updated with the password * field length in serialized MQTT format. - * + * * @note All the arrays must stay in scope until the message contained in the vector has * been sent. */ @@ -182,6 +182,7 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, const MQTTSubscribeInfo_t * pSubscription, uint16_t packetId, size_t remainingLength ); + /** * @brief Calculate the interval between two millisecond timestamps, including * when the later value has overflowed. @@ -801,7 +802,7 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, if( ( bytesSentThisVector > 0U ) && ( pIoVectIterator < &( pIoVec[ ioVecCount ] ) ) ) { - ( * ( ( uint8_t * ) pIoVectIterator->iov_base ) ) += bytesSentThisVector; + ( *( ( uint8_t * ) pIoVectIterator->iov_base ) ) += bytesSentThisVector; pIoVectIterator->iov_len -= bytesSentThisVector; } } diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 8a035b21d..5b5de6d8f 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -456,7 +456,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, * // Each subscription needs a topic filter. * subscriptionList[ i ].pTopicFilter = filters[ i ]; * subscriptionList[ i ].topicFilterLength = strlen( filters[ i ] ); - * + * * // Obtain a new packet id for the subscription. * packetId = MQTT_GetPacketId( pContext ); * @@ -570,7 +570,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); * unsubscribeList[ i ].topicFilterLength = strlen( filters[ i ] ); * * // The QoS field of MQTT_SubscribeInfo_t is unused for unsubscribing. - * + * * // Obtain a new packet id for the unsubscribe request. * packetId = MQTT_GetPacketId( pContext ); * diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 5887f3bba..3240c2594 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -848,27 +848,18 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, const MQTTFixedBuffer_t * pFixedBuffer ); /* @[declare_mqtt_serializepublish] */ -/* TODO: Re-write the brief comments below. */ /** * @brief Serialize an MQTT PUBLISH packet header without the topic string in the * given buffer. This function will add the topic string length to the provided * buffer. This helps reduce an unnecessary copy of the topic string into the * buffer. * - * #MQTT_GetPublishPacketSize should be called with @p pPublishInfo before - * invoking this function to get the size of the required buffer and - * @p remainingLength. The @p remainingLength must be the same as returned by - * #MQTT_GetPublishPacketSize. The buffer must be at least as large - * as the size returned by #MQTT_GetPublishPacketSize. - * * @param[in] pPublishInfo MQTT PUBLISH packet parameters. * @param[in] remainingLength Remaining Length provided by #MQTT_GetPublishPacketSize. * @param[out] pBuffer Buffer for packet serialization. * @param[out] headerSize Size of the serialized MQTT PUBLISH header. * - * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; - * #MQTTBadParameter if invalid parameters are passed; - * #MQTTSuccess otherwise. + * @return #MQTTSuccess if the serialization is successful. Otherwise, #MQTTBadParameter. */ MQTTStatus_t MQTT_SerializePublishHeaderWithoutTopic( const MQTTPublishInfo_t * pPublishInfo, size_t remainingLength, From effd5d4a6bf1992b9d4c919460bf040ad3a15733 Mon Sep 17 00:00:00 2001 From: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com> Date: Thu, 25 Aug 2022 07:23:11 +0000 Subject: [PATCH 16/35] Fix memory tables --- docs/doxygen/include/size_table.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md index 9f085682c..3f5d85312 100644 --- a/docs/doxygen/include/size_table.md +++ b/docs/doxygen/include/size_table.md @@ -9,8 +9,8 @@