Skip to content

Commit

Permalink
Apps for different threading models
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Aggarwal <aggarg@amazon.com>
  • Loading branch information
aggarg committed Oct 5, 2023
1 parent f1472b1 commit 0b18218
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ struct NetworkContext
* @param[in] pvParameters Parameters as passed at the time of task creation. Not
* used in this example.
*/
static void prvMQTTDemoTask( void * pvParameters );
void prvMQTTDemoTask( void * pvParameters );

/**
* @brief Connect to MQTT broker with reconnection retries.
Expand All @@ -241,16 +241,16 @@ static void prvMQTTDemoTask( void * pvParameters );
*
* @return The status of the final connection attempt.
*/
static PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext );
PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext );

/**
* @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection.
*
* @param[in, out] pxMQTTContext MQTT context pointer.
* @param[in] xNetworkContext network context.
*/
static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext );
void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext );

/**
* @brief Function to update variable #Context with status
Expand Down Expand Up @@ -290,7 +290,7 @@ static void prvMQTTUnsubscribeFromTopics( MQTTContext_t * pxMQTTContext );
*
* @return Time in milliseconds.
*/
static uint32_t prvGetTimeMs( void );
uint32_t prvGetTimeMs( void );

/**
* @brief Process a response or ack to an MQTT request (PING, PUBLISH,
Expand Down Expand Up @@ -354,7 +354,7 @@ static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ];
* between the current time and the global entry time. This will reduce the chances
* of overflow for the 32 bit unsigned integer used for holding the timestamp.
*/
static uint32_t ulGlobalEntryTimeMs;
uint32_t ulGlobalEntryTimeMs;

/**
* @brief Packet Identifier generated when Publish request was sent to the broker;
Expand Down Expand Up @@ -444,141 +444,7 @@ void vStartSimpleMQTTDemo( void )
}
/*-----------------------------------------------------------*/

/*
* @brief The Example shown below uses MQTT APIs to create MQTT messages and
* send them over the server-authenticated network connection established with the
* MQTT broker. This example is single-threaded and uses statically allocated
* memory. It uses QoS2 for sending and receiving messages from the broker.
*
* This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the
* top of this file by sending a subscribe packet and waiting for a subscribe
* acknowledgment (SUBACK) from the broker. The client will then publish to the
* same topic it subscribed to, therefore expecting that all outgoing messages will be
* sent back from the broker.
*/
static void prvMQTTDemoTask( void * pvParameters )
{
uint32_t ulPublishCount = 0U, ulTopicCount = 0U;
const uint32_t ulMaxPublishCount = 5UL;
NetworkContext_t xNetworkContext = { 0 };
PlaintextTransportParams_t xPlaintextTransportParams = { 0 };
MQTTContext_t xMQTTContext = { 0 };
MQTTStatus_t xMQTTStatus;
PlaintextTransportStatus_t xNetworkStatus;

/* Remove compiler warnings about unused parameters. */
( void ) pvParameters;

/* Set the pParams member of the network context with desired transport. */
xNetworkContext.pParams = &xPlaintextTransportParams;

/* Set the entry time of the demo application. This entry time will be used
* to calculate relative time elapsed in the execution of the demo application,
* by the timer utility function that is provided to the MQTT library.
*/
ulGlobalEntryTimeMs = prvGetTimeMs();

for( ; ; )
{
LogInfo( ( "---------STARTING DEMO---------\r\n" ) );

/**************************** Initialize. *****************************/

prvInitializeTopicBuffers();

/****************************** Connect. ******************************/

/* Wait for Networking */
if( xPlatformIsNetworkUp() == pdFALSE )
{
LogInfo( ( "Waiting for the network link up event..." ) );

while( xPlatformIsNetworkUp() == pdFALSE )
{
vTaskDelay( pdMS_TO_TICKS( 1000U ) );
}
}

/* Attempt to establish a TLS connection with the MQTT broker. This example
* connects to the MQTT broker specified in democonfigMQTT_BROKER_ENDPOINT, using
* the port number specified in democonfigMQTT_BROKER_PORT (these macros are defined
* in file demo_config.h). If the connection fails, attempt to re-connect after a timeout.
* The timeout value will be exponentially increased until either the maximum timeout value
* is reached, or the maximum number of attempts are exhausted. The function returns a failure status
* if the TCP connection cannot be established with the broker after a configured number
* of attempts. */
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );

/* Send an MQTT CONNECT packet over the established TLS connection,
* and wait for the connection acknowledgment (CONNACK) packet. */
LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );

/**************************** Subscribe. ******************************/

/* If the server rejected the subscription request, attempt to resubscribe to the
* topic. Attempts are made according to the exponential backoff retry strategy
* implemented in BackoffAlgorithm. */
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );

/**************************** Publish and Keep-Alive Loop. ******************************/

/* Publish messages with QoS2, and send and process keep-alive messages. */
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
{
prvMQTTPublishToTopics( &xMQTTContext );

/* Process incoming publish echo. Since the application subscribed and published
* to the same topic, the broker will send the incoming publish message back
* to the application. */
LogInfo( ( "Attempt to receive publishes from broker.\r\n" ) );
xMQTTStatus = prvProcessLoopWithTimeout( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );

/* Leave connection idle for some time. */
LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS );
}

/************************ Unsubscribe from the topic. **************************/

prvMQTTUnsubscribeFromTopics( &xMQTTContext );

/* Process incoming UNSUBACK packet from the broker. */
xMQTTStatus = prvProcessLoopWithTimeout( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );

/**************************** Disconnect. ******************************/

/* Send an MQTT DISCONNECT packet over the already-connected TLS over TCP connection.
* There is no corresponding response expected from the broker. After sending the
* disconnect request, the client must close the network connection. */
LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess );

/* Close the network connection. */
Plaintext_FreeRTOS_Disconnect( &xNetworkContext );

/* Reset SUBACK status for each topic filter after completion of the subscription request cycle. */
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
}

/* Wait for some time between two iterations to ensure that we do not
* bombard the broker. */
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully.\r\n" ) );
LogInfo( ( "-------DEMO FINISHED-------\r\n" ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS );
}
}
/*-----------------------------------------------------------*/

static PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext )
PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext )
{
PlaintextTransportStatus_t xNetworkStatus;
BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
Expand Down Expand Up @@ -635,8 +501,8 @@ static PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkC
}
/*-----------------------------------------------------------*/

static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext )
void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext )
{
MQTTStatus_t xResult;
MQTTConnectInfo_t xConnectInfo;
Expand Down Expand Up @@ -1034,7 +900,7 @@ static void prvEventCallback( MQTTContext_t * pxMQTTContext,

/*-----------------------------------------------------------*/

static uint32_t prvGetTimeMs( void )
uint32_t prvGetTimeMs( void )
{
TickType_t xTickCount = 0;
uint32_t ulTimeMs = 0UL;
Expand Down Expand Up @@ -1105,3 +971,57 @@ static void prvInitializeTopicBuffers( void )
}

/*-----------------------------------------------------------*/

void prvMQTTDemoTask( void * pvParameters )
{
uint32_t ulPublishCount = 0U, ulTopicCount = 0U;
const uint32_t ulMaxPublishCount = 5UL;
NetworkContext_t xNetworkContext = { 0 };
PlaintextTransportParams_t xPlaintextTransportParams = { 0 };
MQTTContext_t xMQTTContext = { 0 };
PlaintextTransportStatus_t xNetworkStatus;

/* Remove compiler warnings about unused parameters. */
( void ) pvParameters;

/* Set the pParams member of the network context with desired transport. */
xNetworkContext.pParams = &xPlaintextTransportParams;

/* Set the entry time of the demo application. This entry time will be used
* to calculate relative time elapsed in the execution of the demo application,
* by the timer utility function that is provided to the MQTT library.
*/
ulGlobalEntryTimeMs = prvGetTimeMs();

for( ; ; )
{
LogInfo( ( "---------STARTING DEMO---------\r\n" ) );

/* Wait for Networking */
if( xPlatformIsNetworkUp() == pdFALSE )
{
LogInfo( ( "Waiting for the network link up event..." ) );

while( xPlatformIsNetworkUp() == pdFALSE )
{
vTaskDelay( pdMS_TO_TICKS( 1000U ) );
}
}

/* TCP Connect. */
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );

/* MQTT Connect. */
LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );

extern void start_app( MQTTContext_t * pMqttCtx );
start_app( &xMQTTContext );

for( ;; )
{

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Standard includes. */
#include <string.h>
#include <stdio.h>

/* Kernel includes. */
#include "FreeRTOS.h"
#include "task.h"

/* Demo Specific configs. */
#include "demo_config.h"

/* MQTT library includes. */
#include "core_mqtt.h"

/* Transport interface implementation include header for plaintext connection. */
#include "transport_plaintext.h"

/* Defines. */
#define mqttexampleTOPIC "/example/topic"
#define mqttexampleMESSAGE "Hello World!"
#define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) )
struct NetworkContext
{
PlaintextTransportParams_t* pParams;
};

/* Static functions. */
static void prvMQTTPublish(MQTTContext_t* pxMQTTContext);
/*-----------------------------------------------------------*/

void start_app( MQTTContext_t * pMqttCtx )
{
/* Publish. */
for( ; ; )
{
prvMQTTPublish( pMqttCtx );

/* Leave connection idle for some time. */
LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS );
}
}
/*-----------------------------------------------------------*/

static void prvMQTTPublish( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTPublishInfo_t xMQTTPublishInfo;
uint16_t usPublishPacketIdentifier;

/* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );

xMQTTPublishInfo.qos = MQTTQoS0;
xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );

/* Get a unique packet id. */
usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext );

LogInfo( ( "Publishing to the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
/* Send PUBLISH packet. */
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier );
configASSERT( xResult == MQTTSuccess );
}
/*-----------------------------------------------------------*/
Loading

0 comments on commit 0b18218

Please sign in to comment.