From b9cee40681026ca3a14da7d5bddd00182bd61f9f Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 22 Feb 2024 14:52:51 -0800 Subject: [PATCH] Checkpoint --- include/aws/mqtt/mqtt.h | 1 + .../request-response/subscription_manager.h | 50 ++- source/mqtt.c | 1 + .../request-response/subscription_manager.c | 289 ++++++++++++++---- 4 files changed, 276 insertions(+), 65 deletions(-) diff --git a/include/aws/mqtt/mqtt.h b/include/aws/mqtt/mqtt.h index 8e7cd0ef..d8034640 100644 --- a/include/aws/mqtt/mqtt.h +++ b/include/aws/mqtt/mqtt.h @@ -94,6 +94,7 @@ enum aws_mqtt_log_subject { AWS_LS_MQTT5_CLIENT, AWS_LS_MQTT5_CANARY, AWS_LS_MQTT5_TO_MQTT3_ADAPTER, + AWS_LS_MQTT_REQUEST_RESPONSE, }; /** Function called on cleanup of a userdata. */ diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index f3ab0e9d..636cd4a8 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -15,9 +15,9 @@ struct aws_protocol_adapter_connection_event; struct aws_protocol_adapter_subscription_event; enum aws_rr_subscription_event_type { - ARRSET_SUBSCRIPTION_SUCCESS, - ARRSET_SUBSCRIPTION_FAILURE, - ARRSET_SUBSCRIPTION_LOST + ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + ARRSET_SUBSCRIPTION_ENDED }; struct aws_rr_subscription_status_event { @@ -26,7 +26,13 @@ struct aws_rr_subscription_status_event { uint64_t operation_id; }; -typedef void (aws_rr_subscription_status_event_callbacK_fn)(struct aws_rr_subscription_status_event, void *userdata); +/* + * Invariant: despite being on the same thread, these callbacks must be queued as cross-thread tasks on the native + * request-response client. This allows us to iterate internal collections without worrying about external + * callers disrupting things by invoking APIs back on us. + */ +typedef void( + aws_rr_subscription_status_event_callbacK_fn)(const struct aws_rr_subscription_status_event *event, void *userdata); struct aws_rr_subscription_manager_options { size_t max_subscriptions; @@ -36,6 +42,18 @@ struct aws_rr_subscription_manager_options { void *userdata; }; +/* + * The subscription manager works from a purely lazy perspective. Unsubscribes (from topic filters that are no longer + * referenced) occur when looking for new subscription space. Unsubscribe failures don't trigger anything special, + * we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions + * that still need them, either in response to a new operation listener or a connection resumption event. + * + * We only allow one subscribe or unsubscribe to be outstanding at once for a given topic. If an operation requires a + * subscription while an unsubscribe is in progress the operation is blocked until the unsubscribe resolves. + * + * These invariants are dropped during shutdown. In that case, we immediately send unsubscribes for everything + * that is not already unsubscribing. + */ struct aws_rr_subscription_manager { struct aws_allocator *allocator; @@ -44,7 +62,7 @@ struct aws_rr_subscription_manager { /* non-owning reference; the client is responsible for destroying this asynchronously (listener detachment) */ struct aws_mqtt_protocol_adapter *protocol_adapter; - /* &aws_request_response_subscription.topic_filter_cursor -> aws_request_response_subscription * */ + /* &aws_rr_subscription_record.topic_filter_cursor -> aws_rr_subscription_record * */ struct aws_hash_table subscriptions; bool is_protocol_client_connected; @@ -76,17 +94,29 @@ enum aws_acquire_subscription_result_type { AWS_EXTERN_C_BEGIN -int aws_rr_subscription_manager_init(struct aws_rr_subscription_manager *manager, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *protocol_adapter, struct aws_rr_subscription_manager_options *options); +int aws_rr_subscription_manager_init( + struct aws_rr_subscription_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter *protocol_adapter, + const struct aws_rr_subscription_manager_options *options); void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager); -enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_acquire_subscription_options *options); +enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_acquire_subscription_options *options); -void aws_rr_subscription_manager_release_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_release_subscription_options *options); +void aws_rr_subscription_manager_release_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_release_subscription_options *options); -void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_subscription_event *event); +void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_subscription_event *event); -void aws_rr_subscription_manager_on_protocol_adapter_connection_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_connection_event *event); +void aws_rr_subscription_manager_on_protocol_adapter_connection_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_connection_event *event); AWS_EXTERN_C_END diff --git a/source/mqtt.c b/source/mqtt.c index fd16340a..0fbafdb5 100644 --- a/source/mqtt.c +++ b/source/mqtt.c @@ -254,6 +254,7 @@ static struct aws_error_info_list s_error_list = { DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CLIENT, "mqtt5-client", "MQTT5 client and connections"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CANARY, "mqtt5-canary", "MQTT5 canary logging"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_TO_MQTT3_ADAPTER, "mqtt5-to-mqtt3-adapter", "MQTT5-To-MQTT3 adapter logging"), + DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "mqtt-request-response-client", "MQTT request-response client logging"), }; /* clang-format on */ diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index fb1d08e1..31887f86 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -1,10 +1,11 @@ /** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include +#include #include #include @@ -13,6 +14,12 @@ enum aws_rr_subscription_status_type { ARRSST_NOT_SUBSCRIBED, }; +/* + * Invariant: subscriptions can only transition from nothing -> {subscribing, unsubscribing} + * + * In particular, the logic blocks subscribing while unsubscribing and unsubscribing while subscribing (unless + * shutting down). + */ enum aws_rr_subscription_pending_action_type { ARRSPAT_NOTHING, ARRSPAT_SUBSCRIBING, @@ -57,6 +64,15 @@ struct aws_rr_subscription_record { enum aws_rr_subscription_type type; }; +static void s_aws_rr_subscription_record_log_invariant_violations(const struct aws_rr_subscription_record *record) { + if (record->status == ARRSST_SUBSCRIBED && record->pending_action == ARRSPAT_SUBSCRIBING) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "MQTT request response subscription ('" PRInSTR "') invalid state", + AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); + } +} + static void s_aws_rr_subscription_record_destroy(void *element) { struct aws_rr_subscription_record *record = element; @@ -66,15 +82,23 @@ static void s_aws_rr_subscription_record_destroy(void *element) { aws_mem_release(record->allocator, record); } -static struct aws_rr_subscription_record *s_aws_rr_subscription_new(struct aws_allocator *allocator, struct aws_rr_acquire_subscription_options *options) { +static struct aws_rr_subscription_record *s_aws_rr_subscription_new( + struct aws_allocator *allocator, + const struct aws_rr_acquire_subscription_options *options) { struct aws_rr_subscription_record *record = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_record)); record->allocator = allocator; aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, options->topic_filter); record->topic_filter_cursor = aws_byte_cursor_from_buf(&record->topic_filter); - aws_hash_table_init(&record->listeners, allocator, 4, s_aws_hash_subscription_listener, - s_aws_subscription_listener_hash_equality, NULL, s_aws_subscription_listener_destroy); + aws_hash_table_init( + &record->listeners, + allocator, + 4, + s_aws_hash_subscription_listener, + s_aws_subscription_listener_hash_equality, + NULL, + s_aws_subscription_listener_destroy); record->status = ARRSST_NOT_SUBSCRIBED; record->pending_action = ARRSPAT_NOTHING; @@ -84,11 +108,27 @@ static struct aws_rr_subscription_record *s_aws_rr_subscription_new(struct aws_a return record; } -static void s_subscription_record_unsubscribe(struct aws_rr_subscription_manager *manager, struct aws_rr_subscription_record *record, bool forced) { - if (!forced) { - if ((record->status != ARRSST_SUBSCRIBED) || (record->pending_action == ARRSPAT_NOTHING)) { - return; - } +static void s_subscription_record_unsubscribe( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, + bool shutdown) { + + bool currently_subscribed = record->status == ARRSST_SUBSCRIBED; + bool currently_subscribing = record->pending_action == ARRSPAT_SUBSCRIBING; + bool currently_unsubscribing = record->pending_action == ARRSPAT_UNSUBSCRIBING; + + /* + * The difference between a shutdown unsubscribe and a normal unsubscribe is that on a shutdown we will "chase" + * a pending subscribe with an unsubscribe (breaking the invariant of never having multiple MQTT operations + * pending on a subscription). + */ + bool should_unsubscribe = currently_subscribed && !currently_unsubscribing; + if (shutdown) { + should_unsubscribe = should_unsubscribe || currently_subscribing; + } + + if (!should_unsubscribe) { + return; } struct aws_protocol_adapter_unsubscribe_options unsubscribe_options = { @@ -101,8 +141,12 @@ static void s_subscription_record_unsubscribe(struct aws_rr_subscription_manager } record->pending_action = ARRSPAT_UNSUBSCRIBING; + + // check_invariants may no longer be true now because we might have converted a pending subscribe to a pending + // unsubscribe } +/* Only called when shutting down the client */ static int s_rr_subscription_clean_up_foreach_wrap(void *context, struct aws_hash_element *elem) { struct aws_rr_subscription_manager *manager = context; struct aws_rr_subscription_record *subscription = elem->value; @@ -112,7 +156,9 @@ static int s_rr_subscription_clean_up_foreach_wrap(void *context, struct aws_has return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; } -static struct aws_rr_subscription_record *s_get_subscription_record(struct aws_rr_subscription_manager *manager, struct aws_byte_cursor topic_filter) { +static struct aws_rr_subscription_record *s_get_subscription_record( + struct aws_rr_subscription_manager *manager, + struct aws_byte_cursor topic_filter) { struct aws_rr_subscription_record *subscription = NULL; struct aws_hash_element *element = NULL; if (aws_hash_table_find(&manager->subscriptions, &topic_filter, &element)) { @@ -132,7 +178,7 @@ struct aws_subscription_stats { }; static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_element *elem) { - struct aws_rr_subscription_record *subscription = elem->value; + const struct aws_rr_subscription_record *subscription = elem->value; struct aws_subscription_stats *stats = context; if (subscription->type == ARRST_EVENT_STREAM) { @@ -144,13 +190,18 @@ static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_e return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } -static void s_get_subscription_stats(struct aws_rr_subscription_manager *manager, struct aws_subscription_stats *stats) { +static void s_get_subscription_stats( + struct aws_rr_subscription_manager *manager, + struct aws_subscription_stats *stats) { AWS_ZERO_STRUCT(*stats); aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_count_foreach_wrap, stats); } -static void s_remove_listener_from_subscription_record(struct aws_rr_subscription_manager *manager, struct aws_byte_cursor topic_filter, uint64_t operation_id) { +static void s_remove_listener_from_subscription_record( + struct aws_rr_subscription_manager *manager, + struct aws_byte_cursor topic_filter, + uint64_t operation_id) { struct aws_rr_subscription_record *record = s_get_subscription_record(manager, topic_filter); if (record == NULL) { return; @@ -164,7 +215,8 @@ static void s_remove_listener_from_subscription_record(struct aws_rr_subscriptio } static void s_add_listener_to_subscription_record(struct aws_rr_subscription_record *record, uint64_t operation_id) { - struct aws_rr_subscription_listener *listener = aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_subscription_listener)); + struct aws_rr_subscription_listener *listener = + aws_mem_calloc(record->allocator, 1, sizeof(struct aws_rr_subscription_listener)); listener->allocator = record->allocator; listener->operation_id = operation_id; @@ -175,49 +227,102 @@ static int s_rr_subscription_cull_unused_subscriptions_wrapper(void *context, st struct aws_rr_subscription_record *record = elem->value; struct aws_rr_subscription_manager *manager = context; - if (manager->is_protocol_client_connected && aws_hash_table_get_entry_count(&record->listeners) == 0) { - s_subscription_record_unsubscribe(manager, record, false); - } + if (aws_hash_table_get_entry_count(&record->listeners) == 0) { + if (manager->is_protocol_client_connected) { + s_subscription_record_unsubscribe(manager, record, false); + } - if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action != ARRSPAT_SUBSCRIBING) { - return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; - } else { - return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; + if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) { + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + } } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } static void s_cull_unused_subscriptions(struct aws_rr_subscription_manager *manager) { aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_cull_unused_subscriptions_wrapper, manager); } -enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_acquire_subscription_options *options) { +static int s_rr_activate_idle_subscription( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record) { + int result = AWS_OP_SUCCESS; + + if (manager->is_protocol_client_connected && aws_hash_table_get_entry_count(&record->listeners) > 0) { + if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) { + struct aws_protocol_adapter_subscribe_options subscribe_options = { + .topic_filter = record->topic_filter_cursor, + .ack_timeout_seconds = manager->config.operation_timeout_seconds, + }; + + result = aws_mqtt_protocol_adapter_subscribe(manager->protocol_adapter, &subscribe_options); + if (result == AWS_OP_SUCCESS) { + record->pending_action = ARRSPAT_SUBSCRIBING; + } + } + } + + return result; +} + +static void s_emit_subscription_event( + const struct aws_rr_subscription_manager *manager, + const struct aws_rr_subscription_record *record, + enum aws_rr_subscription_event_type type) { + + for (struct aws_hash_iter iter = aws_hash_iter_begin(&record->listeners); !aws_hash_iter_done(&iter); + aws_hash_iter_next(&iter)) { + + struct aws_rr_subscription_listener *listener = iter.element.value; + struct aws_rr_subscription_status_event event = { + .type = type, + .topic_filter = record->topic_filter_cursor, + .operation_id = listener->operation_id, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); + } +} + +enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_acquire_subscription_options *options) { struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, options->topic_filter); // is no subscription present? if (existing_record == NULL) { + s_cull_unused_subscriptions(manager); // is the budget used up? struct aws_subscription_stats stats; s_get_subscription_stats(manager, &stats); - if (stats.event_stream_subscriptions + stats.request_response_subscriptions >= manager->config.max_subscriptions) { - s_cull_unused_subscriptions(manager); + bool space_for_subscription = + stats.event_stream_subscriptions + stats.request_response_subscriptions < manager->config.max_subscriptions; + if (options->type == ARRST_EVENT_STREAM) { + // event stream subscriptions cannot hog the entire subscription budget + space_for_subscription = + space_for_subscription && (stats.event_stream_subscriptions + 1 < manager->config.max_subscriptions); + } + + if (!space_for_subscription) { // could space eventually free up? if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1) { return AASRT_BLOCKED; } else { return AASRT_NO_CAPACITY; } - } else { - // create-and-add subscription - existing_record = s_aws_rr_subscription_new(manager->allocator, options); - AWS_FATAL_ASSERT(existing_record != NULL); - aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL); } + + // create-and-add subscription + existing_record = s_aws_rr_subscription_new(manager->allocator, options); + aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL); } AWS_FATAL_ASSERT(existing_record != NULL); AWS_FATAL_ASSERT(existing_record->type == options->type); + s_aws_rr_subscription_record_log_invariant_violations(existing_record); // for simplicity, we require unsubscribes to complete before re-subscribing if (existing_record->pending_action == ARRSPAT_UNSUBSCRIBING) { @@ -231,39 +336,109 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su } // do we need to send a subscribe? - if (existing_record->pending_action != ARRSPAT_SUBSCRIBING) { - struct aws_protocol_adapter_subscribe_options subscribe_options = { - .topic_filter = options->topic_filter, - .ack_timeout_seconds = manager->config.operation_timeout_seconds, - }; + if (s_rr_activate_idle_subscription(manager, existing_record)) { + s_emit_subscription_event(manager, existing_record, ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE); + return AASRT_FAILURE; + } + + s_aws_rr_subscription_record_log_invariant_violations(existing_record); - if (aws_mqtt_protocol_adapter_subscribe(manager->protocol_adapter, &subscribe_options)) { - s_remove_listener_from_subscription_record(manager, options->topic_filter, options->operation_id); - return AASRT_FAILURE; + return AASRT_SUBSCRIBING; +} + +void aws_rr_subscription_manager_release_subscription( + struct aws_rr_subscription_manager *manager, + const struct aws_rr_release_subscription_options *options) { + s_remove_listener_from_subscription_record(manager, options->topic_filter, options->operation_id); +} + +void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_subscription_event *event) { + struct aws_rr_subscription_record *record = s_get_subscription_record(manager, event->topic_filter); + if (record == NULL) { + return; + } + + if (event->event_type == AWS_PASET_SUBSCRIBE) { + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING); + + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS); + } else { + s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE); } + } else if (event->event_type == AWS_PASET_UNSUBSCRIBE) { + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING); - existing_record->pending_action = ARRSPAT_SUBSCRIBING; + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_NOT_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_ENDED); + } } - return AASRT_SUBSCRIBING; + record->pending_action = ARRSPAT_NOTHING; + + s_aws_rr_subscription_record_log_invariant_violations(record); +} + +static int s_rr_activate_idle_subscriptions_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + s_rr_activate_idle_subscription(manager, record); + + s_aws_rr_subscription_record_log_invariant_violations(record); + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_activate_idle_subscriptions(struct aws_rr_subscription_manager *manager) { + aws_hash_table_foreach(&manager->subscriptions, s_rr_activate_idle_subscriptions_wrapper, manager); } -void aws_rr_subscription_manager_release_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_release_subscription_options *options) { - (void)manager; - (void)options; +static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + if (record->status == ARRSST_SUBSCRIBED) { + record->status = ARRSST_NOT_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_ENDED); + + if (record->pending_action != ARRSPAT_UNSUBSCRIBING) { + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + } + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } -void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_subscription_event *event) { - (void)manager; - (void)event; +static void s_apply_session_lost(struct aws_rr_subscription_manager *manager) { + aws_hash_table_foreach(&manager->subscriptions, s_apply_session_lost_wrapper, manager); } -void aws_rr_subscription_manager_on_protocol_adapter_connection_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_connection_event *event) { - (void)manager; - (void)event; +void aws_rr_subscription_manager_on_protocol_adapter_connection_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_connection_event *event) { + if (event->event_type == AWS_PACET_CONNECTED) { + manager->is_protocol_client_connected = true; + if (!event->joined_session) { + s_apply_session_lost(manager); + } + + s_cull_unused_subscriptions(manager); + s_activate_idle_subscriptions(manager); + } else { + manager->is_protocol_client_connected = false; + } } -int aws_rr_subscription_manager_init(struct aws_rr_subscription_manager *manager, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *protocol_adapter, struct aws_rr_subscription_manager_options *options) { +int aws_rr_subscription_manager_init( + struct aws_rr_subscription_manager *manager, + struct aws_allocator *allocator, + struct aws_mqtt_protocol_adapter *protocol_adapter, + const struct aws_rr_subscription_manager_options *options) { AWS_ZERO_STRUCT(*manager); if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) { @@ -274,10 +449,14 @@ int aws_rr_subscription_manager_init(struct aws_rr_subscription_manager *manager manager->config = *options; manager->protocol_adapter = protocol_adapter; - if (aws_hash_table_init(&manager->subscriptions, allocator, options->max_subscriptions, aws_hash_byte_cursor_ptr, - aws_mqtt_byte_cursor_hash_equality, NULL, s_aws_rr_subscription_record_destroy)) { - return AWS_OP_ERR; - } + aws_hash_table_init( + &manager->subscriptions, + allocator, + options->max_subscriptions, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_subscription_record_destroy); manager->is_protocol_client_connected = aws_mqtt_protocol_adapter_is_connected(protocol_adapter);