Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Feb 22, 2024
1 parent 212e6c7 commit b9cee40
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 65 deletions.
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ enum aws_mqtt_log_subject {
AWS_LS_MQTT5_CLIENT,
AWS_LS_MQTT5_CANARY,
AWS_LS_MQTT5_TO_MQTT3_ADAPTER,
AWS_LS_MQTT_REQUEST_RESPONSE,
};

/** Function called on cleanup of a userdata. */
Expand Down
50 changes: 40 additions & 10 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ struct aws_protocol_adapter_connection_event;
struct aws_protocol_adapter_subscription_event;

enum aws_rr_subscription_event_type {
ARRSET_SUBSCRIPTION_SUCCESS,
ARRSET_SUBSCRIPTION_FAILURE,
ARRSET_SUBSCRIPTION_LOST
ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS,
ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE,
ARRSET_SUBSCRIPTION_ENDED
};

struct aws_rr_subscription_status_event {
Expand All @@ -26,7 +26,13 @@ struct aws_rr_subscription_status_event {
uint64_t operation_id;
};

typedef void (aws_rr_subscription_status_event_callbacK_fn)(struct aws_rr_subscription_status_event, void *userdata);
/*
* Invariant: despite being on the same thread, these callbacks must be queued as cross-thread tasks on the native
* request-response client. This allows us to iterate internal collections without worrying about external
* callers disrupting things by invoking APIs back on us.
*/
typedef void(
aws_rr_subscription_status_event_callbacK_fn)(const struct aws_rr_subscription_status_event *event, void *userdata);

struct aws_rr_subscription_manager_options {
size_t max_subscriptions;
Expand All @@ -36,6 +42,18 @@ struct aws_rr_subscription_manager_options {
void *userdata;
};

/*
* The subscription manager works from a purely lazy perspective. Unsubscribes (from topic filters that are no longer
* referenced) occur when looking for new subscription space. Unsubscribe failures don't trigger anything special,
* we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions
* that still need them, either in response to a new operation listener or a connection resumption event.
*
* We only allow one subscribe or unsubscribe to be outstanding at once for a given topic. If an operation requires a
* subscription while an unsubscribe is in progress the operation is blocked until the unsubscribe resolves.
*
* These invariants are dropped during shutdown. In that case, we immediately send unsubscribes for everything
* that is not already unsubscribing.
*/
struct aws_rr_subscription_manager {
struct aws_allocator *allocator;

Expand All @@ -44,7 +62,7 @@ struct aws_rr_subscription_manager {
/* non-owning reference; the client is responsible for destroying this asynchronously (listener detachment) */
struct aws_mqtt_protocol_adapter *protocol_adapter;

/* &aws_request_response_subscription.topic_filter_cursor -> aws_request_response_subscription * */
/* &aws_rr_subscription_record.topic_filter_cursor -> aws_rr_subscription_record * */
struct aws_hash_table subscriptions;

bool is_protocol_client_connected;
Expand Down Expand Up @@ -76,17 +94,29 @@ enum aws_acquire_subscription_result_type {

AWS_EXTERN_C_BEGIN

int aws_rr_subscription_manager_init(struct aws_rr_subscription_manager *manager, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *protocol_adapter, struct aws_rr_subscription_manager_options *options);
int aws_rr_subscription_manager_init(
struct aws_rr_subscription_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter *protocol_adapter,
const struct aws_rr_subscription_manager_options *options);

void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager);

enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_acquire_subscription_options *options);
enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription(
struct aws_rr_subscription_manager *manager,
const struct aws_rr_acquire_subscription_options *options);

void aws_rr_subscription_manager_release_subscription(struct aws_rr_subscription_manager *manager, struct aws_rr_release_subscription_options *options);
void aws_rr_subscription_manager_release_subscription(
struct aws_rr_subscription_manager *manager,
const struct aws_rr_release_subscription_options *options);

void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_subscription_event *event);
void aws_rr_subscription_manager_on_protocol_adapter_subscription_event(
struct aws_rr_subscription_manager *manager,
const struct aws_protocol_adapter_subscription_event *event);

void aws_rr_subscription_manager_on_protocol_adapter_connection_event(struct aws_rr_subscription_manager *manager, struct aws_protocol_adapter_connection_event *event);
void aws_rr_subscription_manager_on_protocol_adapter_connection_event(
struct aws_rr_subscription_manager *manager,
const struct aws_protocol_adapter_connection_event *event);

AWS_EXTERN_C_END

Expand Down
1 change: 1 addition & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ static struct aws_error_info_list s_error_list = {
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CLIENT, "mqtt5-client", "MQTT5 client and connections"),
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_CANARY, "mqtt5-canary", "MQTT5 canary logging"),
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT5_TO_MQTT3_ADAPTER, "mqtt5-to-mqtt3-adapter", "MQTT5-To-MQTT3 adapter logging"),
DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "mqtt-request-response-client", "MQTT request-response client logging"),
};
/* clang-format on */

Expand Down
Loading

0 comments on commit b9cee40

Please sign in to comment.