Skip to content

Commit

Permalink
Refactor MQTT5 ping timeout (#361)
Browse files Browse the repository at this point in the history
Co-authored-by: Bret Ambrose <bambrose@amazon.com>
  • Loading branch information
bretambrose and Bret Ambrose authored Mar 25, 2024
1 parent 74da9ca commit 1efb8f9
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 62 deletions.
4 changes: 0 additions & 4 deletions include/aws/mqtt/private/v5/mqtt5_options_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,6 @@ AWS_MQTT_API void aws_mqtt5_client_options_storage_log(
const struct aws_mqtt5_client_options_storage *options_storage,
enum aws_log_level level);

AWS_MQTT_API bool aws_mqtt5_client_keep_alive_options_are_valid(
uint16_t keep_alive_interval_seconds,
uint32_t ping_timeout_ms);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_MQTT5_OPERATION_H */
15 changes: 14 additions & 1 deletion source/v5/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2940,7 +2940,20 @@ static void s_on_pingreq_send(struct aws_mqtt5_client *client) {
uint64_t now = client->vtable->get_current_time_fn();
uint64_t ping_timeout_nanos =
aws_timestamp_convert(client->config->ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
client->next_ping_timeout_time = aws_add_u64_saturating(now, ping_timeout_nanos);
uint64_t half_keep_alive_nanos =
aws_timestamp_convert(
client->negotiated_settings.server_keep_alive, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL) /
2;

uint64_t connection_ping_timeout = ping_timeout_nanos;
if (connection_ping_timeout == 0 || connection_ping_timeout > half_keep_alive_nanos) {
connection_ping_timeout = half_keep_alive_nanos;
}

AWS_LOGF_DEBUG(
AWS_LS_MQTT5_CLIENT, "id=%p: dynamic ping timeout: %" PRIu64 " ns", (void *)client, connection_ping_timeout);

client->next_ping_timeout_time = aws_add_u64_saturating(now, connection_ping_timeout);
}

static int s_apply_throughput_flow_control(struct aws_mqtt5_client *client) {
Expand Down
29 changes: 1 addition & 28 deletions source/v5/mqtt5_options_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -3308,26 +3308,6 @@ struct aws_mqtt5_operation_pingreq *aws_mqtt5_operation_pingreq_new(struct aws_a
return pingreq_op;
}

bool aws_mqtt5_client_keep_alive_options_are_valid(uint16_t keep_alive_interval_seconds, uint32_t ping_timeout_ms) {
/* The client will not behave properly if ping timeout is not significantly shorter than the keep alive interval */
if (keep_alive_interval_seconds > 0) {
uint64_t keep_alive_ms =
aws_timestamp_convert(keep_alive_interval_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_MILLIS, NULL);
uint64_t one_second_ms = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_MILLIS, NULL);

if (ping_timeout_ms == 0) {
ping_timeout_ms = AWS_MQTT5_CLIENT_DEFAULT_PING_TIMEOUT_MS;
}

if ((uint64_t)ping_timeout_ms + one_second_ms > keep_alive_ms) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_GENERAL, "keep alive interval is too small relative to ping timeout interval");
return false;
}
}

return true;
}

/*********************************************************************************************************************
* Client storage options
********************************************************************************************************************/
Expand Down Expand Up @@ -3387,17 +3367,10 @@ int aws_mqtt5_client_options_validate(const struct aws_mqtt5_client_options *opt

if (aws_mqtt5_packet_connect_view_validate(options->connect_options)) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_GENERAL, "invalid CONNECT options in mqtt5 client configuration");
/* connect validation failure will have already rasied the appropriate error */
/* connect validation failure will have already raised the appropriate error */
return AWS_OP_ERR;
}

/* The client will not behave properly if ping timeout is not significantly shorter than the keep alive interval */
if (!aws_mqtt5_client_keep_alive_options_are_valid(
options->connect_options->keep_alive_interval_seconds, options->ping_timeout_ms)) {
AWS_LOGF_ERROR(AWS_LS_MQTT5_GENERAL, "keep alive interval is too small relative to ping timeout interval");
return aws_raise_error(AWS_ERROR_MQTT5_CLIENT_OPTIONS_VALIDATION);
}

if (options->topic_aliasing_options != NULL) {
if (!aws_mqtt5_outbound_topic_alias_behavior_type_validate(
options->topic_aliasing_options->outbound_topic_alias_behavior)) {
Expand Down
10 changes: 0 additions & 10 deletions source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,6 @@ static int s_validate_adapter_connection_options(
}
}

/* The client will not behave properly if ping timeout is not significantly shorter than the keep alive interval */
if (!aws_mqtt5_client_keep_alive_options_are_valid(
connection_options->keep_alive_time_secs, connection_options->ping_timeout_ms)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_TO_MQTT3_ADAPTER,
"id=%p: mqtt3-to-5-adapter - keep alive interval is too small relative to ping timeout interval",
(void *)adapter);
return aws_raise_error(AWS_ERROR_MQTT5_CLIENT_OPTIONS_VALIDATION);
}

return AWS_OP_SUCCESS;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ add_test_case(mqtt5_client_options_validation_failure_no_bootstrap)
add_test_case(mqtt5_client_options_validation_failure_no_publish_received)
add_test_case(mqtt5_client_options_validation_failure_invalid_socket_options)
add_test_case(mqtt5_client_options_validation_failure_invalid_connect)
add_test_case(mqtt5_client_options_validation_failure_invalid_keep_alive)
add_test_case(mqtt5_client_options_validation_failure_invalid_port)
add_test_case(mqtt5_operation_subscribe_connection_settings_validation_failure_exceeds_maximum_packet_size)
add_test_case(mqtt5_operation_unsubscribe_connection_settings_validation_failure_exceeds_maximum_packet_size)
Expand Down Expand Up @@ -330,6 +329,7 @@ add_test_case(mqtt5_client_sub_pub_unsub_qos0)
add_test_case(mqtt5_client_sub_pub_unsub_qos1)
add_test_case(mqtt5_client_ping_sequence)
add_test_case(mqtt5_client_ping_timeout)
add_test_case(mqtt5_client_ping_timeout_with_keep_alive_conflict)
add_test_case(mqtt5_client_reconnect_failure_backoff)
add_test_case(mqtt5_client_reconnect_backoff_insufficient_reset)
add_test_case(mqtt5_client_reconnect_backoff_sufficient_reset)
Expand Down
92 changes: 88 additions & 4 deletions tests/v5/mqtt5_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,9 @@ AWS_TEST_CASE(mqtt5_client_ping_write_pushout, s_mqtt5_client_ping_write_pushout

#define TIMEOUT_TEST_PING_INTERVAL_MS ((uint64_t)10000)

static int s_verify_ping_timeout_interval(struct aws_mqtt5_client_mock_test_fixture *test_context) {
static int s_verify_ping_timeout_interval(
struct aws_mqtt5_client_mock_test_fixture *test_context,
uint64_t expected_connected_time_ms) {
aws_mutex_lock(&test_context->lock);

uint64_t connected_time = 0;
Expand All @@ -1242,8 +1244,6 @@ static int s_verify_ping_timeout_interval(struct aws_mqtt5_client_mock_test_fixt

uint64_t connected_interval_ms =
aws_timestamp_convert(disconnected_time - connected_time, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
uint64_t expected_connected_time_ms =
TIMEOUT_TEST_PING_INTERVAL_MS + (uint64_t)test_context->client->config->ping_timeout_ms;

ASSERT_TRUE(s_is_within_percentage_of(expected_connected_time_ms, connected_interval_ms, .3));

Expand Down Expand Up @@ -1310,7 +1310,9 @@ static int s_mqtt5_client_ping_timeout_fn(struct aws_allocator *allocator, void
ASSERT_SUCCESS(
s_verify_simple_lifecycle_event_sequence(&test_context, expected_events, AWS_ARRAY_SIZE(expected_events)));

ASSERT_SUCCESS(s_verify_ping_timeout_interval(&test_context));
uint64_t expected_connected_time_ms =
TIMEOUT_TEST_PING_INTERVAL_MS + (uint64_t)test_context.client->config->ping_timeout_ms;
ASSERT_SUCCESS(s_verify_ping_timeout_interval(&test_context, expected_connected_time_ms));

enum aws_mqtt5_client_state expected_states[] = {
AWS_MCS_CONNECTING,
Expand All @@ -1329,6 +1331,88 @@ static int s_mqtt5_client_ping_timeout_fn(struct aws_allocator *allocator, void

AWS_TEST_CASE(mqtt5_client_ping_timeout, s_mqtt5_client_ping_timeout_fn)

/*
* A variant of the basic ping timeout test that uses a timeout that is larger than the keep alive. Previously,
* we forbid this because taken literally, it leads to broken behavior. We now clamp the ping timeout dynamically
* based on the connection's established keep alive.
*/
static int s_mqtt5_client_ping_timeout_with_keep_alive_conflict_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

/* fast keep alive in order keep tests reasonably short */
uint16_t keep_alive_seconds =
(uint16_t)aws_timestamp_convert(TIMEOUT_TEST_PING_INTERVAL_MS, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_SECS, NULL);
test_options.connect_options.keep_alive_interval_seconds = keep_alive_seconds;

/* don't respond to PINGREQs */
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_PINGREQ] = NULL;

/* ping timeout slower than keep alive */
test_options.client_options.ping_timeout_ms = 2 * TIMEOUT_TEST_PING_INTERVAL_MS;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
.server_function_table = &test_options.server_function_table,
};

struct aws_mqtt5_client_mock_test_fixture test_context;
ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options));

struct aws_mqtt5_client *client = test_context.client;
ASSERT_SUCCESS(aws_mqtt5_client_start(client));

aws_wait_for_connected_lifecycle_event(&test_context);
s_wait_for_disconnection_lifecycle_event(&test_context);

ASSERT_SUCCESS(aws_mqtt5_client_stop(client, NULL, NULL));

aws_wait_for_stopped_lifecycle_event(&test_context);

struct aws_mqtt5_client_lifecycle_event expected_events[] = {
{
.event_type = AWS_MQTT5_CLET_ATTEMPTING_CONNECT,
},
{
.event_type = AWS_MQTT5_CLET_CONNECTION_SUCCESS,
},
{
.event_type = AWS_MQTT5_CLET_DISCONNECTION,
.error_code = AWS_ERROR_MQTT5_PING_RESPONSE_TIMEOUT,
},
{
.event_type = AWS_MQTT5_CLET_STOPPED,
},
};
ASSERT_SUCCESS(
s_verify_simple_lifecycle_event_sequence(&test_context, expected_events, AWS_ARRAY_SIZE(expected_events)));

uint64_t expected_connected_time_ms = 3 * TIMEOUT_TEST_PING_INTERVAL_MS / 2;
ASSERT_SUCCESS(s_verify_ping_timeout_interval(&test_context, expected_connected_time_ms));

enum aws_mqtt5_client_state expected_states[] = {
AWS_MCS_CONNECTING,
AWS_MCS_MQTT_CONNECT,
AWS_MCS_CONNECTED,
AWS_MCS_CLEAN_DISCONNECT,
AWS_MCS_CHANNEL_SHUTDOWN,
};
ASSERT_SUCCESS(aws_verify_client_state_sequence(&test_context, expected_states, AWS_ARRAY_SIZE(expected_states)));

aws_mqtt5_client_mock_test_fixture_clean_up(&test_context);
aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(
mqtt5_client_ping_timeout_with_keep_alive_conflict,
s_mqtt5_client_ping_timeout_with_keep_alive_conflict_fn)

struct aws_lifecycle_event_wait_context {
enum aws_mqtt5_client_lifecycle_event_type type;
size_t count;
Expand Down
14 changes: 0 additions & 14 deletions tests/v5/mqtt5_operation_validation_failure_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1170,20 +1170,6 @@ static void s_make_invalid_connect_client_options(struct aws_mqtt5_client_option

AWS_CLIENT_CREATION_VALIDATION_FAILURE(invalid_connect, s_good_client_options, s_make_invalid_connect_client_options)

static struct aws_mqtt5_packet_connect_view s_short_keep_alive_connect_view = {
.keep_alive_interval_seconds = 20,
};

static void s_make_invalid_keep_alive_client_options(struct aws_mqtt5_client_options *options) {
options->connect_options = &s_short_keep_alive_connect_view;
options->ping_timeout_ms = 30000;
}

AWS_CLIENT_CREATION_VALIDATION_FAILURE(
invalid_keep_alive,
s_good_client_options,
s_make_invalid_keep_alive_client_options)

static void s_make_invalid_port_client_options(struct aws_mqtt5_client_options *options) {
options->port = 0xFFFFFFFF;
}
Expand Down

0 comments on commit 1efb8f9

Please sign in to comment.