diff --git a/include/aws/mqtt/mqtt.h b/include/aws/mqtt/mqtt.h index 72e4df48..519378f5 100644 --- a/include/aws/mqtt/mqtt.h +++ b/include/aws/mqtt/mqtt.h @@ -84,6 +84,9 @@ enum aws_mqtt_error { AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, + AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY, + AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE, + AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID), }; diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index ff73cd06..450675d1 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -13,7 +13,6 @@ struct aws_mqtt_request_response_client; struct aws_mqtt_client_connection; struct aws_mqtt5_client; -struct aws_mqtt_streaming_operation; struct aws_mqtt_request_operation_response_path { struct aws_byte_cursor topic; diff --git a/source/mqtt.c b/source/mqtt.c index 68bd9bf0..0e7b0c0e 100644 --- a/source/mqtt.c +++ b/source/mqtt.c @@ -242,6 +242,15 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter) AWS_DEFINE_ERROR_INFO_MQTT( AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT, "Request operation failed due to timeout"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY, + "Streaming request operation failed because there was no space for the subscription"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE, + "Request operation failed because the associated subscribe failed synchronously"), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, + "Request operation failed due to a non-specific internal error within the client."), }; /* clang-format on */ diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index f52604cb..07f4a5aa 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -54,8 +54,51 @@ state, removed on operation completion/destruction 4. &topic_filter -> &{topic_filter, linked_list} // added on in-thread pop from queue, removed from list on operation completion/destruction also checked for empty and removed from table + Note: 4 tracks both streaming and request-response operations but each uses the table in different ways. Both use + the table to react to subscription status events to move the operation forward state-wise. Additionally, + streaming operations use the table to map incoming publishes to listening streaming operations. OTOH, request + operations use table 2 and then 3 to map incoming publishes to operations. + */ +struct aws_rr_operation_list_topic_filter_entry { + struct aws_allocator *allocator; + + struct aws_byte_cursor topic_filter_cursor; + struct aws_byte_buf topic_filter; + + struct aws_linked_list operations; +}; + +struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new( + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter) { + struct aws_rr_operation_list_topic_filter_entry *entry = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry)); + + entry->allocator = allocator; + aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter); + entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter); + + aws_linked_list_init(&entry->operations); + + return entry; +} + +void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) { + if (entry == NULL) { + return; + } + + aws_byte_buf_clean_up(&entry->topic_filter); + + aws_mem_release(entry->allocator, entry); +} + +void s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy(void *value) { + s_aws_rr_operation_list_topic_filter_entry_destroy(value); +} + /* All operations have an internal ref to the client they are a part of */ /* @@ -110,7 +153,7 @@ operation completion/destruction also checked for empty and removed from table /* WakeServiceTask(client) [Event Loop]: - If client.state != SHUTTING_DOWN && protocol client is connected + If client.state == ACTIVE && client.connected RescheduleServiceTask(now) */ @@ -127,6 +170,7 @@ operation completion/destruction also checked for empty and removed from table Check client's topic filter table entry for empty list, remove entry if so. (intrusive list removal already unlinked it from table) If client is not shutting down remove from subscription manager (otherwise it's already been cleaned up) + client.subscription_manager.release_subscription(operation.topic_filter) WakeServiceTask // queue may now be unblocked, does nothing if shutting down (Streaming) Invoke termination callback Release client internal ref @@ -155,6 +199,7 @@ operation completion/destruction also checked for empty and removed from table /* OnProtocolAdapterConnectionEvent(event) [Event Loop]: + client.connected <- event.connected client.subscription_manager.notify(event) WakeServiceTask */ @@ -173,6 +218,10 @@ operation completion/destruction also checked for empty and removed from table /* MakeRequest(operation) [Event Loop]: + operation.state <- SUBSCRIBED + if !client.connected + return + // Critical Requirement - the user data for the publish completion callback must be a weak ref that wraps // the operation. On operation destruction, we zero the weak ref (and dec ref it). operation.state <- AWAITING_RESPONSE @@ -193,18 +242,14 @@ operation completion/destruction also checked for empty and removed from table /* StreamingOperationOnSubscriptionStatusEvent(operation, event) [Event loop, top-level task loop]: - If event.type == Success and operation.state == SUBSCRIBING - operation.state <- SUBSCRIBED - Invoke Success/Failure callback with success - Else If event.type == Failure and operation.state == SUBSCRIBING + If event.type == Success + Emit SubscriptionEstablished + Else If event.type == Lost + Emit SubscriptionLost + Else if event.type == Halted operation.state <- TERMINAL - Invoke Success/Failure callback with failure - Else if event.type == Ended and operation.state != TERMINAL - operation.state <- TERMINAL - Invoke Ended callback + Emit SubscriptionHalted - If Failure or Ended: - sub manager release_subscription(operation id) */ /* @@ -230,12 +275,13 @@ operation completion/destruction also checked for empty and removed from table return // invariant, must be SUBSCRIBING or SUBSCRIBED at this point + Add operation to client's topic filter table + If operation is streaming Add operation to topic filter table operation.state <- {SUBSCRIBING, SUBSCRIBED} If operation is request - Add operation to client's topic filter table if result == SUBSCRIBING operation.state <- SUBSCRIBING else // (SUBSCRIBED) @@ -248,26 +294,26 @@ operation completion/destruction also checked for empty and removed from table For all timed out operations: OnOperationTimeout(operation) - While OperationQueue is not empty: - operation = peek queue - result = subscription manager acquire sub(operation) - if result == Blocked - break - pop operation - HandleAcquireSubscriptionResult(operation, result) + if client connected + + For all request operations where state == SUBSCRIBED + MakeRequest(operation) + + While OperationQueue is not empty: + operation = peek queue + result = subscription manager acquire sub(operation) + if result == Blocked + break + pop operation + HandleAcquireSubscriptionResult(operation, result) Reschedule Service for next timeout if it exists */ /* - OnOperationTimeout(operation) [Event Loop, Service Task Loop]: - - If operation.type == request and operation.state != PENDING_DESTROY - CompleteRequestOperation(operation, error) + OnOperationTimeout(operation) [Event Loop, Service Task Loop, operation is request]: - If operation.type == streaming and operation.state != {SUBSCRIBED, TERMINAL} - operation.state <- TERMINAL - Invoke operation failure callback + CompleteRequestOperation(operation, error) */ @@ -394,6 +440,8 @@ struct aws_mqtt_request_response_client { * timeouts, while streaming operations have UINT64_MAX timeouts. */ struct aws_priority_queue operations_by_timeout; + + struct aws_hash_table operation_lists_by_subscription_filter; }; static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { @@ -418,6 +466,7 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request aws_hash_table_clean_up(&client->operations); aws_priority_queue_clean_up(&client->operations_by_timeout); + aws_hash_table_clean_up(&client->operation_lists_by_subscription_filter); aws_mem_release(client->allocator, client); @@ -469,40 +518,38 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ aws_mqtt_rr_client_operation_release(operation); } -static void s_streaming_operation_on_client_shutdown(struct aws_mqtt_rr_client_operation *operation, int error_code) { +static void s_halt_streaming_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) { AWS_FATAL_ASSERT(operation->type == AWS_MRROT_STREAMING); AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); - switch (operation->state) { - case AWS_MRROS_QUEUED: - case AWS_MRROS_PENDING_SUBSCRIPTION: - case AWS_MRROS_SUBSCRIBED: { - aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = - operation->storage.streaming_storage.options.subscription_status_callback; - void *user_data = operation->storage.streaming_storage.options.user_data; - if (subscription_status_callback != NULL) { - (*subscription_status_callback)(ARRSET_STREAMING_SUBSCRIPTION_HALTED, error_code, user_data); - } - } + if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) { + return; + } - default: - break; + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = + operation->storage.streaming_storage.options.subscription_status_callback; + + if (subscription_status_callback != NULL) { + void *user_data = operation->storage.streaming_storage.options.user_data; + (*subscription_status_callback)(ARRSET_STREAMING_SUBSCRIPTION_HALTED, error_code, user_data); } operation->state = AWS_MRROS_TERMINAL; } +static void s_request_response_fail_operation(struct aws_mqtt_rr_client_operation *operation, int error_code) { + if (operation->type == AWS_MRROT_STREAMING) { + s_halt_streaming_operation_with_failure(operation, error_code); + } else { + s_complete_request_operation_with_failure(operation, error_code); + } +} + static int s_rr_client_clean_up_operation(void *context, struct aws_hash_element *elem) { (void)context; struct aws_mqtt_rr_client_operation *operation = elem->value; - if (operation->type == AWS_MRROT_REQUEST) { - /* Complete the request operation as a failure */ - s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); - } else { - /* Non-terminal streaming operations should a subscription failure or ended event */ - s_streaming_operation_on_client_shutdown(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); - } + s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } @@ -707,6 +754,15 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie sizeof(struct aws_mqtt_rr_client_operation *), s_compare_rr_operation_timeouts); + aws_hash_table_init( + &rr_client->operation_lists_by_subscription_filter, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy); + aws_linked_list_init(&rr_client->operation_queue); aws_task_init( @@ -776,7 +832,7 @@ static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_clie (void *)client, next_operation_by_timeout->id); - s_complete_request_operation_with_failure(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT); + s_request_response_fail_operation(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT); done = aws_priority_queue_size(timeout_queue) == 0; } @@ -796,6 +852,123 @@ static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_ return UINT64_MAX; } +static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_filter( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_REQUEST) { + return operation->storage.request_storage.options.subscription_topic_filter; + } else { + return operation->storage.streaming_storage.options.topic_filter; + } +} + +/* + HandleAcquireSubscriptionResult(operation, result) [Event Loop, Service Task Loop]: + + // invariant, BLOCKED is not possible, it was already handled + If result == {No Capacity, Failure} + If operation is streaming + Invoke failure callback + operation.state <- TERMINAL + else + CompleteRequestOperation(operation, error) + return + + // invariant, must be SUBSCRIBING or SUBSCRIBED at this point + Add operation to client's topic filter table + + If operation is streaming + operation.state <- {SUBSCRIBING, SUBSCRIBED} + + If operation is request + if result == SUBSCRIBING + operation.state <- SUBSCRIBING + else // (SUBSCRIBED) + MakeRequest(op) +*/ + +static bool s_is_operation_in_list(const struct aws_mqtt_rr_client_operation *operation) { + return aws_linked_list_node_prev_is_valid(&operation->node) && aws_linked_list_node_next_is_valid(&operation->node); +} + +static int s_add_operation_to_subscription_topic_filter_table( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + + struct aws_byte_cursor topic_filter_cursor = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation); + + struct aws_hash_element *element = NULL; + if (aws_hash_table_find(&client->operation_lists_by_subscription_filter, &topic_filter_cursor, &element)) { + return aws_raise_error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + } + + struct aws_rr_operation_list_topic_filter_entry *entry = NULL; + if (element == NULL) { + struct aws_byte_cursor topic_filter_cursor = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation); + entry = s_aws_rr_operation_list_topic_filter_entry_new(client->allocator, topic_filter_cursor); + aws_hash_table_put(&client->operation_lists_by_subscription_filter, &entry->topic_filter_cursor, entry, NULL); + } else { + entry = element->value; + } + + AWS_FATAL_ASSERT(entry != NULL); + + if (s_is_operation_in_list(operation)) { + aws_linked_list_remove(&operation->node); + } + + aws_linked_list_push_back(&entry->operations, &operation->node); + + return AWS_OP_SUCCESS; +} + +static void s_make_mqtt_request( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation) { + (void)client; + + AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); + + // TODO: NYI +} + +static void s_handle_operation_subscribe_result( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_rr_client_operation *operation, + enum aws_acquire_subscription_result_type subscribe_result) { + if (subscribe_result == AASRT_FAILURE || subscribe_result == AASRT_NO_CAPACITY) { + int error_code = (subscribe_result == AASRT_NO_CAPACITY) + ? AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY + : AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE; + s_request_response_fail_operation(operation, error_code); + return; + } + + if (s_add_operation_to_subscription_topic_filter_table(client, operation)) { + s_request_response_fail_operation(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); + return; + } + + if (subscribe_result == AASRT_SUBSCRIBING) { + operation->state = AWS_MRROS_PENDING_SUBSCRIPTION; + return; + } + + if (operation->type == AWS_MRROT_STREAMING) { + operation->state = AWS_MRROS_SUBSCRIBED; + } else { + s_make_mqtt_request(client, operation); + } +} + +static enum aws_rr_subscription_type s_rr_operation_type_to_subscription_type( + enum aws_mqtt_request_response_operation_type type) { + if (type == AWS_MRROT_REQUEST) { + return ARRST_REQUEST_RESPONSE; + } + + return ARRST_EVENT_STREAM; +} + static void s_mqtt_request_response_service_task_fn( struct aws_task *task, void *arg, @@ -814,7 +987,26 @@ static void s_mqtt_request_response_service_task_fn( // timeouts s_check_for_operation_timeouts(client); - // TODO: operation intake and service + while (!aws_linked_list_empty(&client->operation_queue)) { + struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue); + struct aws_mqtt_rr_client_operation *head_operation = + AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node); + + struct aws_rr_acquire_subscription_options subscribe_options = { + .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation), + .operation_id = head_operation->id, + .type = s_rr_operation_type_to_subscription_type(head_operation->type), + }; + + enum aws_acquire_subscription_result_type subscribe_result = + aws_rr_subscription_manager_acquire_subscription(&client->subscription_manager, &subscribe_options); + if (subscribe_result == AASRT_BLOCKED) { + break; + } + + aws_linked_list_pop_front(&client->operation_queue); + s_handle_operation_subscribe_result(client, head_operation, subscribe_result); + } // schedule next service client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client); @@ -1112,15 +1304,6 @@ static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_reques aws_byte_buf_clean_up(&storage->operation_data); } -static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_filter( - struct aws_mqtt_rr_client_operation *operation) { - if (operation->type == AWS_MRROT_REQUEST) { - return operation->storage.request_storage.options.subscription_topic_filter; - } else { - return operation->storage.streaming_storage.options.topic_filter; - } -} - static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, enum aws_task_status status) { (void)task; (void)status; @@ -1131,7 +1314,9 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL); s_remove_operation_from_timeout_queue(operation); - aws_linked_list_remove(&operation->node); + if (s_is_operation_in_list(operation)) { + aws_linked_list_remove(&operation->node); + } if (client->state != AWS_RRCS_SHUTTING_DOWN) { struct aws_rr_release_subscription_options release_options = {