Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pushoff keep alive on ack received #314

Merged
merged 7 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ struct aws_mqtt_request {

struct aws_channel_task outgoing_task;

/*
* The request send time. Currently used to push off keepalive packet.
*/
uint64_t request_send_timestamp;

/* How this operation is currently affecting the statistics of the connection */
enum aws_mqtt_operation_statistic_state_flags statistic_state_flags;
/* The encoded size of the packet - used for operation statistics tracking */
Expand Down
33 changes: 25 additions & 8 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ static void s_update_next_ping_time(struct aws_mqtt_client_connection_311_impl *
}
}

/* Caches the request send time. The `request_send_timestamp` will be used to push off ping request on request complete.
*/
static void s_update_request_send_time(struct aws_mqtt_request *request) {
if (request->connection != NULL && request->connection->slot != NULL &&
request->connection->slot->channel != NULL) {
aws_channel_current_clock_time(request->connection->slot->channel, &request->request_send_timestamp);
}
}

/* push off next ping time on ack received to last_request_send_timestamp_ns + keep_alive_time_ns
* The function must be called in critical section. */
static void s_pushoff_next_ping_time(
struct aws_mqtt_client_connection_311_impl *connection,
uint64_t last_request_send_timestamp_ns) {
ASSERT_SYNCED_DATA_LOCK_HELD(connection);
aws_add_u64_checked(
last_request_send_timestamp_ns, connection->keep_alive_time_ns, &last_request_send_timestamp_ns);
if (last_request_send_timestamp_ns > connection->next_ping_time) {
connection->next_ping_time = last_request_send_timestamp_ns;
}
}

/*******************************************************************************
* Packet State Machine
******************************************************************************/
Expand Down Expand Up @@ -426,7 +448,6 @@ static int s_packet_handler_unsuback(struct aws_byte_cursor message_cursor, void
AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier);

mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -528,7 +549,6 @@ static int s_packet_handler_pubcomp(struct aws_byte_cursor message_cursor, void
AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier);

mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -780,6 +800,8 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
/* Send the request */
enum aws_mqtt_client_request_state state =
request->send_request(request->packet_id, !request->initiated, request->send_request_ud);
/* Update the request send time.*/
s_update_request_send_time(request);
request->initiated = true;
int error_code = AWS_ERROR_SUCCESS;
switch (state) {
Expand Down Expand Up @@ -813,9 +835,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_NONE);

/* Since a request has complete, update the next ping time */
s_update_next_ping_time(connection);

aws_hash_table_remove(
&connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
aws_memory_pool_release(&connection->synced_data.requests_pool, request);
Expand All @@ -837,9 +856,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_INCOMPLETE | AWS_MQTT_OSS_UNACKED);

/* Since a request has complete, update the next ping time */
s_update_next_ping_time(connection);

mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

Expand Down Expand Up @@ -1016,6 +1032,7 @@ void mqtt_request_complete(struct aws_mqtt_client_connection_311_impl *connectio
aws_mqtt_connection_statistics_change_operation_statistic_state(
request->connection, request, AWS_MQTT_OSS_NONE);

s_pushoff_next_ping_time(connection, request->request_send_timestamp);
/* clean up request resources */
aws_hash_table_remove_element(&connection->synced_data.outstanding_requests_table, elem);
/* remove the request from the list, which is thread_data.ongoing_requests_list */
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ add_test_case(mqtt_connection_unsub_timeout)
add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time)
add_test_case(mqtt_connection_ping_norm)
add_test_case(mqtt_connection_ping_no)
add_test_case(mqtt_connection_ping_noack)
add_test_case(mqtt_connection_ping_basic_scenario)
add_test_case(mqtt_connection_ping_double_scenario)
add_test_case(mqtt_connection_close_callback_simple)
Expand Down
100 changes: 90 additions & 10 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3343,7 +3343,6 @@ static int s_test_mqtt_connection_ping_norm_fn(struct aws_allocator *allocator,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

/* Wait for 4.5 seconds (to account for slight drift/jitter) */
aws_thread_current_sleep(4500000000);
Expand All @@ -3366,8 +3365,8 @@ AWS_TEST_CASE_FIXTURE(
&test_data)

/**
* Makes a CONNECT, with 1 second keep alive ping interval, send a publish roughly every second, and then ensure NO
* pings were sent
* Makes a CONNECT, with 1 second keep alive ping interval. Publish QOS1 message for 4.5 seconds and then ensure NO
* pings were sent. (The ping time will be push off on ack )
*/
static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
Expand All @@ -3387,23 +3386,36 @@ static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, vo
ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

for (int i = 0; i < 4; i++) {
struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");
uint16_t packet_id_1 = aws_mqtt_client_connection_publish(
struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");

uint64_t begin_timestamp = 0;
uint64_t elapsed_time = 0;
uint64_t now = 0;
aws_high_res_clock_get_ticks(&begin_timestamp);
uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS;

// Make sure we publish for 4 seconds;
while (elapsed_time < test_duration) {
/* Publish qos1*/
uint16_t packet_id = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&pub_topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
false,
&payload_1,
s_on_op_complete,
state_test_data);
ASSERT_TRUE(packet_id_1 > 0);
ASSERT_TRUE(packet_id > 0);

/* Wait 0.8 seconds */
aws_thread_current_sleep(800000000);
aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/

aws_high_res_clock_get_ticks(&now);
elapsed_time = now - begin_timestamp;
}

aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/

/* Ensure the server got 0 PING packets */
ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server));

Expand All @@ -3421,6 +3433,74 @@ AWS_TEST_CASE_FIXTURE(
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Makes a CONNECT, with 1 second keep alive ping interval, publish a qos0 messages for 4.5 seconds.
* We should send a total of 4 pings
*/
static int s_test_mqtt_connection_ping_noack_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = true,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
.keep_alive_time_secs = 1,
.ping_timeout_ms = 100,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");

uint64_t begin_timestamp = 0;
uint64_t elapsed_time = 0;
uint64_t now = 0;
aws_high_res_clock_get_ticks(&begin_timestamp);
uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS;

// Make sure we publish for 4 seconds;
while (elapsed_time < test_duration) {
/* Publish qos0*/
uint16_t packet_id = aws_mqtt_client_connection_publish(
state_test_data->mqtt_connection,
&pub_topic,
AWS_MQTT_QOS_AT_MOST_ONCE,
false,
&payload_1,
s_on_op_complete,
state_test_data);
ASSERT_TRUE(packet_id > 0);

aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/
aws_high_res_clock_get_ticks(&now);
elapsed_time = now - begin_timestamp;
}

aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/

/* Ensure the server got 4 PING packets */
ASSERT_INT_EQUALS(4, mqtt_mock_server_get_ping_count(state_test_data->mock_server));

ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connection_ping_noack,
s_setup_mqtt_server_fn,
s_test_mqtt_connection_ping_noack_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Test to make sure the PING timing is correct if a publish/packet is sent near the end of the keep alive time.
* Note: Because of socket write jitter and scheduling jitter, the times have a 0.25 (quarter of a second) delta range.
Expand Down
4 changes: 3 additions & 1 deletion tests/v3/mqtt_mock_server_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ static int s_mqtt_mock_server_handler_process_packet(
bool auto_ack = server->synced.auto_ack;
aws_mutex_unlock(&server->synced.lock);

if (auto_ack) {
uint8_t qos = (publish_packet.fixed_header.flags >> 1) & 0x3;
// Do not send puback if qos0
if (auto_ack && qos != 0) {
struct aws_io_message *puback_msg =
aws_channel_acquire_message_from_pool(server->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, 256);
struct aws_mqtt_packet_ack puback;
Expand Down