From e3b0eee835e5824b78f8448ad79c9e2460e5cf45 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Fri, 22 Mar 2024 11:28:40 -0700 Subject: [PATCH] Logging checkpoint --- .../request_response_client.c | 357 +++++++++++++++--- 1 file changed, 295 insertions(+), 62 deletions(-) diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 36dae9af..0419e0fa 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -27,14 +27,64 @@ enum aws_mqtt_request_response_operation_state { AWS_MRROS_NONE, // creation -> in event loop enqueue AWS_MRROS_QUEUED, // in event loop queue -> non blocked response from subscription manager AWS_MRROS_PENDING_SUBSCRIPTION, // subscribing response from sub manager -> subscription success/failure event - AWS_MRROS_AWAITING_RESPONSE, // (request only) subscription success -> (publish failure OR correlated response + AWS_MRROS_PENDING_RESPONSE, // (request only) subscription success -> (publish failure OR correlated response // received) AWS_MRROS_SUBSCRIBED, // (streaming only) subscription success -> (operation finished OR subscription ended event) AWS_MRROS_TERMINAL, // (streaming only) (subscription failure OR subscription ended) -> operation close/terminate - AWS_MRROS_PENDING_DESTROY, // (request only) the request operation's destroy task has been scheduled but not yet + AWS_MRROS_PENDING_DESTROY, // (request only) the operation's destroy task has been scheduled but not yet // executed }; +const char *s_aws_mqtt_request_response_operation_state_to_c_str(enum aws_mqtt_request_response_operation_state state) { + switch (state) { + case AWS_MRROS_NONE: + return "NONE"; + + case AWS_MRROS_QUEUED: + return "QUEUED"; + + case AWS_MRROS_PENDING_SUBSCRIPTION: + return "PENDING_SUBSCRIPTION"; + + case AWS_MRROS_PENDING_RESPONSE: + return "PENDING_RESPONSE"; + + case AWS_MRROS_SUBSCRIBED: + return "SUBSCRIBED"; + + case AWS_MRROS_TERMINAL: + return "TERMINAL"; + + case AWS_MRROS_PENDING_DESTROY: + return "PENDING_DESTROY"; + + default: + return "Unknown"; + } +} + +const char *s_aws_acquire_subscription_result_type(enum aws_acquire_subscription_result_type result) { + switch (result) { + case AASRT_SUBSCRIBED: + return "SUBSCRIBED"; + + case AASRT_SUBSCRIBING: + return "SUBSCRIBING"; + + case AASRT_BLOCKED: + return "BLOCKED"; + + case AASRT_NO_CAPACITY: + return "NO_CAPACITY"; + + case AASRT_FAILURE: + return "FAILURE"; + + default: + return "Unknown"; + } +} + /* Client Tables/Lookups @@ -497,6 +547,25 @@ static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_oper } } +static void s_change_operation_state( + struct aws_mqtt_rr_client_operation *operation, + enum aws_mqtt_request_response_operation_state new_state) { + enum aws_mqtt_request_response_operation_state old_state = operation->state; + if (old_state == new_state) { + return; + } + + operation->state = new_state; + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation %" PRIu64 " changing state from %s to %s", + (void *)operation->client_internal_ref, + operation->id, + s_aws_mqtt_request_response_operation_state_to_c_str(old_state), + s_aws_mqtt_request_response_operation_state_to_c_str(new_state)); +} + static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) { AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); @@ -505,6 +574,14 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ return; } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response operation with id:%" PRIu64 " failed with error code %d(%s)", + (void *)operation->client_internal_ref, + operation->id, + error_code, + aws_error_debug_str(error_code)); + aws_mqtt_request_operation_completion_fn *completion_callback = operation->storage.request_storage.options.completion_callback; void *user_data = operation->storage.request_storage.options.user_data; @@ -513,7 +590,7 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ (*completion_callback)(NULL, error_code, user_data); } - operation->state = AWS_MRROS_PENDING_DESTROY; + s_change_operation_state(operation, AWS_MRROS_PENDING_DESTROY); aws_mqtt_rr_client_operation_release(operation); } @@ -526,6 +603,14 @@ static void s_halt_streaming_operation_with_failure(struct aws_mqtt_rr_client_op return; } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: streaming operation with id:%" PRIu64 " halted with error code %d(%s)", + (void *)operation->client_internal_ref, + operation->id, + error_code, + aws_error_debug_str(error_code)); + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = operation->storage.streaming_storage.options.subscription_status_callback; @@ -534,7 +619,7 @@ static void s_halt_streaming_operation_with_failure(struct aws_mqtt_rr_client_op (*subscription_status_callback)(ARRSET_STREAMING_SUBSCRIPTION_HALTED, error_code, user_data); } - operation->state = AWS_MRROS_TERMINAL; + s_change_operation_state(operation, AWS_MRROS_TERMINAL); } static void s_request_response_fail_operation(struct aws_mqtt_rr_client_operation *operation, int error_code) { @@ -611,6 +696,9 @@ static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_ client->scheduled_service_timepoint_ns = now; aws_event_loop_schedule_task_now(client->loop, &client->service_task); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, "id=%p: request-response client service task woke", (void *)client); } } @@ -649,10 +737,47 @@ static void s_aws_rr_client_protocol_adapter_subscription_event_callback( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(&rr_client->subscription_manager, event); } +static void s_apply_publish_to_streaming_operation_list( + struct aws_rr_operation_list_topic_filter_entry *entry, + const struct aws_protocol_adapter_incoming_publish_event *publish_event) { + AWS_FATAL_ASSERT(entry != NULL); + + struct aws_linked_list_node *node = aws_linked_list_begin(&entry->operations); + while (node != aws_linked_list_end(&entry->operations)) { + struct aws_mqtt_rr_client_operation *operation = + AWS_CONTAINER_OF(node, struct aws_mqtt_rr_client_operation, node); + node = aws_linked_list_next(node); + + if (operation->type != AWS_MRROT_STREAMING) { + continue; + } + + if (operation->state == AWS_MRROS_PENDING_DESTROY || operation->state == AWS_MRROS_TERMINAL) { + continue; + } + + aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback = + operation->storage.streaming_storage.options.incoming_publish_callback; + if (!incoming_publish_callback) { + continue; + } + + void *user_data = operation->storage.streaming_storage.options.user_data; + (*incoming_publish_callback)(publish_event->payload, user_data); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR + "' routed to streaming operation %" PRIu64, + (void *)operation->client_internal_ref, + AWS_BYTE_CURSOR_PRI(publish_event->topic), + operation->id); + } +} + static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( - const struct aws_protocol_adapter_incoming_publish_event *publish, + const struct aws_protocol_adapter_incoming_publish_event *publish_event, void *user_data) { - (void)publish; struct aws_mqtt_request_response_client *rr_client = user_data; @@ -662,7 +787,23 @@ static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( return; } - /* NYI */ + /* Streaming operation handling */ + struct aws_hash_element *subscription_filter_element = NULL; + if (aws_hash_table_find( + &rr_client->operation_lists_by_subscription_filter, &publish_event->topic, &subscription_filter_element) == + AWS_OP_SUCCESS) { + if (subscription_filter_element != NULL) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client incoming publish on topic '" PRInSTR "'", + (void *)rr_client, + AWS_BYTE_CURSOR_PRI(publish_event->topic)); + + s_apply_publish_to_streaming_operation_list(subscription_filter_element->value, publish_event); + } + } + + /* Request-Response handling NYI */ } static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) { @@ -683,6 +824,11 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback( return; } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client applying connection event to subscription manager", + (void *)rr_client); + aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event); } @@ -826,12 +972,6 @@ static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_clie /* Ack timeout for this operation has been reached */ aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout); - AWS_LOGF_INFO( - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: request operation with id:%" PRIu64 " has timed out", - (void *)client, - next_operation_by_timeout->id); - s_request_response_fail_operation(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT); done = aws_priority_queue_size(timeout_queue) == 0; @@ -861,31 +1001,7 @@ static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_fil } } -/* - HandleAcquireSubscriptionResult(operation, result) [Event Loop, Service Task Loop]: - - // invariant, BLOCKED is not possible, it was already handled - If result == {No Capacity, Failure} - If operation is streaming - Invoke failure callback - operation.state <- TERMINAL - else - CompleteRequestOperation(operation, error) - return - - // invariant, must be SUBSCRIBING or SUBSCRIBED at this point - Add operation to client's topic filter table - - If operation is streaming - operation.state <- {SUBSCRIBING, SUBSCRIBED} - - If operation is request - if result == SUBSCRIBING - operation.state <- SUBSCRIBING - else // (SUBSCRIBED) - MakeRequest(op) -*/ - +/* TODO: add aws-c-common API? */ static bool s_is_operation_in_list(const struct aws_mqtt_rr_client_operation *operation) { return aws_linked_list_node_prev_is_valid(&operation->node) && aws_linked_list_node_next_is_valid(&operation->node); } @@ -905,6 +1021,11 @@ static int s_add_operation_to_subscription_topic_filter_table( if (element == NULL) { entry = s_aws_rr_operation_list_topic_filter_entry_new(client->allocator, topic_filter_cursor); aws_hash_table_put(&client->operation_lists_by_subscription_filter, &entry->topic_filter_cursor, entry, NULL); + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client adding topic filter '" PRInSTR "' to subscriptions table", + (void *)client, + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); } else { entry = element->value; } @@ -915,6 +1036,14 @@ static int s_add_operation_to_subscription_topic_filter_table( aws_linked_list_remove(&operation->node); } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client adding operation %" PRIu64 " to subscription table with topic_filter '" PRInSTR + "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); + aws_linked_list_push_back(&entry->operations, &operation->node); return AWS_OP_SUCCESS; @@ -948,12 +1077,12 @@ static void s_handle_operation_subscribe_result( } if (subscribe_result == AASRT_SUBSCRIBING) { - operation->state = AWS_MRROS_PENDING_SUBSCRIPTION; + s_change_operation_state(operation, AWS_MRROS_PENDING_SUBSCRIPTION); return; } if (operation->type == AWS_MRROT_STREAMING) { - operation->state = AWS_MRROS_SUBSCRIBED; + s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED); } else { s_make_mqtt_request(client, operation); } @@ -968,6 +1097,38 @@ static enum aws_rr_subscription_type s_rr_operation_type_to_subscription_type( return ARRST_EVENT_STREAM; } +static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) { + while (!aws_linked_list_empty(&client->operation_queue)) { + struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue); + struct aws_mqtt_rr_client_operation *head_operation = + AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node); + + struct aws_rr_acquire_subscription_options subscribe_options = { + .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation), + .operation_id = head_operation->id, + .type = s_rr_operation_type_to_subscription_type(head_operation->type), + }; + + enum aws_acquire_subscription_result_type subscribe_result = + aws_rr_subscription_manager_acquire_subscription(&client->subscription_manager, &subscribe_options); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client intake, queued operation %" PRIu64 + " yielded acquire subscription result: %s", + (void *)client, + head_operation->id, + s_aws_acquire_subscription_result_type(subscribe_result)); + + if (subscribe_result == AASRT_BLOCKED) { + break; + } + + aws_linked_list_pop_front(&client->operation_queue); + s_handle_operation_subscribe_result(client, head_operation, subscribe_result); + } +} + static void s_mqtt_request_response_service_task_fn( struct aws_task *task, void *arg, @@ -986,31 +1147,19 @@ static void s_mqtt_request_response_service_task_fn( // timeouts s_check_for_operation_timeouts(client); - while (!aws_linked_list_empty(&client->operation_queue)) { - struct aws_linked_list_node *head = aws_linked_list_front(&client->operation_queue); - struct aws_mqtt_rr_client_operation *head_operation = - AWS_CONTAINER_OF(head, struct aws_mqtt_rr_client_operation, node); - - struct aws_rr_acquire_subscription_options subscribe_options = { - .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation), - .operation_id = head_operation->id, - .type = s_rr_operation_type_to_subscription_type(head_operation->type), - }; - - enum aws_acquire_subscription_result_type subscribe_result = - aws_rr_subscription_manager_acquire_subscription(&client->subscription_manager, &subscribe_options); - if (subscribe_result == AASRT_BLOCKED) { - break; - } - - aws_linked_list_pop_front(&client->operation_queue); - s_handle_operation_subscribe_result(client, head_operation, subscribe_result); - } + // operation queue + s_process_queued_operations(client); // schedule next service client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client); aws_event_loop_schedule_task_future( client->loop, &client->service_task, client->scheduled_service_timepoint_ns); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client service, next timepoint: %" PRIu64, + (void *)client, + client->scheduled_service_timepoint_ns); } } @@ -1265,6 +1414,12 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, goto done; } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client, queuing operation %" PRIu64, + (void *)client, + operation->id); + // add appropriate client table entries aws_hash_table_put(&client->operations, &operation->id, operation, NULL); @@ -1276,7 +1431,7 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, // enqueue aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node); - operation->state = AWS_MRROS_QUEUED; + s_change_operation_state(operation, AWS_MRROS_QUEUED); s_mqtt_request_response_client_wake_service(operation->client_internal_ref); @@ -1374,7 +1529,7 @@ static void s_aws_mqtt_rr_client_operation_init_shared( operation->client_internal_ref = s_aws_mqtt_request_response_client_acquire_internal(client); operation->id = s_aws_mqtt_request_response_client_allocate_operation_id(client); - operation->state = AWS_MRROS_NONE; + s_change_operation_state(operation, AWS_MRROS_NONE); aws_task_init( &operation->submit_task, @@ -1442,6 +1597,76 @@ void s_aws_mqtt_request_operation_storage_init_from_options( storage->options.response_paths = storage->operation_response_paths.data; } +static void s_log_request_response_operation( + struct aws_mqtt_rr_client_operation *operation, + struct aws_mqtt_request_response_client *client) { + struct aws_logger *log_handle = aws_logger_get_conditional(AWS_LS_MQTT_REQUEST_RESPONSE, AWS_LL_DEBUG); + if (log_handle == NULL) { + return; + } + + struct aws_mqtt_request_operation_options *options = &operation->storage.request_storage.options; + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": subscription topic filter: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->subscription_topic_filter)); + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": correlation token: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->correlation_token)); + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": publish topic: '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(options->publish_topic)); + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": %zu response paths:", + (void *)client, + operation->id, + options->response_path_count); + for (size_t i = 0; i < options->response_path_count; ++i) { + struct aws_mqtt_request_operation_response_path *response_path = &options->response_paths[i]; + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": response path %zu topic '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(response_path->topic)); + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64, + ": response path %zu correlation token path '" PRInSTR "'", + (void *)client, + operation->id, + AWS_BYTE_CURSOR_PRI(response_path->correlation_token_json_path)); + } +} + int aws_mqtt_request_response_client_submit_request( struct aws_mqtt_request_response_client *client, const struct aws_mqtt_request_operation_options *request_options) { @@ -1473,6 +1698,14 @@ int aws_mqtt_request_response_client_submit_request( &operation->storage.request_storage, allocator, request_options); s_aws_mqtt_rr_client_operation_init_shared(operation, client); + AWS_LOGF_INFO( + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client - submitting request-response operation with id " PRIu64, + (void *)client, + operation->id); + + s_log_request_response_operation(operation, client); + aws_event_loop_schedule_task_now(client->loop, &operation->submit_task); return AWS_OP_SUCCESS;