Skip to content

Commit

Permalink
Validation phase of MQTT311 compliance pass
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Aug 22, 2023
1 parent cbd9074 commit 2529907
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 8 deletions.
19 changes: 18 additions & 1 deletion source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1957,6 +1957,11 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe_multiple(

AWS_PRECONDITION(connection);

if (topic_filters == NULL || aws_array_list_length(topic_filters) == 0) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return 0;
}

struct subscribe_task_arg *task_arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_arg));
if (!task_arg) {
return 0;
Expand Down Expand Up @@ -2919,6 +2924,11 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
return 0;
}

if (qos > AWS_MQTT_QOS_EXACTLY_ONCE) {
aws_raise_error(AWS_ERROR_MQTT_INVALID_QOS);
return 0;
}

struct publish_task_arg *arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct publish_task_arg));
if (!arg) {
return 0;
Expand All @@ -2929,7 +2939,14 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
arg->topic = aws_byte_cursor_from_string(arg->topic_string);
arg->qos = qos;
arg->retain = retain;
if (aws_byte_buf_init_copy_from_cursor(&arg->payload_buf, connection->allocator, *payload)) {

struct aws_byte_cursor payload_cursor;
AWS_ZERO_STRUCT(payload_cursor);
if (payload != NULL) {
payload_cursor = *payload;
}

if (aws_byte_buf_init_copy_from_cursor(&arg->payload_buf, connection->allocator, payload_cursor)) {
goto handle_error;
}
arg->payload = aws_byte_cursor_from_buf(&arg->payload_buf);
Expand Down
3 changes: 3 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
******************************************************************************/

static bool s_is_valid_topic(const struct aws_byte_cursor *topic, bool is_filter) {
if (topic == NULL) {
return false;
}

/* [MQTT-4.7.3-1] Check existance and length */
if (!topic->ptr || !topic->len) {
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_test_case(mqtt_topic_tree_unsubscribe)
add_test_case(mqtt_topic_tree_duplicate_transactions)
add_test_case(mqtt_topic_tree_transactions)
add_test_case(mqtt_topic_validation)
add_test_case(mqtt_topic_filter_validation)

add_test_case(mqtt_connect_disconnect)
add_test_case(mqtt_connect_set_will_login)
Expand Down Expand Up @@ -86,6 +87,10 @@ add_test_case(mqtt_connection_reconnection_backoff_unstable)
add_test_case(mqtt_connection_reconnection_backoff_reset)
add_test_case(mqtt_connection_reconnection_backoff_reset_after_disconnection)

add_test_case(mqtt_validation_failure_publish_qos)
add_test_case(mqtt_validation_failure_subscribe_empty)
add_test_case(mqtt_validation_failure_unsubscribe_null)

# Operation statistics tests
add_test_case(mqtt_operation_statistics_simple_publish)
add_test_case(mqtt_operation_statistics_offline_publish)
Expand Down
132 changes: 132 additions & 0 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3737,3 +3737,135 @@ AWS_TEST_CASE_FIXTURE(
s_test_mqtt_connection_termination_callback_simple_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/*
* Verifies that calling publish with a bad qos results in a validation failure
*/
static int s_test_mqtt_validation_failure_publish_qos_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("a/b");
ASSERT_INT_EQUALS(
0,
aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&topic,
(enum aws_mqtt_qos)3,
true,
NULL,
s_on_op_complete,
state_test_data));
int error_code = aws_last_error();
ASSERT_INT_EQUALS(AWS_ERROR_MQTT_INVALID_QOS, error_code);

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_validation_failure_publish_qos,
s_setup_mqtt_server_fn,
s_test_mqtt_validation_failure_publish_qos_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/*
* Verifies that calling subscribe_multiple with no topics causes a validation failure
*/
static int s_test_mqtt_validation_failure_subscribe_empty_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

struct aws_array_list topic_filters;
size_t list_len = 2;
AWS_VARIABLE_LENGTH_ARRAY(uint8_t, static_buf, list_len * sizeof(struct aws_mqtt_topic_subscription));
aws_array_list_init_static(&topic_filters, static_buf, list_len, sizeof(struct aws_mqtt_topic_subscription));

ASSERT_INT_EQUALS(
0,
aws_mqtt_client_connection_subscribe_multiple(
state_test_data->mqtt_connection, &topic_filters, s_on_multi_suback, state_test_data));
int error_code = aws_last_error();
ASSERT_INT_EQUALS(AWS_ERROR_INVALID_ARGUMENT, error_code);

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_validation_failure_subscribe_empty,
s_setup_mqtt_server_fn,
s_test_mqtt_validation_failure_subscribe_empty_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/*
* Verifies that calling unsubscribe with a null topic causes a validation failure (not a crash)
*/
static int s_test_mqtt_validation_failure_unsubscribe_null_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

ASSERT_INT_EQUALS(
0,
aws_mqtt_client_connection_unsubscribe(
state_test_data->mqtt_connection, NULL, s_on_op_complete, state_test_data));
int error_code = aws_last_error();
ASSERT_INT_EQUALS(AWS_ERROR_MQTT_INVALID_TOPIC, error_code);

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_validation_failure_unsubscribe_null,
s_setup_mqtt_server_fn,
s_test_mqtt_validation_failure_unsubscribe_null_fn,
s_clean_up_mqtt_server_fn,
&test_data)
47 changes: 40 additions & 7 deletions tests/v3/topic_tree_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,51 @@ static int s_mqtt_topic_validation_fn(struct aws_allocator *allocator, void *ctx
struct aws_byte_cursor topic_cursor; \
topic_cursor.ptr = (uint8_t *)(topic); \
topic_cursor.len = strlen(topic); \
ASSERT_##expected(aws_mqtt_is_valid_topic_filter(&topic_cursor)); \
ASSERT_##expected(aws_mqtt_is_valid_topic(&topic_cursor)); \
} while (false)

ASSERT_TOPIC_VALIDITY(TRUE, "#");
ASSERT_TOPIC_VALIDITY(TRUE, "sport/tennis/#");
ASSERT_TOPIC_VALIDITY(TRUE, "/");
ASSERT_TOPIC_VALIDITY(TRUE, "a/");
ASSERT_TOPIC_VALIDITY(TRUE, "/b");
ASSERT_TOPIC_VALIDITY(TRUE, "a/b/c");

ASSERT_TOPIC_VALIDITY(FALSE, "#");
ASSERT_TOPIC_VALIDITY(FALSE, "sport/tennis/#");
ASSERT_TOPIC_VALIDITY(FALSE, "sport/tennis#");
ASSERT_TOPIC_VALIDITY(FALSE, "sport/tennis/#/ranking");

ASSERT_TOPIC_VALIDITY(TRUE, "+");
ASSERT_TOPIC_VALIDITY(TRUE, "+/tennis/#");
ASSERT_TOPIC_VALIDITY(TRUE, "sport/+/player1");
ASSERT_TOPIC_VALIDITY(FALSE, "");
ASSERT_TOPIC_VALIDITY(FALSE, "+");
ASSERT_TOPIC_VALIDITY(FALSE, "+/tennis/#");
ASSERT_TOPIC_VALIDITY(FALSE, "sport/+/player1");
ASSERT_TOPIC_VALIDITY(FALSE, "sport+");

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt_topic_filter_validation, s_mqtt_topic_filter_validation_fn)
static int s_mqtt_topic_filter_validation_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
(void)ctx;

#define ASSERT_TOPIC_FILTER_VALIDITY(expected, topic_filter) \
do { \
struct aws_byte_cursor topic_filter_cursor; \
topic_filter_cursor.ptr = (uint8_t *)(topic_filter); \
topic_filter_cursor.len = strlen(topic_filter); \
ASSERT_##expected(aws_mqtt_is_valid_topic_filter(&topic_filter_cursor)); \
} while (false)

ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "#");
ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "sport/tennis/#");
ASSERT_TOPIC_FILTER_VALIDITY(FALSE, "sport/tennis#");
ASSERT_TOPIC_FILTER_VALIDITY(FALSE, "sport/tennis/#/ranking");
ASSERT_TOPIC_FILTER_VALIDITY(FALSE, "");

ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "+/");
ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "+");
ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "+/tennis/#");
ASSERT_TOPIC_FILTER_VALIDITY(TRUE, "sport/+/player1");
ASSERT_TOPIC_FILTER_VALIDITY(FALSE, "sport+");

return AWS_OP_SUCCESS;
}

0 comments on commit 2529907

Please sign in to comment.