diff --git a/include/aws/mqtt/mqtt.h b/include/aws/mqtt/mqtt.h index 11232125..72e4df48 100644 --- a/include/aws/mqtt/mqtt.h +++ b/include/aws/mqtt/mqtt.h @@ -83,6 +83,7 @@ enum aws_mqtt_error { AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE, AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, + AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID), }; diff --git a/source/mqtt.c b/source/mqtt.c index f1f7c360..68bd9bf0 100644 --- a/source/mqtt.c +++ b/source/mqtt.c @@ -239,6 +239,10 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter) AWS_DEFINE_ERROR_INFO_MQTT( AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, "Request operation failed due to client shut down"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, + "Request operation failed due to timeout"), + }; /* clang-format on */ #undef AWS_DEFINE_ERROR_INFO_MQTT diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 110199d6..f52604cb 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -5,6 +5,7 @@ #include +#include #include #include #include @@ -13,6 +14,8 @@ #include #include +#include + #define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 enum aws_mqtt_request_response_operation_type { @@ -303,6 +306,9 @@ struct aws_mqtt_rr_client_operation { struct aws_mqtt_request_operation_storage request_storage; } storage; + uint64_t timeout_timepoint_ns; + struct aws_priority_queue_node priority_queue_node; + /* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */ struct aws_linked_list_node node; @@ -370,6 +376,9 @@ struct aws_mqtt_request_response_client { struct aws_task external_shutdown_task; struct aws_task internal_shutdown_task; + uint64_t scheduled_service_timepoint_ns; + struct aws_task service_task; + enum aws_request_response_client_state state; struct aws_atomic_var next_id; @@ -378,6 +387,13 @@ struct aws_mqtt_request_response_client { /* &operation->id -> &operation */ struct aws_hash_table operations; + + /* + * heap of operation pointers where the timeout is the sort value. Elements are added to this on operation + * submission and removed on operation timeout/completion/termination. Request-response operations have actual + * timeouts, while streaming operations have UINT64_MAX timeouts. + */ + struct aws_priority_queue operations_by_timeout; }; static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { @@ -401,6 +417,8 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request AWS_FATAL_ASSERT(aws_hash_table_get_entry_count(&client->operations) == 0); aws_hash_table_clean_up(&client->operations); + aws_priority_queue_clean_up(&client->operations_by_timeout); + aws_mem_release(client->allocator, client); if (terminate_callback != NULL) { @@ -421,6 +439,15 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn( s_mqtt_request_response_client_final_destroy(client); } +static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_operation *operation) { + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; + + if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) { + struct aws_mqtt_rr_client_operation *queued_operation = NULL; + aws_priority_queue_remove(&client->operations_by_timeout, &queued_operation, &operation->priority_queue_node); + } +} + static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) { AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); @@ -493,6 +520,11 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn( /* stop handling adapter event callbacks */ client->state = AWS_RRCS_SHUTTING_DOWN; + if (client->scheduled_service_timepoint_ns != 0) { + aws_event_loop_cancel_task(client->loop, &client->service_task); + client->scheduled_service_timepoint_ns = 0; + } + aws_rr_subscription_manager_clean_up(&client->subscription_manager); if (client->client_adapter != NULL) { @@ -515,6 +547,26 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn( aws_ref_count_release(&client->internal_ref_count); } +static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_response_client *client) { + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop)); + + if (client->state != AWS_RRCS_ACTIVE) { + return; + } + + if (client->scheduled_service_timepoint_ns == 0 || now < client->scheduled_service_timepoint_ns) { + if (now < client->scheduled_service_timepoint_ns) { + aws_event_loop_cancel_task(client->loop, &client->service_task); + } + + client->scheduled_service_timepoint_ns = now; + aws_event_loop_schedule_task_now(client->loop, &client->service_task); + } +} + static void s_aws_rr_client_subscription_status_event_callback( const struct aws_rr_subscription_status_event *event, void *userdata) { @@ -595,6 +647,22 @@ bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b) { return *(uint64_t *)a == *(uint64_t *)b; } +static int s_compare_rr_operation_timeouts(const void *a, const void *b) { + const struct aws_mqtt_rr_client_operation **operation_a_ptr = (void *)a; + const struct aws_mqtt_rr_client_operation *operation_a = *operation_a_ptr; + + const struct aws_mqtt_rr_client_operation **operation_b_ptr = (void *)b; + const struct aws_mqtt_rr_client_operation *operation_b = *operation_b_ptr; + + if (operation_a->timeout_timepoint_ns < operation_b->timeout_timepoint_ns) { + return -1; + } else if (operation_a->timeout_timepoint_ns > operation_b->timeout_timepoint_ns) { + return 1; + } else { + return 0; + } +} + static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new( struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, @@ -632,6 +700,13 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie NULL, NULL); + aws_priority_queue_init_dynamic( + &rr_client->operations_by_timeout, + allocator, + 100, + sizeof(struct aws_mqtt_rr_client_operation *), + s_compare_rr_operation_timeouts); + aws_linked_list_init(&rr_client->operation_queue); aws_task_init( @@ -673,6 +748,81 @@ static void s_aws_rr_client_init_subscription_manager( &rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); } +static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_client *client) { + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + + struct aws_priority_queue *timeout_queue = &client->operations_by_timeout; + + bool done = aws_priority_queue_size(timeout_queue) == 0; + while (!done) { + struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL; + aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr); + AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL); + struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr; + AWS_FATAL_ASSERT(next_operation_by_timeout != NULL); + + // If the current top of the heap hasn't timed out than nothing has + if (next_operation_by_timeout->timeout_timepoint_ns > now) { + break; + } + + /* Ack timeout for this operation has been reached */ + aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout); + + AWS_LOGF_INFO( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request operation with id:%" PRIu64 " has timed out", + (void *)client, + next_operation_by_timeout->id); + + s_complete_request_operation_with_failure(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT); + + done = aws_priority_queue_size(timeout_queue) == 0; + } +} + +static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_mqtt_request_response_client *client) { + if (aws_priority_queue_size(&client->operations_by_timeout) > 0) { + struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL; + aws_priority_queue_top(&client->operations_by_timeout, (void **)&next_operation_by_timeout_ptr); + AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL); + struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr; + AWS_FATAL_ASSERT(next_operation_by_timeout != NULL); + + return next_operation_by_timeout->timeout_timepoint_ns; + } + + return UINT64_MAX; +} + +static void s_mqtt_request_response_service_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { + (void)task; + + if (task_status == AWS_TASK_STATUS_CANCELED) { + return; + } + + struct aws_mqtt_request_response_client *client = arg; + client->scheduled_service_timepoint_ns = 0; + + if (client->state == AWS_RRCS_ACTIVE) { + + // timeouts + s_check_for_operation_timeouts(client); + + // TODO: operation intake and service + + // schedule next service + client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client); + aws_event_loop_schedule_task_future( + client->loop, &client->service_task, client->scheduled_service_timepoint_ns); + } +} + static void s_mqtt_request_response_client_initialize_task_fn( struct aws_task *task, void *arg, @@ -687,6 +837,11 @@ static void s_mqtt_request_response_client_initialize_task_fn( s_aws_rr_client_init_subscription_manager(client, client->allocator); client->state = AWS_RRCS_ACTIVE; + + aws_task_init(&client->service_task, s_mqtt_request_response_service_task_fn, client, "mqtt_rr_client_service"); + + aws_event_loop_schedule_task_future(client->loop, &client->service_task, UINT64_MAX); + client->scheduled_service_timepoint_ns = UINT64_MAX; } if (client->config.initialized_callback != NULL) { @@ -913,24 +1068,26 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, (void)task; struct aws_mqtt_rr_client_operation *operation = arg; + struct aws_mqtt_request_response_client *client = operation->client_internal_ref; if (status == AWS_TASK_STATUS_CANCELED) { goto done; } // add appropriate client table entries - aws_hash_table_put(&operation->client_internal_ref->operations, &operation->id, operation, NULL); + aws_hash_table_put(&client->operations, &operation->id, operation, NULL); // NYI other tables - // NYI set up timeout + // add to timeout priority queue + aws_priority_queue_push_ref(&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node); // enqueue aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node); operation->state = AWS_MRROS_QUEUED; - // NYI wake service + s_mqtt_request_response_client_wake_service(operation->client_internal_ref); done: @@ -972,6 +1129,7 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, struct aws_mqtt_request_response_client *client = operation->client_internal_ref; aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL); + s_remove_operation_from_timeout_queue(operation); aws_linked_list_remove(&operation->node); @@ -986,7 +1144,6 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, /* NYI: - Remove from timeout tracking Remove from topic filter table Remove from correlation token table @@ -1115,11 +1272,19 @@ int aws_mqtt_request_response_client_submit_request( return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); } + uint64_t now = 0; + if (aws_high_res_clock_get_ticks(&now)) { + return aws_raise_error(AWS_ERROR_CLOCK_FAILURE); + } + struct aws_allocator *allocator = client->allocator; struct aws_mqtt_rr_client_operation *operation = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); operation->allocator = allocator; operation->type = AWS_MRROT_REQUEST; + operation->timeout_timepoint_ns = + now + + aws_timestamp_convert(client->config.operation_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); s_aws_mqtt_request_operation_storage_init_from_options( &operation->storage.request_storage, allocator, request_options); @@ -1163,6 +1328,7 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); operation->allocator = allocator; operation->type = AWS_MRROT_STREAMING; + operation->timeout_timepoint_ns = UINT64_MAX; s_aws_mqtt_streaming_operation_storage_init_from_options( &operation->storage.streaming_storage, allocator, streaming_options); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 3ba0bb96..678a8fa6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -533,6 +533,7 @@ add_test_case(rrc_submit_streaming_operation_failure_invalid_subscription_topic_ add_test_case(rrc_submit_request_operation_failure_by_shutdown) add_test_case(rrc_submit_streaming_operation_and_shutdown) +add_test_case(rrc_submit_request_operation_failure_by_timeout) generate_test_driver(${PROJECT_NAME}-tests) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 9448dd81..5dc0fa5c 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -787,6 +787,7 @@ AWS_TEST_CASE( static int s_do_rrc_single_request_operation_test_fn( struct aws_allocator *allocator, + struct aws_mqtt_request_response_client_options *rr_client_options, struct aws_mqtt_request_operation_options *request_options, int expected_error_code, struct aws_byte_cursor *expected_payload, @@ -802,8 +803,8 @@ static int s_do_rrc_single_request_operation_test_fn( }; struct aws_rr_client_test_fixture fixture; - ASSERT_SUCCESS( - s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + ASSERT_SUCCESS(s_aws_rr_client_test_fixture_init_from_mqtt5( + &fixture, allocator, rr_client_options, &client_test_fixture_options, NULL)); struct aws_rr_client_fixture_request_response_record *record = s_rrc_fixture_add_request_record(&fixture, request_options->serialized_request); @@ -850,13 +851,14 @@ static int s_rrc_submit_request_operation_failure_by_shutdown_fn(struct aws_allo }; return s_do_rrc_single_request_operation_test_fn( - allocator, &request, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, NULL, true); + allocator, NULL, &request, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, NULL, true); } AWS_TEST_CASE(rrc_submit_request_operation_failure_by_shutdown, s_rrc_submit_request_operation_failure_by_shutdown_fn) static int s_do_rrc_single_streaming_operation_test_fn( struct aws_allocator *allocator, + struct aws_mqtt_request_response_client_options *rr_client_options, struct aws_mqtt_streaming_operation_options *streaming_options, size_t expected_subscription_event_count, struct aws_rr_client_fixture_streaming_record_subscription_event *expected_subscription_events, @@ -872,8 +874,8 @@ static int s_do_rrc_single_streaming_operation_test_fn( }; struct aws_rr_client_test_fixture fixture; - ASSERT_SUCCESS( - s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + ASSERT_SUCCESS(s_aws_rr_client_test_fixture_init_from_mqtt5( + &fixture, allocator, rr_client_options, &client_test_fixture_options, NULL)); struct aws_byte_cursor streaming_id = aws_byte_cursor_from_c_str("streaming1"); struct aws_rr_client_fixture_streaming_record *record = s_rrc_fixture_add_streaming_record(&fixture, streaming_id); @@ -935,7 +937,37 @@ static int s_rrc_submit_streaming_operation_and_shutdown_fn(struct aws_allocator }; return s_do_rrc_single_streaming_operation_test_fn( - allocator, &streaming_options, AWS_ARRAY_SIZE(expected_events), expected_events, true); + allocator, NULL, &streaming_options, AWS_ARRAY_SIZE(expected_events), expected_events, true); } AWS_TEST_CASE(rrc_submit_streaming_operation_and_shutdown, s_rrc_submit_streaming_operation_and_shutdown_fn) + +static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_mqtt_request_operation_response_path response_paths[] = { + { + .topic = aws_byte_cursor_from_c_str("response/filter/accepted"), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + }; + + struct aws_mqtt_request_operation_options request = { + .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .response_paths = response_paths, + .response_path_count = AWS_ARRAY_SIZE(response_paths), + .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), + .serialized_request = aws_byte_cursor_from_c_str("request1"), + .correlation_token = aws_byte_cursor_from_c_str("MyRequest#1"), + }; + + struct aws_mqtt_request_response_client_options rr_client_options = { + .max_subscriptions = 2, + .operation_timeout_seconds = 2, + }; + + return s_do_rrc_single_request_operation_test_fn( + allocator, &rr_client_options, &request, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, NULL, false); +} + +AWS_TEST_CASE(rrc_submit_request_operation_failure_by_timeout, s_rrc_submit_request_operation_failure_by_timeout_fn)