Skip to content

Commit

Permalink
Client behavior tests part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Aug 25, 2023
1 parent dbe3f47 commit 7a300db
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 29 deletions.
3 changes: 3 additions & 0 deletions include/aws/mqtt/private/packets.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ int aws_mqtt_packet_publish_encode_headers(struct aws_byte_buf *buf, const struc
AWS_MQTT_API
int aws_mqtt_packet_publish_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_publish *packet);

AWS_MQTT_API
void aws_mqtt_packet_publish_set_dup(struct aws_mqtt_packet_publish *packet);

AWS_MQTT_API
bool aws_mqtt_packet_publish_get_dup(const struct aws_mqtt_packet_publish *packet);

Expand Down
2 changes: 2 additions & 0 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,8 @@ static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, boo

return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
} else {
aws_mqtt_packet_publish_set_dup(&task_arg->publish);
}

struct aws_io_message *message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->publish.fixed_header);
Expand Down
4 changes: 4 additions & 0 deletions source/packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,10 @@ bool aws_mqtt_packet_publish_get_dup(const struct aws_mqtt_packet_publish *packe
return packet->fixed_header.flags & (1 << 3); /* bit 3 */
}

void aws_mqtt_packet_publish_set_dup(struct aws_mqtt_packet_publish *packet) {
packet->fixed_header.flags |= 0x08;
}

enum aws_mqtt_qos aws_mqtt_packet_publish_get_qos(const struct aws_mqtt_packet_publish *packet) {
return (packet->fixed_header.flags >> 1) & 0x3; /* bits 2,1 */
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ add_test_case(mqtt_connection_success_callback)
add_test_case(mqtt_connect_subscribe)
add_test_case(mqtt_connect_subscribe_fail_from_broker)
add_test_case(mqtt_connect_subscribe_multi)
add_test_case(mqtt_connect_subscribe_incoming_dup)
add_test_case(mqtt_connect_unsubscribe)
add_test_case(mqtt_connect_resubscribe)
add_test_case(mqtt_connect_publish)
Expand Down
238 changes: 215 additions & 23 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,142 @@ AWS_TEST_CASE_FIXTURE(
s_clean_up_mqtt_server_fn,
&test_data)

static int s_test_mqtt_subscribe_incoming_dup_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};

struct aws_byte_cursor subscribed_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor any_topic = aws_byte_cursor_from_c_str("/a/b/c");

uint16_t packet_id = aws_mqtt_client_connection_subscribe(
state_test_data->mqtt_connection,
&subscribed_topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
s_on_publish_received,
state_test_data,
NULL,
s_on_suback,
state_test_data);
ASSERT_TRUE(packet_id > 0);

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);
s_wait_for_subscribe_to_complete(state_test_data);

state_test_data->expected_publishes = 4;
state_test_data->expected_any_publishes = 8;

struct aws_byte_cursor subscribed_payload = aws_byte_cursor_from_c_str("Subscribed");
for (size_t i = 0; i < 4; ++i) {
ASSERT_SUCCESS(mqtt_mock_server_send_publish_by_id(
state_test_data->mock_server,
1111,
&subscribed_topic,
&subscribed_payload,
i > 0 /*dup*/,
AWS_MQTT_QOS_AT_LEAST_ONCE,
true /*retain*/));
}

struct aws_byte_cursor any_payload = aws_byte_cursor_from_c_str("Not subscribed. On-any only.");
for (size_t i = 0; i < 4; ++i) {
ASSERT_SUCCESS(mqtt_mock_server_send_publish_by_id(
state_test_data->mock_server,
1234,
&any_topic,
&any_payload,
i > 0 /*dup*/,
AWS_MQTT_QOS_AT_LEAST_ONCE,
false /*retain*/));
}

s_wait_for_publish(state_test_data);
s_wait_for_any_publish(state_test_data);
mqtt_mock_server_wait_for_pubacks(state_test_data->mock_server, 8);

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

/* Decode all received packets by mock server */
ASSERT_SUCCESS(mqtt_mock_server_decode_packets(state_test_data->mock_server));

ASSERT_UINT_EQUALS(11, mqtt_mock_server_decoded_packets_count(state_test_data->mock_server));
struct mqtt_decoded_packet *received_packet =
mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 0);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_CONNECT, received_packet->type);
ASSERT_UINT_EQUALS(connection_options.clean_session, received_packet->clean_session);
ASSERT_TRUE(aws_byte_cursor_eq(&received_packet->client_identifier, &connection_options.client_id));

received_packet = mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 1);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_SUBSCRIBE, received_packet->type);
ASSERT_UINT_EQUALS(1, aws_array_list_length(&received_packet->sub_topic_filters));
struct aws_mqtt_subscription val;
ASSERT_SUCCESS(aws_array_list_front(&received_packet->sub_topic_filters, &val));
ASSERT_TRUE(aws_byte_cursor_eq(&val.topic_filter, &subscribed_topic));
ASSERT_UINT_EQUALS(AWS_MQTT_QOS_AT_LEAST_ONCE, val.qos);
ASSERT_UINT_EQUALS(packet_id, received_packet->packet_identifier);

for (size_t i = 0; i < 8; ++i) {
received_packet = mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 2 + i);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_PUBACK, received_packet->type);
}

received_packet = mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 10);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_DISCONNECT, received_packet->type);

/* Check PUBLISH packets received via subscription callback */
ASSERT_UINT_EQUALS(4, aws_array_list_length(&state_test_data->published_messages));

for (size_t i = 0; i < 4; ++i) {
struct received_publish_packet *publish_msg = NULL;
ASSERT_SUCCESS(aws_array_list_get_at_ptr(&state_test_data->published_messages, (void **)&publish_msg, i));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&subscribed_topic, &publish_msg->topic));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&subscribed_payload, &publish_msg->payload));
ASSERT_INT_EQUALS((i != 0) ? 1 : 0, publish_msg->dup ? 1 : 0);
ASSERT_TRUE(publish_msg->retain);
}

/* Check PUBLISH packets received via on_any_publish callback */
ASSERT_UINT_EQUALS(8, aws_array_list_length(&state_test_data->any_published_messages));

for (size_t i = 0; i < 4; ++i) {
struct received_publish_packet *publish_msg = NULL;
ASSERT_SUCCESS(aws_array_list_get_at_ptr(&state_test_data->any_published_messages, (void **)&publish_msg, i));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&subscribed_topic, &publish_msg->topic));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&subscribed_payload, &publish_msg->payload));
ASSERT_INT_EQUALS((i > 0) ? 1 : 0, publish_msg->dup ? 1 : 0);
ASSERT_TRUE(publish_msg->retain);
}

for (size_t i = 4; i < 8; ++i) {
struct received_publish_packet *publish_msg = NULL;
ASSERT_SUCCESS(aws_array_list_get_at_ptr(&state_test_data->any_published_messages, (void **)&publish_msg, i));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&any_topic, &publish_msg->topic));
ASSERT_TRUE(aws_byte_cursor_eq_byte_buf(&any_payload, &publish_msg->payload));
ASSERT_INT_EQUALS((i > 4) ? 1 : 0, publish_msg->dup ? 1 : 0);
ASSERT_FALSE(publish_msg->retain);
}

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connect_subscribe_incoming_dup,
s_setup_mqtt_server_fn,
s_test_mqtt_subscribe_incoming_dup_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/* Subscribe to a topic and broker returns a SUBACK with failure return code, the subscribe should fail */
static int s_test_mqtt_connect_subscribe_fail_from_broker_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
Expand Down Expand Up @@ -1790,7 +1926,7 @@ static int s_test_mqtt_publish_fn(struct aws_allocator *allocator, void *ctx) {
s_wait_for_connection_to_complete(state_test_data);

aws_mutex_lock(&state_test_data->lock);
state_test_data->expected_ops_completed = 2;
state_test_data->expected_ops_completed = 3;
aws_mutex_unlock(&state_test_data->lock);
uint16_t packet_id_1 = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
Expand All @@ -1811,6 +1947,17 @@ static int s_test_mqtt_publish_fn(struct aws_allocator *allocator, void *ctx) {
state_test_data);
ASSERT_TRUE(packet_id_2 > 0);

/* Null payload case */
uint16_t packet_id_3 = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&pub_topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
false,
NULL,
s_on_op_complete,
state_test_data);
ASSERT_TRUE(packet_id_3 > 0);

s_wait_for_ops_completed(state_test_data);

ASSERT_SUCCESS(
Expand All @@ -1820,7 +1967,7 @@ static int s_test_mqtt_publish_fn(struct aws_allocator *allocator, void *ctx) {
/* Decode all received packets by mock server */
ASSERT_SUCCESS(mqtt_mock_server_decode_packets(state_test_data->mock_server));

ASSERT_UINT_EQUALS(4, mqtt_mock_server_decoded_packets_count(state_test_data->mock_server));
ASSERT_UINT_EQUALS(5, mqtt_mock_server_decoded_packets_count(state_test_data->mock_server));
struct mqtt_decoded_packet *received_packet =
mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 0);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_CONNECT, received_packet->type);
Expand All @@ -1837,6 +1984,11 @@ static int s_test_mqtt_publish_fn(struct aws_allocator *allocator, void *ctx) {
ASSERT_TRUE(aws_byte_cursor_eq(&received_packet->publish_payload, &payload_2));

received_packet = mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 3);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_PUBLISH, received_packet->type);
ASSERT_TRUE(aws_byte_cursor_eq(&received_packet->topic_name, &pub_topic));
ASSERT_INT_EQUALS(0, received_packet->publish_payload.len);

received_packet = mqtt_mock_server_get_decoded_packet_by_index(state_test_data->mock_server, 4);
ASSERT_UINT_EQUALS(AWS_MQTT_PACKET_DISCONNECT, received_packet->type);

return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -2179,21 +2331,39 @@ AWS_TEST_CASE_FIXTURE(
s_clean_up_mqtt_server_fn,
&test_data)

/* helper to make sure ID 1 is received earlier than ID 2 and ID 2 is earlier than ID 3 */
static int s_check_packets_received_order(
/* helper to make sure packets are received/resent in expected order and duplicat flag is appropriately set */
static int s_check_resend_packets(
struct aws_channel_handler *handler,
size_t search_start_idx,
uint16_t packet_id_1,
uint16_t packet_id_2,
uint16_t packet_id_3) {
bool duplicate_publish_expected,
uint16_t *packet_ids,
size_t packet_id_count) {
ASSERT_SUCCESS(mqtt_mock_server_decode_packets(handler));
size_t packet_idx_1 = 0;
size_t packet_idx_2 = 0;
size_t packet_idx_3 = 0;
ASSERT_NOT_NULL(mqtt_mock_server_find_decoded_packet_by_id(handler, search_start_idx, packet_id_1, &packet_idx_1));
ASSERT_NOT_NULL(mqtt_mock_server_find_decoded_packet_by_id(handler, search_start_idx, packet_id_2, &packet_idx_2));
ASSERT_NOT_NULL(mqtt_mock_server_find_decoded_packet_by_id(handler, search_start_idx, packet_id_3, &packet_idx_3));
ASSERT_TRUE(packet_idx_3 > packet_idx_2 && packet_idx_2 > packet_idx_1);

if (packet_id_count == 0) {
return AWS_OP_SUCCESS;
}

size_t previous_index = 0;
struct mqtt_decoded_packet *previous_packet =
mqtt_mock_server_find_decoded_packet_by_id(handler, search_start_idx, packet_ids[0], &previous_index);
if (previous_packet->type == AWS_MQTT_PACKET_PUBLISH) {
ASSERT_INT_EQUALS(duplicate_publish_expected, previous_packet->duplicate);
}

for (size_t i = 1; i < packet_id_count; ++i) {
size_t current_index = 0;
struct mqtt_decoded_packet *current_packet =
mqtt_mock_server_find_decoded_packet_by_id(handler, search_start_idx, packet_ids[i], &current_index);
if (current_packet->type == AWS_MQTT_PACKET_PUBLISH) {
ASSERT_INT_EQUALS(duplicate_publish_expected, current_packet->duplicate);
}

ASSERT_TRUE(current_index > previous_index);
previous_packet = current_packet;
previous_index = current_index;
}

return AWS_OP_SUCCESS;
}

Expand All @@ -2216,6 +2386,7 @@ static int s_test_mqtt_connection_resend_packets_fn(struct aws_allocator *alloca
.keep_alive_time_secs = DEFAULT_TEST_KEEP_ALIVE_S,
};

struct aws_byte_cursor sub_topic = aws_byte_cursor_from_c_str("/test/topic/sub/#");
struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");
struct aws_byte_cursor payload_2 = aws_byte_cursor_from_c_str("Test Message 2");
Expand All @@ -2226,28 +2397,49 @@ static int s_test_mqtt_connection_resend_packets_fn(struct aws_allocator *alloca

/* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
uint16_t packet_id_1 = aws_mqtt_client_connection_publish(

uint16_t packet_ids[5];

packet_ids[0] = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection, &pub_topic, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload_1, NULL, NULL);
ASSERT_TRUE(packet_id_1 > 0);
uint16_t packet_id_2 = aws_mqtt_client_connection_publish(
ASSERT_TRUE(packet_ids[0] > 0);

packet_ids[1] = aws_mqtt_client_connection_subscribe(
state_test_data->mqtt_connection,
&sub_topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
s_on_publish_received,
state_test_data,
NULL,
s_on_suback,
state_test_data);
ASSERT_TRUE(packet_ids[1] > 0);

packet_ids[2] = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection, &pub_topic, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload_2, NULL, NULL);
ASSERT_TRUE(packet_id_2 > 0);
uint16_t packet_id_3 = aws_mqtt_client_connection_publish(
ASSERT_TRUE(packet_ids[2] > 0);

packet_ids[3] = aws_mqtt_client_connection_unsubscribe(state_test_data->mqtt_connection, &sub_topic, NULL, NULL);
ASSERT_TRUE(packet_ids[3] > 0);

packet_ids[4] = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection, &pub_topic, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload_3, NULL, NULL);
ASSERT_TRUE(packet_id_3 > 0);
ASSERT_TRUE(packet_ids[4] > 0);

/* Wait for 1 sec. ensure all the publishes have been received by the server */
aws_thread_current_sleep(ONE_SEC);
ASSERT_SUCCESS(
s_check_packets_received_order(state_test_data->mock_server, 0, packet_id_1, packet_id_2, packet_id_3));
s_check_resend_packets(state_test_data->mock_server, 0, false, packet_ids, AWS_ARRAY_SIZE(packet_ids)));

size_t packet_count = mqtt_mock_server_decoded_packets_count(state_test_data->mock_server);

/* shutdown the channel for some error */
aws_channel_shutdown(state_test_data->server_channel, AWS_ERROR_INVALID_STATE);
s_wait_for_reconnect_to_complete(state_test_data);
/* Wait again, and ensure the publishes have been resent */
aws_thread_current_sleep(ONE_SEC);
ASSERT_SUCCESS(s_check_packets_received_order(
state_test_data->mock_server, packet_count, packet_id_1, packet_id_2, packet_id_3));
ASSERT_SUCCESS(s_check_resend_packets(
state_test_data->mock_server, packet_count, true, packet_ids, AWS_ARRAY_SIZE(packet_ids)));

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
Expand Down
Loading

0 comments on commit 7a300db

Please sign in to comment.