From b7a323d30ccb12c69a49b70c5e033260054d3a74 Mon Sep 17 00:00:00 2001 From: xiazhvera Date: Thu, 10 Aug 2023 10:16:28 -0700 Subject: [PATCH] Pushoff keep alive on ack received (#314) * pushoff keep alive onack * make sure sync data is incritical section * set request send time for each request * updat tests * cast timestamp number * sleep to avoid jitter * fix comments --- include/aws/mqtt/private/client_impl.h | 5 ++ source/client_channel_handler.c | 33 ++++++-- tests/CMakeLists.txt | 1 + tests/v3/connection_state_test.c | 100 ++++++++++++++++++++++--- tests/v3/mqtt_mock_server_handler.c | 4 +- 5 files changed, 124 insertions(+), 19 deletions(-) diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index 68efb114..68bbdf27 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -128,6 +128,11 @@ struct aws_mqtt_request { struct aws_channel_task outgoing_task; + /* + * The request send time. Currently used to push off keepalive packet. + */ + uint64_t request_send_timestamp; + /* How this operation is currently affecting the statistics of the connection */ enum aws_mqtt_operation_statistic_state_flags statistic_state_flags; /* The encoded size of the packet - used for operation statistics tracking */ diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index 6f5502d6..543f0950 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -32,6 +32,28 @@ static void s_update_next_ping_time(struct aws_mqtt_client_connection_311_impl * } } +/* Caches the request send time. The `request_send_timestamp` will be used to push off ping request on request complete. + */ +static void s_update_request_send_time(struct aws_mqtt_request *request) { + if (request->connection != NULL && request->connection->slot != NULL && + request->connection->slot->channel != NULL) { + aws_channel_current_clock_time(request->connection->slot->channel, &request->request_send_timestamp); + } +} + +/* push off next ping time on ack received to last_request_send_timestamp_ns + keep_alive_time_ns + * The function must be called in critical section. */ +static void s_pushoff_next_ping_time( + struct aws_mqtt_client_connection_311_impl *connection, + uint64_t last_request_send_timestamp_ns) { + ASSERT_SYNCED_DATA_LOCK_HELD(connection); + aws_add_u64_checked( + last_request_send_timestamp_ns, connection->keep_alive_time_ns, &last_request_send_timestamp_ns); + if (last_request_send_timestamp_ns > connection->next_ping_time) { + connection->next_ping_time = last_request_send_timestamp_ns; + } +} + /******************************************************************************* * Packet State Machine ******************************************************************************/ @@ -426,7 +448,6 @@ static int s_packet_handler_unsuback(struct aws_byte_cursor message_cursor, void AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier); mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier); - return AWS_OP_SUCCESS; } @@ -528,7 +549,6 @@ static int s_packet_handler_pubcomp(struct aws_byte_cursor message_cursor, void AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier); mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier); - return AWS_OP_SUCCESS; } @@ -780,6 +800,8 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en /* Send the request */ enum aws_mqtt_client_request_state state = request->send_request(request->packet_id, !request->initiated, request->send_request_ud); + /* Update the request send time.*/ + s_update_request_send_time(request); request->initiated = true; int error_code = AWS_ERROR_SUCCESS; switch (state) { @@ -813,9 +835,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en aws_mqtt_connection_statistics_change_operation_statistic_state( request->connection, request, AWS_MQTT_OSS_NONE); - /* Since a request has complete, update the next ping time */ - s_update_next_ping_time(connection); - aws_hash_table_remove( &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL); aws_memory_pool_release(&connection->synced_data.requests_pool, request); @@ -837,9 +856,6 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en aws_mqtt_connection_statistics_change_operation_statistic_state( request->connection, request, AWS_MQTT_OSS_INCOMPLETE | AWS_MQTT_OSS_UNACKED); - /* Since a request has complete, update the next ping time */ - s_update_next_ping_time(connection); - mqtt_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ @@ -1016,6 +1032,7 @@ void mqtt_request_complete(struct aws_mqtt_client_connection_311_impl *connectio aws_mqtt_connection_statistics_change_operation_statistic_state( request->connection, request, AWS_MQTT_OSS_NONE); + s_pushoff_next_ping_time(connection, request->request_send_timestamp); /* clean up request resources */ aws_hash_table_remove_element(&connection->synced_data.outstanding_requests_table, elem); /* remove the request from the list, which is thread_data.ongoing_requests_list */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 407bd07f..7af8c4eb 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -75,6 +75,7 @@ add_test_case(mqtt_connection_unsub_timeout) add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time) add_test_case(mqtt_connection_ping_norm) add_test_case(mqtt_connection_ping_no) +add_test_case(mqtt_connection_ping_noack) add_test_case(mqtt_connection_ping_basic_scenario) add_test_case(mqtt_connection_ping_double_scenario) add_test_case(mqtt_connection_close_callback_simple) diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c index 854b0b59..6181d7e8 100644 --- a/tests/v3/connection_state_test.c +++ b/tests/v3/connection_state_test.c @@ -3343,7 +3343,6 @@ static int s_test_mqtt_connection_ping_norm_fn(struct aws_allocator *allocator, }; ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); - s_wait_for_connection_to_complete(state_test_data); /* Wait for 4.5 seconds (to account for slight drift/jitter) */ aws_thread_current_sleep(4500000000); @@ -3366,8 +3365,8 @@ AWS_TEST_CASE_FIXTURE( &test_data) /** - * Makes a CONNECT, with 1 second keep alive ping interval, send a publish roughly every second, and then ensure NO - * pings were sent + * Makes a CONNECT, with 1 second keep alive ping interval. Publish QOS1 message for 4.5 seconds and then ensure NO + * pings were sent. (The ping time will be push off on ack ) */ static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, void *ctx) { (void)allocator; @@ -3387,10 +3386,19 @@ static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, vo ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); s_wait_for_connection_to_complete(state_test_data); - for (int i = 0; i < 4; i++) { - struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); - struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); - uint16_t packet_id_1 = aws_mqtt_client_connection_publish( + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + + uint64_t begin_timestamp = 0; + uint64_t elapsed_time = 0; + uint64_t now = 0; + aws_high_res_clock_get_ticks(&begin_timestamp); + uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS; + + // Make sure we publish for 4 seconds; + while (elapsed_time < test_duration) { + /* Publish qos1*/ + uint16_t packet_id = aws_mqtt_client_connection_publish( state_test_data->mqtt_connection, &pub_topic, AWS_MQTT_QOS_AT_LEAST_ONCE, @@ -3398,12 +3406,16 @@ static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, vo &payload_1, s_on_op_complete, state_test_data); - ASSERT_TRUE(packet_id_1 > 0); + ASSERT_TRUE(packet_id > 0); - /* Wait 0.8 seconds */ - aws_thread_current_sleep(800000000); + aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/ + + aws_high_res_clock_get_ticks(&now); + elapsed_time = now - begin_timestamp; } + aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/ + /* Ensure the server got 0 PING packets */ ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); @@ -3421,6 +3433,74 @@ AWS_TEST_CASE_FIXTURE( s_clean_up_mqtt_server_fn, &test_data) +/** + * Makes a CONNECT, with 1 second keep alive ping interval, publish a qos0 messages for 4.5 seconds. + * We should send a total of 4 pings + */ +static int s_test_mqtt_connection_ping_noack_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .keep_alive_time_secs = 1, + .ping_timeout_ms = 100, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + + uint64_t begin_timestamp = 0; + uint64_t elapsed_time = 0; + uint64_t now = 0; + aws_high_res_clock_get_ticks(&begin_timestamp); + uint64_t test_duration = (uint64_t)4 * AWS_TIMESTAMP_NANOS; + + // Make sure we publish for 4 seconds; + while (elapsed_time < test_duration) { + /* Publish qos0*/ + uint16_t packet_id = aws_mqtt_client_connection_publish( + state_test_data->mqtt_connection, + &pub_topic, + AWS_MQTT_QOS_AT_MOST_ONCE, + false, + &payload_1, + s_on_op_complete, + state_test_data); + ASSERT_TRUE(packet_id > 0); + + aws_thread_current_sleep(500000000); /* Sleep 0.5 seconds to avoid spamming*/ + aws_high_res_clock_get_ticks(&now); + elapsed_time = now - begin_timestamp; + } + + aws_thread_current_sleep(250000000); /* Sleep 0.25 seconds to consider jitter*/ + + /* Ensure the server got 4 PING packets */ + ASSERT_INT_EQUALS(4, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_ping_noack, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_ping_noack_fn, + s_clean_up_mqtt_server_fn, + &test_data) + /** * Test to make sure the PING timing is correct if a publish/packet is sent near the end of the keep alive time. * Note: Because of socket write jitter and scheduling jitter, the times have a 0.25 (quarter of a second) delta range. diff --git a/tests/v3/mqtt_mock_server_handler.c b/tests/v3/mqtt_mock_server_handler.c index 13f7171f..51bb448f 100644 --- a/tests/v3/mqtt_mock_server_handler.c +++ b/tests/v3/mqtt_mock_server_handler.c @@ -187,7 +187,9 @@ static int s_mqtt_mock_server_handler_process_packet( bool auto_ack = server->synced.auto_ack; aws_mutex_unlock(&server->synced.lock); - if (auto_ack) { + uint8_t qos = (publish_packet.fixed_header.flags >> 1) & 0x3; + // Do not send puback if qos0 + if (auto_ack && qos != 0) { struct aws_io_message *puback_msg = aws_channel_acquire_message_from_pool(server->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, 256); struct aws_mqtt_packet_ack puback;