Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add State to MQTT_ProcessLoop so that it can be called in a non-blocking manner #198

Merged
merged 20 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a2d5728
Add stateful process-loop function
AniruddhaKanhere Jul 26, 2022
c4f76fc
Add extra checks; fix bugs and add description of functions
AniruddhaKanhere Jul 27, 2022
ce878e5
Add index based stateful processloop
AniruddhaKanhere Aug 4, 2022
0e3082b
Clean up
AniruddhaKanhere Aug 4, 2022
7c48412
Merge branch 'FreeRTOS:main' into StatefulProcLoop
AniruddhaKanhere Aug 4, 2022
0139fe1
Renamed functions to make them more coherent with their function
AniruddhaKanhere Aug 4, 2022
c8597b3
Merge branch 'StatefulProcLoop' of https://github.com/AniruddhaKanher…
AniruddhaKanhere Aug 4, 2022
211f64a
Remove unused function declarations
AniruddhaKanhere Aug 4, 2022
44f753d
Merge branch 'dev' into StatefulProcLoop
AniruddhaKanhere Aug 10, 2022
8ddd781
Fixed failing CI checks from previous commits except unit-test
Aug 11, 2022
151a802
Fixed spell check and updated size-table
AniruddhaKanhere Aug 11, 2022
5637b14
Fix CBMC proofs
AniruddhaKanhere Aug 11, 2022
f69a32a
Empty-Commit to trigger CBMC proofs
AniruddhaKanhere Aug 11, 2022
a4dbd53
Fix loop unwinding values in the Makefile
AniruddhaKanhere Aug 11, 2022
af119f0
Add upper bound on the buffer size of MQTT
AniruddhaKanhere Aug 11, 2022
64f9111
Increase minimum limit on buffer size to >0
AniruddhaKanhere Aug 12, 2022
3c1ab45
Add upper bound on the size of the buffer as well
AniruddhaKanhere Aug 12, 2022
9c16611
CBMC: Add memmove stub to accelerate coverage
Aug 19, 2022
f959062
Merge pull request #1 from markrtuttle/StatefulProcLoop2
AniruddhaKanhere Aug 19, 2022
cbaa2cd
Fix formatting
AniruddhaKanhere Aug 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/doxygen/include/size_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
</tr>
<tr>
<td>core_mqtt_serializer.c</td>
<td><center>2.5K</center></td>
<td><center>2.0K</center></td>
<td><center>2.7K</center></td>
<td><center>2.1K</center></td>
</tr>
<tr>
<td><b>Total estimates</b></td>
<td><b><center>7.1K</center></b></td>
<td><b><center>5.8K</center></b></td>
<td><b><center>7.3K</center></b></td>
<td><b><center>5.9K</center></b></td>
</tr>
</table>
2 changes: 2 additions & 0 deletions lexicon.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mqttfixedbuffer
mqttgetcurrenttimefunc
mqttillegalstate
mqttkeepalivetimeout
mqttneedmorebytes
mqttnodataavailable
mqttnomemory
mqttnotconnected
Expand Down Expand Up @@ -233,6 +234,7 @@ pfilterindex
pfixedbuffer
pheadersize
pincomingpacket
pindex
pingreq
pingreqs
pingreqsendtimems
Expand Down
217 changes: 139 additions & 78 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
size_t remainingLength,
uint32_t timeoutMs );

/**
* @brief Discard a packet from the MQTT buffer and the transport interface.
*
* @param[in] pContext MQTT Connection context.
* @param[in] pPacketInfo Information struct of the packet to be discarded.
*
* @return #MQTTRecvFailed or #MQTTNoDataAvailable.
*/
static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
MQTTPacketInfo_t * pPacketInfo );

/**
* @brief Receive a packet from the transport interface.
*
Expand Down Expand Up @@ -200,7 +211,6 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
* @brief Run a single iteration of the receive loop.
*
* @param[in] pContext MQTT Connection context.
* @param[in] remainingTimeMs Remaining time for the loop in milliseconds.
* @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
*
* @return #MQTTRecvFailed if a network error occurs during reception;
Expand All @@ -213,7 +223,6 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
* #MQTTSuccess on success.
*/
static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
uint32_t remainingTimeMs,
bool manageKeepAlive );

/**
Expand Down Expand Up @@ -853,6 +862,76 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,

/*-----------------------------------------------------------*/

static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
MQTTPacketInfo_t * pPacketInfo )
{
MQTTStatus_t status = MQTTRecvFailed;
int32_t bytesReceived = 0;
size_t bytesToReceive = 0U;
uint32_t totalBytesReceived = 0U;
bool receiveError = false;
size_t mqttPacketSize = 0;
size_t remainingLength;

assert( pContext != NULL );
assert( pPacketInfo != NULL );

mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength;

/* Assert that the packet being discarded is bigger than the
* receive buffer. */
assert( mqttPacketSize > pContext->networkBuffer.size );

/* Discard these many bytes at a time. */
bytesToReceive = pContext->networkBuffer.size;

/* Number of bytes depicted by 'index' have already been received. */
remainingLength = mqttPacketSize - pContext->index;

while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
{
if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
{
bytesToReceive = remainingLength - totalBytesReceived;
}

bytesReceived = recvExact( pContext, bytesToReceive );
AniruddhaKanhere marked this conversation as resolved.
Show resolved Hide resolved

if( bytesReceived != ( int32_t ) bytesToReceive )
{
LogError( ( "Receive error while discarding packet."
"ReceivedBytes=%ld, ExpectedBytes=%lu.",
( long int ) bytesReceived,
( unsigned long ) bytesToReceive ) );
receiveError = true;
}
else
{
totalBytesReceived += ( uint32_t ) bytesReceived;
}
}

if( totalBytesReceived == remainingLength )
{
LogError( ( "Dumped packet. DumpedBytes=%lu.",
( unsigned long ) totalBytesReceived ) );
/* Packet dumped, so no data is available. */
status = MQTTNoDataAvailable;
}

/* Clear the buffer */
memset( pContext->networkBuffer.pBuffer,
0,
pContext->networkBuffer.size );

/* Reset the index. */
pContext->index = 0;

return status;
}

/*-----------------------------------------------------------*/

static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
MQTTPacketInfo_t incomingPacket,
uint32_t remainingTimeMs )
Expand Down Expand Up @@ -1305,18 +1384,42 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
/*-----------------------------------------------------------*/

static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
uint32_t remainingTimeMs,
bool manageKeepAlive )
{
MQTTStatus_t status = MQTTSuccess;
MQTTPacketInfo_t incomingPacket;
MQTTPacketInfo_t incomingPacket = { 0 };
int32_t recvBytes;
size_t totalMQTTPacketLength = 0;

assert( pContext != NULL );
assert( pContext->networkBuffer.pBuffer != NULL );

status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
pContext->transportInterface.pNetworkContext,
&incomingPacket );
/* Read as many bytes as possible into the network buffer. */
recvBytes = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
&( pContext->networkBuffer.pBuffer[ pContext->index ] ),
pContext->networkBuffer.size - pContext->index );

if( recvBytes < 0 )
{
/* The receive function has failed. Bubble up the error up to the user. */
status = MQTTRecvFailed;
}
else if( recvBytes == 0 )
{
/* No more bytes available since the last read. */
status = MQTTNoDataAvailable;
}
else
{
/* Update the number of bytes in the MQTT fixed buffer. */
pContext->index += recvBytes;

status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
&pContext->index,
&incomingPacket );

totalMQTTPacketLength = incomingPacket.remainingLength + incomingPacket.headerLength;
}

if( status == MQTTNoDataAvailable )
{
Expand All @@ -1329,8 +1432,8 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,

if( status == MQTTSuccess )
{
/* Reset the status to indicate that we should not try to read
* a packet from the transport interface. */
/* Reset the status to indicate that nothing was read
* from the transport interface. */
status = MQTTNoDataAvailable;
}
}
Expand All @@ -1339,18 +1442,27 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
LogError( ( "Receiving incoming packet length failed. Status=%s",
MQTT_Status_strerror( status ) ) );
}
/* If the MQTT Packet size is bigger than the buffer itself. */
else if( totalMQTTPacketLength > pContext->networkBuffer.size )
{
/* Discard the packet from the buffer and from the socket buffer. */
status = discardStoredPacket( pContext,
&incomingPacket );
}
/* If the total packet is of more length than the bytes we have available. */
else if( totalMQTTPacketLength > pContext->index )
{
status = MQTTNeedMoreBytes;
}
else
{
/* Receive packet. Remaining time is recalculated before calling this
* function. */
status = receivePacket( pContext, incomingPacket, remainingTimeMs );
/* MISRA else */
}

/* Handle received packet. If no data was read then this will not execute. */
/* Handle received packet. If incomplete data was read then this will not execute. */
if( status == MQTTSuccess )
{
incomingPacket.pRemainingData = pContext->networkBuffer.pBuffer;
pContext->lastPacketRxTime = pContext->getTime();
incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];

/* PUBLISH packets allow flags in the lower four bits. For other
* packet types, they are reserved. */
Expand All @@ -1362,6 +1474,14 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
{
status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
}

/* Update the index to reflect the remaining bytes in the buffer. */
pContext->index -= totalMQTTPacketLength;

/* Move the remaining bytes to the front of the buffer. */
memmove( pContext->networkBuffer.pBuffer,
&( pContext->networkBuffer.pBuffer[ totalMQTTPacketLength ] ),
pContext->index );
}

if( status == MQTTNoDataAvailable )
Expand Down Expand Up @@ -2189,11 +2309,9 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
uint32_t timeoutMs )
MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTBadParameter;
uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;

if( pContext == NULL )
{
Expand All @@ -2209,48 +2327,18 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
}
else
{
entryTimeMs = pContext->getTime();
pContext->controlPacketSent = false;
status = MQTTSuccess;
}

while( status == MQTTSuccess )
{
status = receiveSingleIteration( pContext, remainingTimeMs, true );

/* We don't need to break here since the status is already checked in
* the loop condition, and we do not want multiple breaks in a loop. */
if( status != MQTTSuccess )
{
LogError( ( "Exiting process loop due to failure: ErrorStatus=%s",
MQTT_Status_strerror( status ) ) );
}
else
{
/* Recalculate remaining time and check if loop should exit. This is
* done at the end so the loop will run at least a single iteration. */
elapsedTimeMs = calculateElapsedTime( pContext->getTime(),
entryTimeMs );

if( elapsedTimeMs >= timeoutMs )
{
break;
}

remainingTimeMs = timeoutMs - elapsedTimeMs;
}
status = receiveSingleIteration( pContext, true );
}

return status;
}

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
uint32_t timeoutMs )
MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTBadParameter;
uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;

if( pContext == NULL )
{
Expand All @@ -2266,34 +2354,7 @@ MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
}
else
{
entryTimeMs = pContext->getTime();
status = MQTTSuccess;
}

while( status == MQTTSuccess )
{
status = receiveSingleIteration( pContext, remainingTimeMs, false );

/* We don't need to break here since the status is already checked in
* the loop condition, and we do not want multiple breaks in a loop. */
if( status != MQTTSuccess )
{
LogError( ( "Exiting receive loop. Error status=%s",
MQTT_Status_strerror( status ) ) );
}
else
{
/* Recalculate remaining time and check if loop should exit. This is
* done at the end so the loop will run at least a single iteration. */
elapsedTimeMs = calculateElapsedTime( pContext->getTime(), entryTimeMs );

if( elapsedTimeMs >= timeoutMs )
{
break;
}

remainingTimeMs = timeoutMs - elapsedTimeMs;
}
status = receiveSingleIteration( pContext, false );
}

return status;
Expand Down
Loading