Skip to content

Commit

Permalink
On further reflection, we need connection state in the subscription m…
Browse files Browse the repository at this point in the history
…anager
  • Loading branch information
bretambrose committed Jan 30, 2024
1 parent 426878f commit 4783ea1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 42 deletions.
21 changes: 18 additions & 3 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ struct aws_protocol_adapter_incoming_publish_event {
struct aws_byte_cursor payload;
};

enum aws_protocol_adapter_connection_event_type {
AWS_PACET_CONNECTED,
AWS_PACET_DISCONNECTED,
};

/*
* An event emitted by the protocol adapter whenever the protocol client successfully reconnects to the broker.
*/
struct aws_protocol_adapter_session_event {
struct aws_protocol_adapter_connection_event {
enum aws_protocol_adapter_connection_event_type event_type;
bool joined_session;
};

Expand All @@ -101,7 +107,8 @@ typedef void(aws_protocol_adapter_incoming_publish_fn)(
struct aws_protocol_adapter_incoming_publish_event *publish,
void *user_data);
typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
typedef void(aws_protocol_adapter_session_event_fn)(struct aws_protocol_adapter_session_event *event, void *user_data);
typedef void(
aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data);

/*
* Set of callbacks invoked by the protocol adapter. These must all be set.
Expand All @@ -110,7 +117,7 @@ struct aws_mqtt_protocol_adapter_options {
aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback;
aws_protocol_adapter_terminate_callback_fn *terminate_callback;
aws_protocol_adapter_session_event_fn *session_event_callback;
aws_protocol_adapter_connection_event_fn *connection_event_callback;

/*
* User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client
Expand All @@ -128,6 +135,8 @@ struct aws_mqtt_protocol_adapter_vtable {
int (*aws_mqtt_protocol_adapter_unsubscribe_fn)(void *, struct aws_protocol_adapter_unsubscribe_options *);

int (*aws_mqtt_protocol_adapter_publish_fn)(void *, struct aws_protocol_adapter_publish_options *);

bool (*aws_mqtt_protocol_adapter_is_connected_fn)(void *);
};

struct aws_mqtt_protocol_adapter {
Expand Down Expand Up @@ -180,6 +189,12 @@ AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_publish_options *options);

/*
* Synchronously checks the connection state of the adapted protocol client. May only be called from the
* protocol client's event loop.
*/
AWS_MQTT_API bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H */
40 changes: 32 additions & 8 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapte
return AWS_OP_ERR;
}

static bool s_aws_mqtt_protocol_adapter_5_is_connected(void *impl) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(adapter->client->loop));

enum aws_mqtt5_client_state current_state = adapter->client->current_state;

return current_state == AWS_MCS_CONNECTED;
}

static bool s_protocol_adapter_mqtt5_listener_publish_received(
const struct aws_mqtt5_packet_publish_view *publish,
void *user_data) {
Expand All @@ -327,15 +337,25 @@ static bool s_protocol_adapter_mqtt5_listener_publish_received(
static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data;

if (event->event_type != AWS_MQTT5_CLET_CONNECTION_SUCCESS) {
return;
}
switch (event->event_type) {
case AWS_MQTT5_CLET_CONNECTION_SUCCESS: {
struct aws_protocol_adapter_connection_event connection_event = {
.event_type = AWS_PACET_CONNECTED,
.joined_session = event->settings->rejoined_session,
};

struct aws_protocol_adapter_session_event session_event = {
.joined_session = event->settings->rejoined_session,
};
(*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
}
case AWS_MQTT5_CLET_DISCONNECTION: {
struct aws_protocol_adapter_connection_event connection_event = {
.event_type = AWS_PACET_DISCONNECTED,
};

(*adapter->config.session_event_callback)(&session_event, adapter->config.user_data);
(*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
}
default:
break;
}
}

static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) {
Expand Down Expand Up @@ -363,7 +383,7 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable =
.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe,
.aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe,
.aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish,
};
.aws_mqtt_protocol_adapter_is_connected_fn = s_aws_mqtt_protocol_adapter_5_is_connected};

struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
Expand Down Expand Up @@ -425,3 +445,7 @@ int aws_mqtt_protocol_adapter_publish(
struct aws_protocol_adapter_publish_options *options) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options);
}

bool aws_mqtt_protocol_adapter_is_connected(struct aws_mqtt_protocol_adapter *adapter) {
return (*adapter->vtable->aws_mqtt_protocol_adapter_is_connected_fn)(adapter->impl);
}
4 changes: 2 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,8 @@ add_test_case(request_response_mqtt5_protocol_adapter_publish_success)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_error_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_reason_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_timeout)
add_test_case(request_response_mqtt5_protocol_adapter_session_event_no_rejoin)
add_test_case(request_response_mqtt5_protocol_adapter_session_event_rejoin)
add_test_case(request_response_mqtt5_protocol_adapter_connection_event_connect_no_session)
add_test_case(request_response_mqtt5_protocol_adapter_connection_event_connect_session)
add_test_case(request_response_mqtt5_protocol_adapter_incoming_publish)
add_test_case(request_response_mqtt5_protocol_adapter_shutdown_while_pending)

Expand Down
69 changes: 40 additions & 29 deletions tests/request-response/request_response_protocol_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct aws_request_response_mqtt5_adapter_test_fixture {
struct aws_mqtt_protocol_adapter *protocol_adapter;

struct aws_array_list incoming_publish_events;
struct aws_array_list session_events;
struct aws_array_list connection_events;
struct aws_array_list subscription_events;
struct aws_array_list publish_results;

Expand Down Expand Up @@ -113,13 +113,13 @@ static void s_rr_mqtt5_protocol_adapter_test_on_terminate_callback(void *user_da
aws_condition_variable_notify_all(&fixture->signal);
}

static void s_rr_mqtt5_protocol_adapter_test_on_session_event(
struct aws_protocol_adapter_session_event *event,
static void s_rr_mqtt5_protocol_adapter_test_on_connection_event(
struct aws_protocol_adapter_connection_event *event,
void *user_data) {
struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data;

aws_mutex_lock(&fixture->lock);
aws_array_list_push_back(&fixture->session_events, event);
aws_array_list_push_back(&fixture->connection_events, event);
aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
}
Expand Down Expand Up @@ -150,7 +150,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init(
.subscription_event_callback = s_rr_mqtt5_protocol_adapter_test_on_subscription_event,
.incoming_publish_callback = s_rr_mqtt5_protocol_adapter_test_on_incoming_publish,
.terminate_callback = s_rr_mqtt5_protocol_adapter_test_on_terminate_callback,
.session_event_callback = s_rr_mqtt5_protocol_adapter_test_on_session_event,
.connection_event_callback = s_rr_mqtt5_protocol_adapter_test_on_connection_event,
.user_data = fixture};

fixture->protocol_adapter =
Expand All @@ -163,7 +163,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init(
10,
sizeof(struct request_response_protocol_adapter_incoming_publish_event_record));
aws_array_list_init_dynamic(
&fixture->session_events, allocator, 10, sizeof(struct aws_protocol_adapter_session_event));
&fixture->connection_events, allocator, 10, sizeof(struct aws_protocol_adapter_connection_event));
aws_array_list_init_dynamic(
&fixture->subscription_events,
allocator,
Expand Down Expand Up @@ -216,7 +216,7 @@ static void s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(
}
aws_array_list_clean_up(&fixture->incoming_publish_events);

aws_array_list_clean_up(&fixture->session_events);
aws_array_list_clean_up(&fixture->connection_events);
aws_array_list_clean_up(&fixture->publish_results);

aws_mutex_clean_up(&fixture->lock);
Expand Down Expand Up @@ -276,21 +276,25 @@ static void s_wait_for_subscription_events_contains(
aws_mutex_unlock(&fixture->lock);
}

struct test_session_event_wait_context {
struct aws_protocol_adapter_session_event *expected_event;
struct test_connection_event_wait_context {
struct aws_protocol_adapter_connection_event *expected_event;
size_t expected_count;
struct aws_request_response_mqtt5_adapter_test_fixture *fixture;
};

static bool s_do_session_events_contain(void *context) {
struct test_session_event_wait_context *wait_context = context;
static bool s_do_connection_events_contain(void *context) {
struct test_connection_event_wait_context *wait_context = context;

size_t found = 0;

size_t num_events = aws_array_list_length(&wait_context->fixture->session_events);
size_t num_events = aws_array_list_length(&wait_context->fixture->connection_events);
for (size_t i = 0; i < num_events; ++i) {
struct aws_protocol_adapter_session_event record;
aws_array_list_get_at(&wait_context->fixture->session_events, &record, i);
struct aws_protocol_adapter_connection_event record;
aws_array_list_get_at(&wait_context->fixture->connection_events, &record, i);

if (record.event_type != wait_context->expected_event->event_type) {
continue;
}

if (record.joined_session != wait_context->expected_event->joined_session) {
continue;
Expand All @@ -302,19 +306,19 @@ static bool s_do_session_events_contain(void *context) {
return found >= wait_context->expected_count;
}

static void s_wait_for_session_events_contains(
static void s_wait_for_connection_events_contains(
struct aws_request_response_mqtt5_adapter_test_fixture *fixture,
struct aws_protocol_adapter_session_event *expected_event,
struct aws_protocol_adapter_connection_event *expected_event,
size_t expected_count) {

struct test_session_event_wait_context context = {
struct test_connection_event_wait_context context = {
.expected_event = expected_event,
.expected_count = expected_count,
.fixture = fixture,
};

aws_mutex_lock(&fixture->lock);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_session_events_contain, &context);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_connection_events_contain, &context);
aws_mutex_unlock(&fixture->lock);
}

Expand Down Expand Up @@ -843,7 +847,7 @@ AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_publish_failure_error_code,
s_request_response_mqtt5_protocol_adapter_publish_failure_error_code_fn)

static int s_do_request_response_mqtt5_protocol_adapter_session_event_test(
static int s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test(
struct aws_allocator *allocator,
bool rejoin_session) {
aws_mqtt_library_init(allocator);
Expand All @@ -866,46 +870,53 @@ static int s_do_request_response_mqtt5_protocol_adapter_session_event_test(

struct aws_mqtt5_client *client = fixture.mqtt5_fixture.client;

struct aws_protocol_adapter_session_event expected_session_record = {
struct aws_protocol_adapter_connection_event expected_connect_record = {
.event_type = AWS_PACET_CONNECTED,
.joined_session = rejoin_session,
};

ASSERT_SUCCESS(aws_mqtt5_client_start(client));
s_wait_for_session_events_contains(&fixture, &expected_session_record, 1);
s_wait_for_connection_events_contains(&fixture, &expected_connect_record, 1);

ASSERT_SUCCESS(aws_mqtt5_client_stop(client, NULL, NULL));
aws_wait_for_stopped_lifecycle_event(&fixture.mqtt5_fixture);

struct aws_protocol_adapter_connection_event expected_disconnect_record = {
.event_type = AWS_PACET_DISCONNECTED,
};

s_wait_for_connection_events_contains(&fixture, &expected_disconnect_record, 1);

s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(&fixture);

aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

static int s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn(
static int s_request_response_mqtt5_protocol_adapter_connection_event_connect_no_session_fn(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, false);
return s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test(allocator, false);
}

AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_session_event_no_rejoin,
s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn)
request_response_mqtt5_protocol_adapter_connection_event_connect_no_session,
s_request_response_mqtt5_protocol_adapter_connection_event_connect_no_session_fn)

static int s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn(
static int s_request_response_mqtt5_protocol_adapter_connection_event_connect_session_fn(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, true);
return s_do_request_response_mqtt5_protocol_adapter_connection_event_connect_test(allocator, true);
}

AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_session_event_rejoin,
s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn)
request_response_mqtt5_protocol_adapter_connection_event_connect_session,
s_request_response_mqtt5_protocol_adapter_connection_event_connect_session_fn)

static int s_request_response_mqtt5_protocol_adapter_incoming_publish_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down

0 comments on commit 4783ea1

Please sign in to comment.