From 4783ea1cfed65a8c2ab57e325db5ad32a5c25071 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Tue, 30 Jan 2024 11:06:17 -0800 Subject: [PATCH] On further reflection, we need connection state in the subscription manager --- .../request-response/protocol_adapter.h | 21 +++++- source/request-response/protocol_adapter.c | 40 ++++++++--- tests/CMakeLists.txt | 4 +- .../request_response_protocol_adapter_tests.c | 69 +++++++++++-------- 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/include/aws/mqtt/private/request-response/protocol_adapter.h b/include/aws/mqtt/private/request-response/protocol_adapter.h index f09d6135..6561c5ab 100644 --- a/include/aws/mqtt/private/request-response/protocol_adapter.h +++ b/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -88,10 +88,16 @@ struct aws_protocol_adapter_incoming_publish_event { struct aws_byte_cursor payload; }; +enum aws_protocol_adapter_connection_event_type { + AWS_PACET_CONNECTED, + AWS_PACET_DISCONNECTED, +}; + /* * An event emitted by the protocol adapter whenever the protocol client successfully reconnects to the broker. */ -struct aws_protocol_adapter_session_event { +struct aws_protocol_adapter_connection_event { + enum aws_protocol_adapter_connection_event_type event_type; bool joined_session; }; @@ -101,7 +107,8 @@ typedef void(aws_protocol_adapter_incoming_publish_fn)( struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data); typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data); -typedef void(aws_protocol_adapter_session_event_fn)(struct aws_protocol_adapter_session_event *event, void *user_data); +typedef void( + aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data); /* * Set of callbacks invoked by the protocol adapter. These must all be set. @@ -110,7 +117,7 @@ struct aws_mqtt_protocol_adapter_options { aws_protocol_adapter_subscription_event_fn *subscription_event_callback; aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback; aws_protocol_adapter_terminate_callback_fn *terminate_callback; - aws_protocol_adapter_session_event_fn *session_event_callback; + aws_protocol_adapter_connection_event_fn *connection_event_callback; /* * User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client @@ -128,6 +135,8 @@ struct aws_mqtt_protocol_adapter_vtable { int (*aws_mqtt_protocol_adapter_unsubscribe_fn)(void *, struct aws_protocol_adapter_unsubscribe_options *); int (*aws_mqtt_protocol_adapter_publish_fn)(void *, struct aws_protocol_adapter_publish_options *); + + bool (*aws_mqtt_protocol_adapter_is_connected_fn)(void *); }; struct aws_mqtt_protocol_adapter { @@ -180,6 +189,12 @@ AWS_MQTT_API int aws_mqtt_protocol_adapter_publish( struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_publish_options *options); +/* + * Synchronously checks the connection state of the adapted protocol client. May only be called from the + * protocol client's event loop. + */ +AWS_MQTT_API bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter); + AWS_EXTERN_C_END #endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H */ diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index 68cc8667..ac8105d4 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -309,6 +309,16 @@ int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapte return AWS_OP_ERR; } +static bool s_aws_mqtt_protocol_adapter_5_is_connected(void *impl) { + struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop)); + + enum aws_mqtt5_client_state current_state = adapter->client->current_state; + + return current_state == AWS_MCS_CONNECTED; +} + static bool s_protocol_adapter_mqtt5_listener_publish_received( const struct aws_mqtt5_packet_publish_view *publish, void *user_data) { @@ -327,15 +337,25 @@ static bool s_protocol_adapter_mqtt5_listener_publish_received( static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) { struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data; - if (event->event_type != AWS_MQTT5_CLET_CONNECTION_SUCCESS) { - return; - } + switch (event->event_type) { + case AWS_MQTT5_CLET_CONNECTION_SUCCESS: { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_CONNECTED, + .joined_session = event->settings->rejoined_session, + }; - struct aws_protocol_adapter_session_event session_event = { - .joined_session = event->settings->rejoined_session, - }; + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + } + case AWS_MQTT5_CLET_DISCONNECTION: { + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = AWS_PACET_DISCONNECTED, + }; - (*adapter->config.session_event_callback)(&session_event, adapter->config.user_data); + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); + } + default: + break; + } } static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) { @@ -363,7 +383,7 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = .aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe, .aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe, .aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish, -}; + .aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_5_is_connected}; struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5( struct aws_allocator *allocator, @@ -425,3 +445,7 @@ int aws_mqtt_protocol_adapter_publish( struct aws_protocol_adapter_publish_options *options) { return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options); } + +bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter) { + return (*adapter->vtable->aws_mqtt_protocol_adapter_is_connected_fn)(adapter->impl); +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d93b7427..da1bf3a4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -454,8 +454,8 @@ add_test_case(request_response_mqtt5_protocol_adapter_publish_success) add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_error_code) add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_reason_code) add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_timeout) -add_test_case(request_response_mqtt5_protocol_adapter_session_event_no_rejoin) -add_test_case(request_response_mqtt5_protocol_adapter_session_event_rejoin) +add_test_case(request_response_mqtt5_protocol_adapter_connection_event_connect_no_session) +add_test_case(request_response_mqtt5_protocol_adapter_connection_event_connect_session) add_test_case(request_response_mqtt5_protocol_adapter_incoming_publish) add_test_case(request_response_mqtt5_protocol_adapter_shutdown_while_pending) diff --git a/tests/request-response/request_response_protocol_adapter_tests.c b/tests/request-response/request_response_protocol_adapter_tests.c index 2034f460..2ffca2aa 100644 --- a/tests/request-response/request_response_protocol_adapter_tests.c +++ b/tests/request-response/request_response_protocol_adapter_tests.c @@ -63,7 +63,7 @@ struct aws_request_response_mqtt5_adapter_test_fixture { struct aws_mqtt_protocol_adapter *protocol_adapter; struct aws_array_list incoming_publish_events; - struct aws_array_list session_events; + struct aws_array_list connection_events; struct aws_array_list subscription_events; struct aws_array_list publish_results; @@ -113,13 +113,13 @@ static void s_rr_mqtt5_protocol_adapter_test_on_terminate_callback(void *user_da aws_condition_variable_notify_all(&fixture->signal); } -static void s_rr_mqtt5_protocol_adapter_test_on_session_event( - struct aws_protocol_adapter_session_event *event, +static void s_rr_mqtt5_protocol_adapter_test_on_connection_event( + struct aws_protocol_adapter_connection_event *event, void *user_data) { struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; aws_mutex_lock(&fixture->lock); - aws_array_list_push_back(&fixture->session_events, event); + aws_array_list_push_back(&fixture->connection_events, event); aws_mutex_unlock(&fixture->lock); aws_condition_variable_notify_all(&fixture->signal); } @@ -150,7 +150,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init( .subscription_event_callback = s_rr_mqtt5_protocol_adapter_test_on_subscription_event, .incoming_publish_callback = s_rr_mqtt5_protocol_adapter_test_on_incoming_publish, .terminate_callback = s_rr_mqtt5_protocol_adapter_test_on_terminate_callback, - .session_event_callback = s_rr_mqtt5_protocol_adapter_test_on_session_event, + .connection_event_callback = s_rr_mqtt5_protocol_adapter_test_on_connection_event, .user_data = fixture}; fixture->protocol_adapter = @@ -163,7 +163,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init( 10, sizeof(struct request_response_protocol_adapter_incoming_publish_event_record)); aws_array_list_init_dynamic( - &fixture->session_events, allocator, 10, sizeof(struct aws_protocol_adapter_session_event)); + &fixture->connection_events, allocator, 10, sizeof(struct aws_protocol_adapter_connection_event)); aws_array_list_init_dynamic( &fixture->subscription_events, allocator, @@ -216,7 +216,7 @@ static void s_aws_request_response_mqtt5_adapter_test_fixture_clean_up( } aws_array_list_clean_up(&fixture->incoming_publish_events); - aws_array_list_clean_up(&fixture->session_events); + aws_array_list_clean_up(&fixture->connection_events); aws_array_list_clean_up(&fixture->publish_results); aws_mutex_clean_up(&fixture->lock); @@ -276,21 +276,25 @@ static void s_wait_for_subscription_events_contains( aws_mutex_unlock(&fixture->lock); } -struct test_session_event_wait_context { - struct aws_protocol_adapter_session_event *expected_event; +struct test_connection_event_wait_context { + struct aws_protocol_adapter_connection_event *expected_event; size_t expected_count; struct aws_request_response_mqtt5_adapter_test_fixture *fixture; }; -static bool s_do_session_events_contain(void *context) { - struct test_session_event_wait_context *wait_context = context; +static bool s_do_connection_events_contain(void *context) { + struct test_connection_event_wait_context *wait_context = context; size_t found = 0; - size_t num_events = aws_array_list_length(&wait_context->fixture->session_events); + size_t num_events = aws_array_list_length(&wait_context->fixture->connection_events); for (size_t i = 0; i < num_events; ++i) { - struct aws_protocol_adapter_session_event record; - aws_array_list_get_at(&wait_context->fixture->session_events, &record, i); + struct aws_protocol_adapter_connection_event record; + aws_array_list_get_at(&wait_context->fixture->connection_events, &record, i); + + if (record.event_type != wait_context->expected_event->event_type) { + continue; + } if (record.joined_session != wait_context->expected_event->joined_session) { continue; @@ -302,19 +306,19 @@ static bool s_do_session_events_contain(void *context) { return found >= wait_context->expected_count; } -static void s_wait_for_session_events_contains( +static void s_wait_for_connection_events_contains( struct aws_request_response_mqtt5_adapter_test_fixture *fixture, - struct aws_protocol_adapter_session_event *expected_event, + struct aws_protocol_adapter_connection_event *expected_event, size_t expected_count) { - struct test_session_event_wait_context context = { + struct test_connection_event_wait_context context = { .expected_event = expected_event, .expected_count = expected_count, .fixture = fixture, }; aws_mutex_lock(&fixture->lock); - aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_session_events_contain, &context); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_connection_events_contain, &context); aws_mutex_unlock(&fixture->lock); } @@ -843,7 +847,7 @@ AWS_TEST_CASE( request_response_mqtt5_protocol_adapter_publish_failure_error_code, s_request_response_mqtt5_protocol_adapter_publish_failure_error_code_fn) -static int s_do_request_response_mqtt5_protocol_adapter_session_event_test( +static int s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test( struct aws_allocator *allocator, bool rejoin_session) { aws_mqtt_library_init(allocator); @@ -866,16 +870,23 @@ static int s_do_request_response_mqtt5_protocol_adapter_session_event_test( struct aws_mqtt5_client *client = fixture.mqtt5_fixture.client; - struct aws_protocol_adapter_session_event expected_session_record = { + struct aws_protocol_adapter_connection_event expected_connect_record = { + .event_type = AWS_PACET_CONNECTED, .joined_session = rejoin_session, }; ASSERT_SUCCESS(aws_mqtt5_client_start(client)); - s_wait_for_session_events_contains(&fixture, &expected_session_record, 1); + s_wait_for_connection_events_contains(&fixture, &expected_connect_record, 1); ASSERT_SUCCESS(aws_mqtt5_client_stop(client, NULL, NULL)); aws_wait_for_stopped_lifecycle_event(&fixture.mqtt5_fixture); + struct aws_protocol_adapter_connection_event expected_disconnect_record = { + .event_type = AWS_PACET_DISCONNECTED, + }; + + s_wait_for_connection_events_contains(&fixture, &expected_disconnect_record, 1); + s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(&fixture); aws_mqtt_library_clean_up(); @@ -883,29 +894,29 @@ static int s_do_request_response_mqtt5_protocol_adapter_session_event_test( return AWS_OP_SUCCESS; } -static int s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn( +static int s_request_response_mqtt5_protocol_adapter_connection_event_connect_no_session_fn( struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, false); + return s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test(allocator, false); } AWS_TEST_CASE( - request_response_mqtt5_protocol_adapter_session_event_no_rejoin, - s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn) + request_response_mqtt5_protocol_adapter_connection_event_connect_no_session, + s_request_response_mqtt5_protocol_adapter_connection_event_connect_no_session_fn) -static int s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn( +static int s_request_response_mqtt5_protocol_adapter_connection_event_connect_session_fn( struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, true); + return s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test(allocator, true); } AWS_TEST_CASE( - request_response_mqtt5_protocol_adapter_session_event_rejoin, - s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn) + request_response_mqtt5_protocol_adapter_connection_event_connect_session, + s_request_response_mqtt5_protocol_adapter_connection_event_connect_session_fn) static int s_request_response_mqtt5_protocol_adapter_incoming_publish_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx;