diff --git a/source/core_mqtt.c b/source/core_mqtt.c index e1e9c5f89..a47113e42 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -733,11 +733,14 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, TransportOutVector_t * pIoVec, size_t ioVecCount ) { - uint32_t timeoutTime; - uint32_t bytesToSend = 0U; - int32_t bytesSentOrError = 0; + int32_t sendResult; + uint32_t lastSendTimeMs; + uint32_t timeSinceLastSendMs; TransportOutVector_t * pIoVectIterator; size_t vectorsToBeSent = ioVecCount; + size_t bytesToSend = 0; + int32_t bytesSentOrError = 0; + assert( pContext != NULL ); assert( pIoVec != NULL ); @@ -745,23 +748,22 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, /* Send must always be defined */ assert( pContext->transportInterface.send != NULL ); - timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS; - /* Count the total number of bytes to be sent as outlined in the vector. */ for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ ) { - bytesToSend += ( uint32_t ) pIoVectIterator->iov_len; + bytesToSend += pIoVectIterator->iov_len; } /* Reset the iterator to point to the first entry in the array. */ pIoVectIterator = pIoVec; - while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && - ( bytesSentOrError >= 0 ) ) - { - int32_t sendResult; - uint32_t bytesSentThisVector = 0U; + /* This initializes the last send time to the current time to cover the case + * where the transport send function returns 0 on the first iteration of the + * while loop. */ + lastSendTimeMs = pContext->getTime(); + while( ( ( size_t ) bytesSentOrError < bytesToSend ) && ( bytesSentOrError >= 0 ) ) + { if( pContext->transportInterface.writev != NULL ) { sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext, @@ -770,49 +772,65 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, } else { - sendResult = sendBuffer( pContext, - pIoVectIterator->iov_base, - pIoVectIterator->iov_len ); + sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext, + pIoVectIterator->iov_base, + pIoVectIterator->iov_len ); } - if( sendResult >= 0 ) + if( sendResult > 0 ) { + /* It is a bug in the application's transport send implementation if + * more bytes than expected are sent. */ + assert( ( size_t ) sendResult <= ( bytesToSend - ( size_t ) bytesSentOrError ) ); + bytesSentOrError += sendResult; - bytesSentThisVector += ( uint32_t ) sendResult; + + /* Reset timeout since data was sent. */ + lastSendTimeMs = pContext->getTime(); + + LogDebug( ( "sendMessageVector: BytesSent=%ld, BytesRemaining=%ld", + ( long int ) sendResult, + ( long int ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) ); } - else + else if( sendResult < 0 ) { bytesSentOrError = sendResult; + } - /* We do not need to break here as the condition is checked in the loop. - * The following statements will not execute as bytesSentThisVector is not - * updated and is still 0. */ + /* Check for timeout if we have been waiting to send any data over the network. */ + timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs ); + + if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS ) + { + LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) ); + break; } + /* Update the send pointer to the correct vector and offset. */ while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) && - ( bytesSentThisVector >= pIoVectIterator->iov_len ) ) + ( ( size_t ) sendResult >= pIoVectIterator->iov_len ) ) { - bytesSentThisVector -= ( uint32_t ) pIoVectIterator->iov_len; + sendResult -= ( int32_t ) pIoVectIterator->iov_len; pIoVectIterator++; - /* Update the number of vector which are yet to be sent. */ vectorsToBeSent--; } /* Some of the bytes from this vector were sent as well, update the length * and the pointer to data in this vector. */ - if( ( bytesSentThisVector > 0U ) && + if( ( sendResult > 0 ) && ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) ) { - pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ bytesSentThisVector ] ); - pIoVectIterator->iov_len -= bytesSentThisVector; + pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] ); + pIoVectIterator->iov_len -= sendResult; } + } - /* Check for timeout. */ - if( pContext->getTime() > timeoutTime ) - { - break; - } + if( ( size_t ) bytesSentOrError == bytesToSend ) + { + pContext->lastPacketTxTime = lastSendTimeMs; + LogDebug( ( "sendMessageVector: Successfully sent packet at time %lu.", + ( unsigned long ) lastSendTimeMs ) ); } return bytesSentOrError; @@ -822,74 +840,70 @@ static int32_t sendBuffer( MQTTContext_t * pContext, const uint8_t * pBufferToSend, size_t bytesToSend ) { + int32_t sendResult; + uint32_t lastSendTimeMs; + uint32_t timeSinceLastSendMs; + int32_t bytesSentOrError = 0; const uint8_t * pIndex = pBufferToSend; - size_t bytesRemaining; - int32_t totalBytesSent = 0, bytesSent; - uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U; - bool sendError = false; assert( pContext != NULL ); assert( pContext->getTime != NULL ); assert( pContext->transportInterface.send != NULL ); assert( pIndex != NULL ); - bytesRemaining = bytesToSend; + /* This initializes the last send time to the current time to cover the case + * where the transport send function returns 0 on the first iteration of the + * while loop. */ + lastSendTimeMs = pContext->getTime(); - /* Loop until the entire packet is sent. */ - while( ( bytesRemaining > 0UL ) && ( sendError == false ) ) + while( ( ( size_t ) bytesSentOrError < bytesToSend ) ) { - bytesSent = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext, - pIndex, - bytesRemaining ); + sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext, + pIndex, + bytesToSend - ( size_t ) bytesSentOrError ); - if( bytesSent < 0 ) + if( sendResult > 0 ) { - LogError( ( "Transport send failed. Error code=%ld.", ( long int ) bytesSent ) ); - totalBytesSent = bytesSent; - sendError = true; - } - else if( bytesSent > 0 ) - { - /* Record the most recent time of successful transmission. */ - lastSendTimeMs = pContext->getTime(); - /* It is a bug in the application's transport send implementation if - * more bytes than expected are sent. To avoid a possible overflow - * in converting bytesRemaining from unsigned to signed, this assert - * must exist after the check for bytesSent being negative. */ - assert( ( size_t ) bytesSent <= bytesRemaining ); + * more bytes than expected are sent. */ + assert( ( size_t ) sendResult <= ( bytesToSend - ( size_t ) bytesSentOrError ) ); - bytesRemaining -= ( size_t ) bytesSent; - totalBytesSent += bytesSent; - /* Increment the index. */ - pIndex = &pIndex[ bytesSent ]; - LogDebug( ( "BytesSent=%ld, BytesRemaining=%lu", - ( long int ) bytesSent, - ( unsigned long ) bytesRemaining ) ); + bytesSentOrError += sendResult; + pIndex = &pIndex[ sendResult ]; + + /* Reset timeout. */ + lastSendTimeMs = pContext->getTime(); + + LogDebug( ( "sendBuffer: BytesSent=%ld, BytesRemaining=%ld", + ( long int ) sendResult, + ( long int ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) ); + } + else if( sendResult < 0 ) + { + bytesSentOrError = sendResult; + break; } else { - /* No bytes were sent over the network. */ timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs ); /* Check for timeout if we have been waiting to send any data over the network. */ if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS ) { - LogError( ( "Unable to send packet: Timed out in transport send." ) ); - sendError = true; + LogError( ( "sendBuffer: Unable to send packet: Timed out." ) ); + break; } } } - /* Update time of last transmission if the entire packet is successfully sent. */ - if( totalBytesSent > 0 ) + if( ( size_t ) bytesSentOrError == bytesToSend ) { pContext->lastPacketTxTime = lastSendTimeMs; - LogDebug( ( "Successfully sent packet at time %lu.", + LogDebug( ( "sendBuffer: Successfully sent packet at time %lu.", ( unsigned long ) lastSendTimeMs ) ); } - return totalBytesSent; + return bytesSentOrError; } /*-----------------------------------------------------------*/ @@ -1241,7 +1255,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, { MQTTStatus_t status = MQTTSuccess; MQTTPublishState_t newState = MQTTStateNull; - int32_t bytesSent = 0; + int32_t sendResult = 0; uint8_t packetTypeByte = 0U; MQTTPubAckType_t packetType; MQTTFixedBuffer_t localBuffer; @@ -1268,14 +1282,14 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, /* Here, we are not using the vector approach for efficiency. There is just one buffer * to be sent which can be achieved with a normal send call. */ - bytesSent = sendBuffer( pContext, - localBuffer.pBuffer, - MQTT_PUBLISH_ACK_PACKET_SIZE ); + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + MQTT_PUBLISH_ACK_PACKET_SIZE ); MQTT_POST_SEND_HOOK( pContext ); } - if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) + if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) { pContext->controlPacketSent = true; @@ -1299,7 +1313,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, { LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, " "PacketSize=%lu.", - ( unsigned int ) packetTypeByte, ( long int ) bytesSent, + ( unsigned int ) packetTypeByte, ( long int ) sendResult, MQTT_PUBLISH_ACK_PACKET_SIZE ) ); status = MQTTSendFailed; } @@ -2828,7 +2842,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) { - int32_t bytesSent = 0; + int32_t sendResult = 0; MQTTStatus_t status = MQTTSuccess; size_t packetSize = 0U; /* MQTT ping packets are of fixed length. */ @@ -2876,15 +2890,15 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) * Here, we do not use the vectored IO approach for efficiency as the * Ping packet does not have numerous fields which need to be copied * from the user provided buffers. Thus it can be sent directly. */ - bytesSent = sendBuffer( pContext, - localBuffer.pBuffer, - 2U ); + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + 2U ); /* Give the mutex away. */ MQTT_POST_SEND_HOOK( pContext ); /* It is an error to not send the entire PINGREQ packet. */ - if( bytesSent < ( int32_t ) packetSize ) + if( sendResult < ( int32_t ) packetSize ) { LogError( ( "Transport send failed for PINGREQ packet." ) ); status = MQTTSendFailed; @@ -2894,7 +2908,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) pContext->pingReqSendTimeMs = pContext->lastPacketTxTime; pContext->waitingForPingResp = true; LogDebug( ( "Sent %ld bytes of PINGREQ packet.", - ( long int ) bytesSent ) ); + ( long int ) sendResult ) ); } } @@ -2951,7 +2965,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) { size_t packetSize = 0U; - int32_t bytesSent = 0; + int32_t sendResult = 0; MQTTStatus_t status = MQTTSuccess; MQTTFixedBuffer_t localBuffer; uint8_t disconnectPacket[ 2U ]; @@ -2988,14 +3002,14 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) /* Here we do not use vectors as the disconnect packet has fixed fields * which do not reside in user provided buffers. Thus, it can be sent * using a simple send call. */ - bytesSent = sendBuffer( pContext, - localBuffer.pBuffer, - packetSize ); + sendResult = sendBuffer( pContext, + localBuffer.pBuffer, + packetSize ); /* Give the mutex away. */ MQTT_POST_SEND_HOOK( pContext ); - if( bytesSent < ( int32_t ) packetSize ) + if( sendResult < ( int32_t ) packetSize ) { LogError( ( "Transport send failed for DISCONNECT packet." ) ); status = MQTTSendFailed; @@ -3003,7 +3017,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) else { LogDebug( ( "Sent %ld bytes of DISCONNECT packet.", - ( long int ) bytesSent ) ); + ( long int ) sendResult ) ); } }