Skip to content

Commit

Permalink
Merge branch 'main' into pr/304
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 authored Aug 24, 2023
2 parents 96c01e0 + 9fc57a1 commit 2d831c7
Show file tree
Hide file tree
Showing 40 changed files with 10,792 additions and 3,774 deletions.
2 changes: 1 addition & 1 deletion bin/elastipubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_include_directories(${ELASTIPUBSUB_PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

target_link_libraries(${ELASTIPUBSUB_PROJECT_NAME} aws-c-mqtt)
target_link_libraries(${ELASTIPUBSUB_PROJECT_NAME} PRIVATE aws-c-mqtt)

if (BUILD_SHARED_LIBS AND NOT WIN32)
message(INFO " elastiPUBSUB will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application")
Expand Down
2 changes: 1 addition & 1 deletion bin/elastipubsub5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_include_directories(${ELASTIPUBSUB_MQTT5_PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

target_link_libraries(${ELASTIPUBSUB_MQTT5_PROJECT_NAME} aws-c-mqtt)
target_link_libraries(${ELASTIPUBSUB_MQTT5_PROJECT_NAME} PRIVATE aws-c-mqtt)

if (BUILD_SHARED_LIBS AND NOT WIN32)
message(INFO " elastiPUBSUB will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application")
Expand Down
2 changes: 1 addition & 1 deletion bin/mqtt5canary/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_include_directories(${MQTT5CANARY_PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

target_link_libraries(${MQTT5CANARY_PROJECT_NAME} aws-c-mqtt)
target_link_libraries(${MQTT5CANARY_PROJECT_NAME} PRIVATE aws-c-mqtt)

if (BUILD_SHARED_LIBS AND NOT WIN32)
message(INFO " mqtt5canary will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application")
Expand Down
18 changes: 18 additions & 0 deletions include/aws/mqtt/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ typedef void(aws_mqtt_client_publish_received_fn)(
/** Called when a connection is closed, right before any resources are deleted */
typedef void(aws_mqtt_client_on_disconnect_fn)(struct aws_mqtt_client_connection *connection, void *userdata);

/**
* Signature of callback invoked on a connection destruction.
*/
typedef void(aws_mqtt_client_on_connection_termination_fn)(void *userdata);

/**
* Function to invoke when the websocket handshake request transformation completes.
* This function MUST be invoked or the application will soft-lock.
Expand Down Expand Up @@ -506,6 +511,19 @@ int aws_mqtt_client_connection_set_on_any_publish_handler(
aws_mqtt_client_publish_received_fn *on_any_publish,
void *on_any_publish_ud);

/**
* Sets the callback to call on a connection destruction.
*
* \param[in] connection The connection object.
* \param[in] on_termination The function to call when a connection is destroyed.
* \param[in] on_termination_ud Userdata for on_termination.
*/
AWS_MQTT_API
int aws_mqtt_client_connection_set_connection_termination_handler(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_connection_termination_fn *on_termination,
void *on_termination_ud);

/**
* Opens the actual connection defined by aws_mqtt_client_connection_new.
* Once the connection is opened, on_connack will be called. Only called when connection is disconnected.
Expand Down
2 changes: 2 additions & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT5_INVALID_OUTBOUND_TOPIC_ALIAS,
AWS_ERROR_MQTT5_INVALID_UTF8_STRING,
AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT,
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand All @@ -90,6 +91,7 @@ enum aws_mqtt_log_subject {
AWS_LS_MQTT5_GENERAL,
AWS_LS_MQTT5_CLIENT,
AWS_LS_MQTT5_CANARY,
AWS_LS_MQTT5_TO_MQTT3_ADAPTER,
};

/** Function called on cleanup of a userdata. */
Expand Down
13 changes: 11 additions & 2 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/mqtt311_decoder.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/hash_table.h>
Expand Down Expand Up @@ -127,6 +128,11 @@ struct aws_mqtt_request {

struct aws_channel_task outgoing_task;

/*
* The request send time. Currently used to push off keepalive packet.
*/
uint64_t request_send_timestamp;

/* How this operation is currently affecting the statistics of the connection */
enum aws_mqtt_operation_statistic_state_flags statistic_state_flags;
/* The encoded size of the packet - used for operation statistics tracking */
Expand Down Expand Up @@ -244,6 +250,8 @@ struct aws_mqtt_client_connection_311_impl {
void *on_any_publish_ud;
aws_mqtt_client_on_disconnect_fn *on_disconnect;
void *on_disconnect_ud;
aws_mqtt_client_on_connection_termination_fn *on_termination;
void *on_termination_ud;
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
void *on_any_operation_statistics_ud;

Expand All @@ -261,8 +269,7 @@ struct aws_mqtt_client_connection_311_impl {

/* Only the event-loop thread may touch this data */
struct {
/* If an incomplete packet arrives, store the data here. */
struct aws_byte_buf pending_packet;
struct aws_mqtt311_decoder decoder;

bool waiting_on_ping_response;

Expand Down Expand Up @@ -416,4 +423,6 @@ void aws_mqtt_connection_statistics_change_operation_statistic_state(
struct aws_mqtt_request *request,
enum aws_mqtt_operation_statistic_state_flags new_state_flags);

AWS_MQTT_API const struct aws_mqtt_client_connection_packet_handlers *aws_mqtt311_get_default_packet_handlers(void);

#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_H */
11 changes: 11 additions & 0 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ struct aws_mqtt_client_connection_vtable {
aws_mqtt_client_publish_received_fn *on_any_publish,
void *on_any_publish_ud);

int (*set_connection_termination_handler_fn)(
void *impl,
aws_mqtt_client_on_connection_termination_fn *on_termination,
void *on_termination_ud);

int (*connect_fn)(void *impl, const struct aws_mqtt_connection_options *connection_options);

int (*reconnect_fn)(void *impl, aws_mqtt_client_on_connection_complete_fn *on_connection_complete, void *userdata);
Expand Down Expand Up @@ -109,4 +114,10 @@ struct aws_mqtt_client_connection {
void *impl;
};

AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);

AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b);

AWS_MQTT_API bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b);

#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H */
2 changes: 2 additions & 0 deletions include/aws/mqtt/private/fixed_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ AWS_MQTT_API int aws_mqtt_fixed_header_encode(struct aws_byte_buf *buf, const st
*/
AWS_MQTT_API int aws_mqtt_fixed_header_decode(struct aws_byte_cursor *cur, struct aws_mqtt_fixed_header *header);

AWS_MQTT_API int aws_mqtt311_decode_remaining_length(struct aws_byte_cursor *cur, size_t *remaining_length_out);

#endif /* AWS_MQTT_PRIVATE_FIXED_HEADER_H */
135 changes: 135 additions & 0 deletions include/aws/mqtt/private/mqtt311_decoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#ifndef AWS_MQTT_PRIVATE_MQTT311_DECODER_H
#define AWS_MQTT_PRIVATE_MQTT311_DECODER_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/mqtt.h>

#include <aws/common/byte_buf.h>

/*
* Per-packet-type callback signature. message_cursor contains the entire packet's data.
*/
typedef int(packet_handler_fn)(struct aws_byte_cursor message_cursor, void *user_data);

/*
* Wrapper for a set of packet handlers for each possible MQTT packet type. Some values are invalid in 311 (15), and
* some values are only valid from the perspective of the server or client.
*/
struct aws_mqtt_client_connection_packet_handlers {
packet_handler_fn *handlers_by_packet_type[16];
};

/*
* Internal state of the 311 decoder/framing logic.
*
* When a packet is fragmented across multiple io buffers, state moves circularly:
* first byte -> remaining length -> body -> first byte etc...
*
* When a packet is completely contained inside a single io buffer, the entire packet is processed within
* the READ_FIRST_BYTE state.
*/
enum aws_mqtt_311_decoder_state_type {

/*
* The decoder is expecting the first byte of the fixed header of an MQTT control packet
*/
AWS_MDST_READ_FIRST_BYTE,

/*
* The decoder is expecting the vli-encoded total remaining length field of the fixed header on an MQTT control
* packet.
*/
AWS_MDST_READ_REMAINING_LENGTH,

/*
* The decoder is expecting the "rest" of the MQTT packet's data based on the remaining length value that has
* already been read.
*/
AWS_MDST_READ_BODY,

/*
* Terminal state for when a protocol error has been encountered by the decoder. The only way to leave this
* state is to reset the decoder via the aws_mqtt311_decoder_reset_for_new_connection() API.
*/
AWS_MDST_PROTOCOL_ERROR,
};

/*
* Configuration options for the decoder. When used by the actual implementation, handler_user_data is the
* connection object and the packet handlers are channel functions that hook into reactionary behavior and user
* callbacks.
*/
struct aws_mqtt311_decoder_options {
const struct aws_mqtt_client_connection_packet_handlers *packet_handlers;
void *handler_user_data;
};

/*
* Simple MQTT311 decoder structure. Actual decoding is deferred to per-packet functions that expect the whole
* packet in a buffer. The primary function of this sub-system is correctly framing a stream of bytes into the
* constituent packets.
*/
struct aws_mqtt311_decoder {
struct aws_mqtt311_decoder_options config;

enum aws_mqtt_311_decoder_state_type state;

/*
* If zero, not valid. If non-zero, represents the number of bytes that need to be read to finish the packet.
* This includes the total encoding size of the fixed header.
*/
size_t total_packet_length;

/* scratch buffer to hold individual packets when they fragment across incoming data frame boundaries */
struct aws_byte_buf packet_buffer;
};

AWS_EXTERN_C_BEGIN

/**
* Initialize function for the MQTT311 decoder
*
* @param decoder decoder to initialize
* @param allocator memory allocator to use
* @param options additional decoder configuration options
*/
AWS_MQTT_API void aws_mqtt311_decoder_init(
struct aws_mqtt311_decoder *decoder,
struct aws_allocator *allocator,
const struct aws_mqtt311_decoder_options *options);

/**
* Clean up function for an MQTT311 decoder
*
* @param decoder decoder to release resources for
*/
AWS_MQTT_API void aws_mqtt311_decoder_clean_up(struct aws_mqtt311_decoder *decoder);

/**
* Callback function to decode the incoming data stream of an MQTT311 connection. Handles packet framing and
* correct decoder/handler function dispatch.
*
* @param decoder decoder to decode with
* @param data raw plaintext bytes of a connection operating on the MQTT311 protocol
* @return success/failure, failure represents a protocol error and implies the connection must be shut down
*/
AWS_MQTT_API int aws_mqtt311_decoder_on_bytes_received(
struct aws_mqtt311_decoder *decoder,
struct aws_byte_cursor data);

/**
* Resets a decoder's state to its initial values. If using a decoder across multiple network connections (within
* the same client), you must invoke this when setting up a new connection, before any MQTT protocol bytes are
* processed. Breaks the decoder out of any previous protocol error terminal state.
*
* @param decoder decoder to reset
*/
AWS_MQTT_API void aws_mqtt311_decoder_reset_for_new_connection(struct aws_mqtt311_decoder *decoder);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_MQTT311_DECODER_H */
Loading

0 comments on commit 2d831c7

Please sign in to comment.