Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout and service task support to the request-response client #360

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
4 changes: 4 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
"Request operation failed due to client shut down"),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,
"Request operation failed due to timeout"),

};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
Expand Down
174 changes: 170 additions & 4 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <aws/mqtt/request-response/request_response_client.h>

#include <aws/common/clock.h>
#include <aws/common/ref_count.h>
#include <aws/common/task_scheduler.h>
#include <aws/io/event_loop.h>
Expand All @@ -13,6 +14,8 @@
#include <aws/mqtt/private/request-response/subscription_manager.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>

#include <inttypes.h>

#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50

enum aws_mqtt_request_response_operation_type {
Expand Down Expand Up @@ -303,6 +306,9 @@ struct aws_mqtt_rr_client_operation {
struct aws_mqtt_request_operation_storage request_storage;
} storage;

uint64_t timeout_timepoint_ns;
struct aws_priority_queue_node priority_queue_node;

/* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */
struct aws_linked_list_node node;

Expand Down Expand Up @@ -370,6 +376,9 @@ struct aws_mqtt_request_response_client {
struct aws_task external_shutdown_task;
struct aws_task internal_shutdown_task;

uint64_t scheduled_service_timepoint_ns;
struct aws_task service_task;

enum aws_request_response_client_state state;

struct aws_atomic_var next_id;
Expand All @@ -378,6 +387,13 @@ struct aws_mqtt_request_response_client {

/* &operation->id -> &operation */
struct aws_hash_table operations;

/*
* heap of operation pointers where the timeout is the sort value. Elements are added to this on operation
* submission and removed on operation timeout/completion/termination. Request-response operations have actual
* timeouts, while streaming operations have UINT64_MAX timeouts.
*/
struct aws_priority_queue operations_by_timeout;
};

static void s_aws_rr_client_on_zero_internal_ref_count(void *context) {
Expand All @@ -401,6 +417,8 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request
AWS_FATAL_ASSERT(aws_hash_table_get_entry_count(&client->operations) == 0);
aws_hash_table_clean_up(&client->operations);

aws_priority_queue_clean_up(&client->operations_by_timeout);

aws_mem_release(client->allocator, client);

if (terminate_callback != NULL) {
Expand All @@ -421,6 +439,15 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn(
s_mqtt_request_response_client_final_destroy(client);
}

static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_operation *operation) {
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) {
struct aws_mqtt_rr_client_operation *queued_operation = NULL;
aws_priority_queue_remove(&client->operations_by_timeout, &queued_operation, &operation->priority_queue_node);
}
}

static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) {
AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);
AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -493,6 +520,11 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn(
/* stop handling adapter event callbacks */
client->state = AWS_RRCS_SHUTTING_DOWN;

if (client->scheduled_service_timepoint_ns != 0) {
aws_event_loop_cancel_task(client->loop, &client->service_task);
client->scheduled_service_timepoint_ns = 0;
}

aws_rr_subscription_manager_clean_up(&client->subscription_manager);

if (client->client_adapter != NULL) {
Expand All @@ -515,6 +547,26 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn(
aws_ref_count_release(&client->internal_ref_count);
}

static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_response_client *client) {
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop));

if (client->state != AWS_RRCS_ACTIVE) {
return;
}

if (client->scheduled_service_timepoint_ns == 0 || now < client->scheduled_service_timepoint_ns) {
if (now < client->scheduled_service_timepoint_ns) {
aws_event_loop_cancel_task(client->loop, &client->service_task);
}

client->scheduled_service_timepoint_ns = now;
aws_event_loop_schedule_task_now(client->loop, &client->service_task);
}
}

static void s_aws_rr_client_subscription_status_event_callback(
const struct aws_rr_subscription_status_event *event,
void *userdata) {
Expand Down Expand Up @@ -595,6 +647,22 @@ bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b) {
return *(uint64_t *)a == *(uint64_t *)b;
}

static int s_compare_rr_operation_timeouts(const void *a, const void *b) {
const struct aws_mqtt_rr_client_operation **operation_a_ptr = (void *)a;
const struct aws_mqtt_rr_client_operation *operation_a = *operation_a_ptr;

const struct aws_mqtt_rr_client_operation **operation_b_ptr = (void *)b;
const struct aws_mqtt_rr_client_operation *operation_b = *operation_b_ptr;

if (operation_a->timeout_timepoint_ns < operation_b->timeout_timepoint_ns) {
return -1;
} else if (operation_a->timeout_timepoint_ns > operation_b->timeout_timepoint_ns) {
return 1;
} else {
return 0;
}
}

static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(
struct aws_allocator *allocator,
const struct aws_mqtt_request_response_client_options *options,
Expand Down Expand Up @@ -632,6 +700,13 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
NULL,
NULL);

aws_priority_queue_init_dynamic(
&rr_client->operations_by_timeout,
allocator,
100,
sizeof(struct aws_mqtt_rr_client_operation *),
s_compare_rr_operation_timeouts);

aws_linked_list_init(&rr_client->operation_queue);

aws_task_init(
Expand Down Expand Up @@ -673,6 +748,81 @@ static void s_aws_rr_client_init_subscription_manager(
&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
}

static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_client *client) {
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);

struct aws_priority_queue *timeout_queue = &client->operations_by_timeout;

bool done = aws_priority_queue_size(timeout_queue) == 0;
while (!done) {
struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

// If the current top of the heap hasn't timed out than nothing has
if (next_operation_by_timeout->timeout_timepoint_ns > now) {
break;
}

/* Ack timeout for this operation has been reached */
aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout);

AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request operation with id:%" PRIu64 " has timed out",
(void *)client,
next_operation_by_timeout->id);

s_complete_request_operation_with_failure(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT);

done = aws_priority_queue_size(timeout_queue) == 0;
}
}

static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_mqtt_request_response_client *client) {
if (aws_priority_queue_size(&client->operations_by_timeout) > 0) {
struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(&client->operations_by_timeout, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

return next_operation_by_timeout->timeout_timepoint_ns;
}

return UINT64_MAX;
}

static void s_mqtt_request_response_service_task_fn(
struct aws_task *task,
void *arg,
enum aws_task_status task_status) {
(void)task;

if (task_status == AWS_TASK_STATUS_CANCELED) {
return;
}

struct aws_mqtt_request_response_client *client = arg;
client->scheduled_service_timepoint_ns = 0;

if (client->state == AWS_RRCS_ACTIVE) {

// timeouts
s_check_for_operation_timeouts(client);

// TODO: operation intake and service

// schedule next service
client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client);
aws_event_loop_schedule_task_future(
client->loop, &client->service_task, client->scheduled_service_timepoint_ns);
}
}

static void s_mqtt_request_response_client_initialize_task_fn(
struct aws_task *task,
void *arg,
Expand All @@ -687,6 +837,11 @@ static void s_mqtt_request_response_client_initialize_task_fn(
s_aws_rr_client_init_subscription_manager(client, client->allocator);

client->state = AWS_RRCS_ACTIVE;

aws_task_init(&client->service_task, s_mqtt_request_response_service_task_fn, client, "mqtt_rr_client_service");

aws_event_loop_schedule_task_future(client->loop, &client->service_task, UINT64_MAX);
client->scheduled_service_timepoint_ns = UINT64_MAX;
}

if (client->config.initialized_callback != NULL) {
Expand Down Expand Up @@ -913,24 +1068,26 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg,
(void)task;

struct aws_mqtt_rr_client_operation *operation = arg;
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

if (status == AWS_TASK_STATUS_CANCELED) {
goto done;
}

// add appropriate client table entries
aws_hash_table_put(&operation->client_internal_ref->operations, &operation->id, operation, NULL);
aws_hash_table_put(&client->operations, &operation->id, operation, NULL);

// NYI other tables

// NYI set up timeout
// add to timeout priority queue
aws_priority_queue_push_ref(&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node);

// enqueue
aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node);

operation->state = AWS_MRROS_QUEUED;

// NYI wake service
s_mqtt_request_response_client_wake_service(operation->client_internal_ref);

done:

Expand Down Expand Up @@ -972,6 +1129,7 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL);
s_remove_operation_from_timeout_queue(operation);

aws_linked_list_remove(&operation->node);

Expand All @@ -986,7 +1144,6 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,
/*
NYI:

Remove from timeout tracking
Remove from topic filter table
Remove from correlation token table

Expand Down Expand Up @@ -1115,11 +1272,19 @@ int aws_mqtt_request_response_client_submit_request(
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

uint64_t now = 0;
if (aws_high_res_clock_get_ticks(&now)) {
return aws_raise_error(AWS_ERROR_CLOCK_FAILURE);
}

struct aws_allocator *allocator = client->allocator;
struct aws_mqtt_rr_client_operation *operation =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
operation->allocator = allocator;
operation->type = AWS_MRROT_REQUEST;
operation->timeout_timepoint_ns =
now +
aws_timestamp_convert(client->config.operation_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);

s_aws_mqtt_request_operation_storage_init_from_options(
&operation->storage.request_storage, allocator, request_options);
Expand Down Expand Up @@ -1163,6 +1328,7 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
operation->allocator = allocator;
operation->type = AWS_MRROT_STREAMING;
operation->timeout_timepoint_ns = UINT64_MAX;

s_aws_mqtt_streaming_operation_storage_init_from_options(
&operation->storage.streaming_storage, allocator, streaming_options);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ add_test_case(rrc_submit_streaming_operation_failure_invalid_subscription_topic_

add_test_case(rrc_submit_request_operation_failure_by_shutdown)
add_test_case(rrc_submit_streaming_operation_and_shutdown)
add_test_case(rrc_submit_request_operation_failure_by_timeout)

generate_test_driver(${PROJECT_NAME}-tests)

Expand Down
Loading
Loading