Skip to content

Commit

Permalink
Merge branch 'main' into OverzealousIotCoreValidation
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose authored Aug 10, 2023
2 parents f910db7 + b7a323d commit 51368c0
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 19 deletions.
5 changes: 5 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ struct aws_mqtt_request {

struct aws_channel_task outgoing_task;

/*
* The request send time. Currently used to push off keepalive packet.
*/
uint64_t request_send_timestamp;

/* How this operation is currently affecting the statistics of the connection */
enum aws_mqtt_operation_statistic_state_flags statistic_state_flags;
/* The encoded size of the packet - used for operation statistics tracking */
Expand Down
33 changes: 25 additions & 8 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ static void s_update_next_ping_time(struct aws_mqtt_client_connection_311_impl *
}
}

/* Caches the request send time. The `request_send_timestamp` will be used to push off ping request on request complete.
*/
static void s_update_request_send_time(struct aws_mqtt_request *request) {
if (request->connection != NULL && request->connection->slot != NULL &&
request->connection->slot->channel != NULL) {
aws_channel_current_clock_time(request->connection->slot->channel, &request->request_send_timestamp);
}
}

/* push off next ping time on ack received to last_request_send_timestamp_ns + keep_alive_time_ns
* The function must be called in critical section. */
static void s_pushoff_next_ping_time(
struct aws_mqtt_client_connection_311_impl *connection,
uint64_t last_request_send_timestamp_ns) {
ASSERT_SYNCED_DATA_LOCK_HELD(connection);
aws_add_u64_checked(
last_request_send_timestamp_ns, connection->keep_alive_time_ns, &last_request_send_timestamp_ns);
if (last_request_send_timestamp_ns > connection->next_ping_time) {
connection->next_ping_time = last_request_send_timestamp_ns;
}
}

/*******************************************************************************
* Packet State Machine
******************************************************************************/
Expand Down Expand Up @@ -426,7 +448,6 @@ static int s_packet_handler_unsuback(struct aws_byte_cursor message_cursor, void
AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier);

mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -528,7 +549,6 @@ static int s_packet_handler_pubcomp(struct aws_byte_cursor message_cursor, void
AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier);

mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -780,6 +800,8 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
/* Send the request */
enum aws_mqtt_client_request_state state =
request->send_request(request->packet_id, !request->initiated, request->send_request_ud);
/* Update the request send time.*/
s_update_request_send_time(request);
request->initiated = true;
int error_code = AWS_ERROR_SUCCESS;
switch (state) {
Expand Down Expand Up @@ -813,9 +835,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_NONE);

/* Since a request has complete, update the next ping time */
s_update_next_ping_time(connection);

aws_hash_table_remove(
&connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
aws_memory_pool_release(&connection->synced_data.requests_pool, request);
Expand All @@ -837,9 +856,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_INCOMPLETE | AWS_MQTT_OSS_UNACKED);

/* Since a request has complete, update the next ping time */
s_update_next_ping_time(connection);

mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

Expand Down Expand Up @@ -1016,6 +1032,7 @@ void mqtt_request_complete(struct aws_mqtt_client_connection_311_impl *connectio
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_NONE);

s_pushoff_next_ping_time(connection, request->request_send_timestamp);
/* clean up request resources */
aws_hash_table_remove_element(&connection->synced_data.outstanding_requests_table, elem);
/* remove the request from the list, which is thread_data.ongoing_requests_list */
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ add_test_case(mqtt_connection_unsub_timeout)
add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time)
add_test_case(mqtt_connection_ping_norm)
add_test_case(mqtt_connection_ping_no)
add_test_case(mqtt_connection_ping_noack)
add_test_case(mqtt_connection_ping_basic_scenario)
add_test_case(mqtt_connection_ping_double_scenario)
add_test_case(mqtt_connection_close_callback_simple)
Expand Down
100 changes: 90 additions & 10 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3343,7 +3343,6 @@ static int s_test_mqtt_connection_ping_norm_fn(struct aws_allocator *allocator,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

/* Wait for 4.5 seconds (to account for slight drift/jitter) */
aws_thread_current_sleep(4500000000);
Expand All @@ -3366,8 +3365,8 @@ AWS_TEST_CASE_FIXTURE(
&test_data)

/**
* Makes a CONNECT, with 1 second keep alive ping interval, send a publish roughly every second, and then ensure NO
* pings were sent
* Makes a CONNECT, with 1 second keep alive ping interval. Publish QOS1 message for 4.5 seconds and then ensure NO
* pings were sent. (The ping time will be push off on ack )
*/
static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
Expand All @@ -3387,23 +3386,36 @@ static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, vo
ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

for (int i = 0; i < 4; i++) {
struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");
uint16_t packet_id_1 = aws_mqtt_client_connection_publish(
struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");

uint64_t begin_timestamp = 0;
uint64_t elapsed_time = 0;
uint64_t now = 0;
aws_high_res_clock_get_ticks(&begin_timestamp);
uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS;

// Make sure we publish for 4 seconds;
while (elapsed_time < test_duration) {
/* Publish qos1*/
uint16_t packet_id = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&pub_topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
false,
&payload_1,
s_on_op_complete,
state_test_data);
ASSERT_TRUE(packet_id_1 > 0);
ASSERT_TRUE(packet_id > 0);

/* Wait 0.8 seconds */
aws_thread_current_sleep(800000000);
aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/

aws_high_res_clock_get_ticks(&now);
elapsed_time = now - begin_timestamp;
}

aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/

/* Ensure the server got 0 PING packets */
ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server));

Expand All @@ -3421,6 +3433,74 @@ AWS_TEST_CASE_FIXTURE(
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Makes a CONNECT, with 1 second keep alive ping interval, publish a qos0 messages for 4.5 seconds.
* We should send a total of 4 pings
*/
static int s_test_mqtt_connection_ping_noack_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = true,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
.keep_alive_time_secs = 1,
.ping_timeout_ms = 100,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");

uint64_t begin_timestamp = 0;
uint64_t elapsed_time = 0;
uint64_t now = 0;
aws_high_res_clock_get_ticks(&begin_timestamp);
uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS;

// Make sure we publish for 4 seconds;
while (elapsed_time < test_duration) {
/* Publish qos0*/
uint16_t packet_id = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&pub_topic,
AWS_MQTT_QOS_AT_MOST_ONCE,
false,
&payload_1,
s_on_op_complete,
state_test_data);
ASSERT_TRUE(packet_id > 0);

aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/
aws_high_res_clock_get_ticks(&now);
elapsed_time = now - begin_timestamp;
}

aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/

/* Ensure the server got 4 PING packets */
ASSERT_INT_EQUALS(4, mqtt_mock_server_get_ping_count(state_test_data->mock_server));

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connection_ping_noack,
s_setup_mqtt_server_fn,
s_test_mqtt_connection_ping_noack_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Test to make sure the PING timing is correct if a publish/packet is sent near the end of the keep alive time.
* Note: Because of socket write jitter and scheduling jitter, the times have a 0.25 (quarter of a second) delta range.
Expand Down
4 changes: 3 additions & 1 deletion tests/v3/mqtt_mock_server_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ static int s_mqtt_mock_server_handler_process_packet(
bool auto_ack = server->synced.auto_ack;
aws_mutex_unlock(&server->synced.lock);

if (auto_ack) {
uint8_t qos = (publish_packet.fixed_header.flags >> 1) & 0x3;
// Do not send puback if qos0
if (auto_ack && qos != 0) {
struct aws_io_message *puback_msg =
aws_channel_acquire_message_from_pool(server->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, 256);
struct aws_mqtt_packet_ack puback;
Expand Down

0 comments on commit 51368c0

Please sign in to comment.