Skip to content

Commit

Permalink
Fixes to timeout of sendMessageVector and refactor of sendBuffer for …
Browse files Browse the repository at this point in the history
…consistency.
  • Loading branch information
Jason Carroll committed Oct 4, 2022
1 parent 10d85cb commit 2900e20
Showing 1 changed file with 104 additions and 93 deletions.
197 changes: 104 additions & 93 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -733,35 +733,37 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
TransportOutVector_t * pIoVec,
size_t ioVecCount )
{
uint32_t timeoutTime;
uint32_t bytesToSend = 0U;
int32_t bytesSentOrError = 0;
int32_t sendResult;
uint32_t lastSendTimeMs;
uint32_t timeSinceLastSendMs;
TransportOutVector_t * pIoVectIterator;
size_t vectorsToBeSent = ioVecCount;
size_t bytesToSend = 0;
int32_t bytesSentOrError = 0;


assert( pContext != NULL );
assert( pIoVec != NULL );
assert( pContext->getTime != NULL );
/* Send must always be defined */
assert( pContext->transportInterface.send != NULL );

timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS;

/* Count the total number of bytes to be sent as outlined in the vector. */
for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
{
bytesToSend += ( uint32_t ) pIoVectIterator->iov_len;
bytesToSend += pIoVectIterator->iov_len;
}

/* Reset the iterator to point to the first entry in the array. */
pIoVectIterator = pIoVec;

while( ( bytesSentOrError < ( int32_t ) bytesToSend ) &&
( bytesSentOrError >= 0 ) )
{
int32_t sendResult;
uint32_t bytesSentThisVector = 0U;
/* This initializes the last send time to the current time to cover the case
* where the transport send function returns 0 on the first iteration of the
* while loop. */
lastSendTimeMs = pContext->getTime();

while( ( ( size_t ) bytesSentOrError < bytesToSend ) && ( bytesSentOrError >= 0 ) )
{
if( pContext->transportInterface.writev != NULL )
{
sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
Expand All @@ -770,49 +772,65 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
}
else
{
sendResult = sendBuffer( pContext,
pIoVectIterator->iov_base,
pIoVectIterator->iov_len );
sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
pIoVectIterator->iov_base,
pIoVectIterator->iov_len );
}

if( sendResult >= 0 )
if( sendResult > 0 )
{
/* It is a bug in the application's transport send implementation if
* more bytes than expected are sent. */
assert( ( size_t ) sendResult <= ( bytesToSend - ( size_t ) bytesSentOrError ) );

bytesSentOrError += sendResult;
bytesSentThisVector += ( uint32_t ) sendResult;

/* Reset timeout since data was sent. */
lastSendTimeMs = pContext->getTime();

LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
( long int ) sendResult,
( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
}
else
else if( sendResult < 0 )
{
bytesSentOrError = sendResult;
}

/* Check for timeout if we have been waiting to send any data over the network. */
timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs );

/* We do not need to break here as the condition is checked in the loop.
* The following statements will not execute as bytesSentThisVector is not
* updated and is still 0. */
if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS )
{
LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
break;
}

/* Update the send pointer to the correct vector and offset. */
while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
( bytesSentThisVector >= pIoVectIterator->iov_len ) )
( ( size_t ) sendResult >= pIoVectIterator->iov_len ) )
{
bytesSentThisVector -= ( uint32_t ) pIoVectIterator->iov_len;
sendResult -= ( int32_t ) pIoVectIterator->iov_len;
pIoVectIterator++;

/* Update the number of vector which are yet to be sent. */
vectorsToBeSent--;
}

/* Some of the bytes from this vector were sent as well, update the length
* and the pointer to data in this vector. */
if( ( bytesSentThisVector > 0U ) &&
if( ( sendResult > 0 ) &&
( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
{
pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ bytesSentThisVector ] );
pIoVectIterator->iov_len -= bytesSentThisVector;
pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
pIoVectIterator->iov_len -= sendResult;
}
}

/* Check for timeout. */
if( pContext->getTime() > timeoutTime )
{
break;
}
if( ( size_t ) bytesSentOrError == bytesToSend )
{
pContext->lastPacketTxTime = lastSendTimeMs;
LogDebug( ( "sendMessageVector: Successfully sent packet at time %lu.",
( unsigned long ) lastSendTimeMs ) );
}

return bytesSentOrError;
Expand All @@ -822,74 +840,67 @@ static int32_t sendBuffer( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
size_t bytesToSend )
{
int32_t sendResult;
uint32_t lastSendTimeMs;
uint32_t timeSinceLastSendMs;
int32_t bytesSentOrError = 0;
const uint8_t * pIndex = pBufferToSend;
size_t bytesRemaining;
int32_t totalBytesSent = 0, bytesSent;
uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U;
bool sendError = false;

assert( pContext != NULL );
assert( pContext->getTime != NULL );
assert( pContext->transportInterface.send != NULL );
assert( pIndex != NULL );

bytesRemaining = bytesToSend;
/* This initializes the last send time to the current time to cover the case
* where the transport send function returns 0 on the first iteration of the
* while loop. */
lastSendTimeMs = pContext->getTime();

/* Loop until the entire packet is sent. */
while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
while( ( ( size_t ) bytesSentOrError < bytesToSend ) && ( bytesSentOrError >= 0 ) )
{
bytesSent = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
pIndex,
bytesRemaining );
sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
pIndex,
bytesToSend - ( size_t ) bytesSentOrError );

if( bytesSent < 0 )
if( sendResult > 0 )
{
LogError( ( "Transport send failed. Error code=%ld.", ( long int ) bytesSent ) );
totalBytesSent = bytesSent;
sendError = true;
}
else if( bytesSent > 0 )
{
/* Record the most recent time of successful transmission. */
lastSendTimeMs = pContext->getTime();

/* It is a bug in the application's transport send implementation if
* more bytes than expected are sent. To avoid a possible overflow
* in converting bytesRemaining from unsigned to signed, this assert
* must exist after the check for bytesSent being negative. */
assert( ( size_t ) bytesSent <= bytesRemaining );
* more bytes than expected are sent. */
assert( ( size_t ) sendResult <= ( bytesToSend - ( size_t ) bytesSentOrError ) );

bytesRemaining -= ( size_t ) bytesSent;
totalBytesSent += bytesSent;
/* Increment the index. */
pIndex = &pIndex[ bytesSent ];
LogDebug( ( "BytesSent=%ld, BytesRemaining=%lu",
( long int ) bytesSent,
( unsigned long ) bytesRemaining ) );
bytesSentOrError += sendResult;
pIndex = &pIndex[ sendResult ];

/* Reset timeout since data was sent. */
lastSendTimeMs = pContext->getTime();

LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
( long int ) sendResult,
( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
}
else
else if( sendResult < 0 )
{
/* No bytes were sent over the network. */
timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs );
bytesSentOrError = sendResult;
}

/* Check for timeout if we have been waiting to send any data over the network. */
if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS )
{
LogError( ( "Unable to send packet: Timed out in transport send." ) );
sendError = true;
}
timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs );

/* Check for timeout if we have been waiting to send any data over the network. */
if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS )
{
LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
break;
}
}

/* Update time of last transmission if the entire packet is successfully sent. */
if( totalBytesSent > 0 )
if( ( size_t ) bytesSentOrError == bytesToSend )
{
pContext->lastPacketTxTime = lastSendTimeMs;
LogDebug( ( "Successfully sent packet at time %lu.",
LogDebug( ( "sendBuffer: Successfully sent packet at time %lu.",
( unsigned long ) lastSendTimeMs ) );
}

return totalBytesSent;
return bytesSentOrError;
}

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -1241,7 +1252,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
{
MQTTStatus_t status = MQTTSuccess;
MQTTPublishState_t newState = MQTTStateNull;
int32_t bytesSent = 0;
int32_t sendResult = 0;
uint8_t packetTypeByte = 0U;
MQTTPubAckType_t packetType;
MQTTFixedBuffer_t localBuffer;
Expand All @@ -1268,14 +1279,14 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,

/* Here, we are not using the vector approach for efficiency. There is just one buffer
* to be sent which can be achieved with a normal send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE );
sendResult = sendBuffer( pContext,
localBuffer.pBuffer,
MQTT_PUBLISH_ACK_PACKET_SIZE );

MQTT_POST_SEND_HOOK( pContext );
}

if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
{
pContext->controlPacketSent = true;

Expand All @@ -1299,7 +1310,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
{
LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
"PacketSize=%lu.",
( unsigned int ) packetTypeByte, ( long int ) bytesSent,
( unsigned int ) packetTypeByte, ( long int ) sendResult,
MQTT_PUBLISH_ACK_PACKET_SIZE ) );
status = MQTTSendFailed;
}
Expand Down Expand Up @@ -2828,7 +2839,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,

MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
{
int32_t bytesSent = 0;
int32_t sendResult = 0;
MQTTStatus_t status = MQTTSuccess;
size_t packetSize = 0U;
/* MQTT ping packets are of fixed length. */
Expand Down Expand Up @@ -2876,15 +2887,15 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
* Here, we do not use the vectored IO approach for efficiency as the
* Ping packet does not have numerous fields which need to be copied
* from the user provided buffers. Thus it can be sent directly. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
2U );
sendResult = sendBuffer( pContext,
localBuffer.pBuffer,
2U );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

/* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < ( int32_t ) packetSize )
if( sendResult < ( int32_t ) packetSize )
{
LogError( ( "Transport send failed for PINGREQ packet." ) );
status = MQTTSendFailed;
Expand All @@ -2894,7 +2905,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
pContext->waitingForPingResp = true;
LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
( long int ) bytesSent ) );
( long int ) sendResult ) );
}
}

Expand Down Expand Up @@ -2951,7 +2962,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
{
size_t packetSize = 0U;
int32_t bytesSent = 0;
int32_t sendResult = 0;
MQTTStatus_t status = MQTTSuccess;
MQTTFixedBuffer_t localBuffer;
uint8_t disconnectPacket[ 2U ];
Expand Down Expand Up @@ -2988,22 +2999,22 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
/* Here we do not use vectors as the disconnect packet has fixed fields
* which do not reside in user provided buffers. Thus, it can be sent
* using a simple send call. */
bytesSent = sendBuffer( pContext,
localBuffer.pBuffer,
packetSize );
sendResult = sendBuffer( pContext,
localBuffer.pBuffer,
packetSize );

/* Give the mutex away. */
MQTT_POST_SEND_HOOK( pContext );

if( bytesSent < ( int32_t ) packetSize )
if( sendResult < ( int32_t ) packetSize )
{
LogError( ( "Transport send failed for DISCONNECT packet." ) );
status = MQTTSendFailed;
}
else
{
LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
( long int ) bytesSent ) );
( long int ) sendResult ) );
}
}

Expand Down

0 comments on commit 2900e20

Please sign in to comment.