Skip to content

Commit

Permalink
Convert to session event
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Jan 24, 2024
1 parent b1fe1f9 commit e5f93e1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 84 deletions.
20 changes: 5 additions & 15 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,10 @@ struct aws_protocol_adapter_incoming_publish_event {
};

/*
* Describes the type of connection event emitted by the protocol adapter
* An event emitted by the protocol adapter whenever the protocol client successfully reconnects to the broker.
*/
enum aws_protocol_adapter_connection_event_type {
AWS_PACET_OFFLINE,
AWS_PACET_ONLINE,
};

/*
* An event emitted by the protocol adapter whenever the protocol client encounters a change in connectivity state.
*/
struct aws_protocol_adapter_connection_event {
enum aws_protocol_adapter_connection_event_type event_type;
bool rejoined_session;
struct aws_protocol_adapter_session_event {
bool joined_session;
};

typedef void(
Expand All @@ -111,8 +102,7 @@ typedef void(aws_protocol_adapter_incoming_publish_fn)(
struct aws_protocol_adapter_incoming_publish_event *publish,
void *user_data);
typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
typedef void(
aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data);
typedef void(aws_protocol_adapter_session_event_fn)(struct aws_protocol_adapter_session_event *event, void *user_data);

/*
* Set of callbacks invoked by the protocol adapter. These must all be set.
Expand All @@ -121,7 +111,7 @@ struct aws_mqtt_protocol_adapter_options {
aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback;
aws_protocol_adapter_terminate_callback_fn *terminate_callback;
aws_protocol_adapter_connection_event_fn *connection_event_callback;
aws_protocol_adapter_session_event_fn *session_event_callback;

/*
* User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client
Expand Down
14 changes: 4 additions & 10 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,21 +322,15 @@ static bool s_protocol_adapter_mqtt5_listener_publish_received(
static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data;

if (event->event_type != AWS_MQTT5_CLET_CONNECTION_SUCCESS && event->event_type != AWS_MQTT5_CLET_DISCONNECTION) {
if (event->event_type != AWS_MQTT5_CLET_CONNECTION_SUCCESS) {
return;
}

bool is_connection_success = event->event_type == AWS_MQTT5_CLET_CONNECTION_SUCCESS;

struct aws_protocol_adapter_connection_event connection_event = {
.event_type = is_connection_success ? AWS_PACET_ONLINE : AWS_PACET_OFFLINE,
struct aws_protocol_adapter_session_event session_event = {
.joined_session = event->settings->rejoined_session,
};

if (is_connection_success) {
connection_event.rejoined_session = event->settings->rejoined_session;
}

(*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data);
(*adapter->config.session_event_callback)(&session_event, adapter->config.user_data);
}

static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) {
Expand Down
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ add_test_case(request_response_mqtt5_protocol_adapter_publish_success)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_error_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_reason_code)
add_test_case(request_response_mqtt5_protocol_adapter_publish_failure_timeout)
add_test_case(request_response_mqtt5_protocol_adapter_connection_event_sequence)
add_test_case(request_response_mqtt5_protocol_adapter_session_event_no_rejoin)
add_test_case(request_response_mqtt5_protocol_adapter_session_event_rejoin)
add_test_case(request_response_mqtt5_protocol_adapter_incoming_publish)
add_test_case(request_response_mqtt5_protocol_adapter_shutdown_while_pending)

Expand Down
106 changes: 48 additions & 58 deletions tests/request-response/request_response_protocol_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ static void s_request_response_protocol_adapter_incoming_publish_event_record_cl
aws_byte_buf_clean_up(&record->payload);
}

struct request_response_protocol_adapter_connection_event_record {
enum aws_protocol_adapter_connection_event_type event_type;
bool rejoined_session;
};

struct request_response_protocol_adapter_subscription_event_record {
enum aws_protocol_adapter_subscription_event_type event_type;
struct aws_byte_buf topic_filter;
Expand All @@ -61,7 +56,7 @@ struct aws_request_response_mqtt5_adapter_test_fixture {
struct aws_mqtt_protocol_adapter *protocol_adapter;

struct aws_array_list incoming_publish_events;
struct aws_array_list connection_events;
struct aws_array_list session_events;
struct aws_array_list subscription_events;
struct aws_array_list publish_results;

Expand Down Expand Up @@ -111,16 +106,13 @@ static void s_rr_mqtt5_protocol_adapter_test_on_terminate_callback(void *user_da
aws_condition_variable_notify_all(&fixture->signal);
}

static void s_rr_mqtt5_protocol_adapter_test_on_connection_event(
struct aws_protocol_adapter_connection_event *event,
static void s_rr_mqtt5_protocol_adapter_test_on_session_event(
struct aws_protocol_adapter_session_event *event,
void *user_data) {
struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data;

struct request_response_protocol_adapter_connection_event_record record = {
.event_type = event->event_type, .rejoined_session = event->rejoined_session};

aws_mutex_lock(&fixture->lock);
aws_array_list_push_back(&fixture->connection_events, &record);
aws_array_list_push_back(&fixture->session_events, event);
aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
}
Expand Down Expand Up @@ -151,7 +143,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init(
.subscription_event_callback = s_rr_mqtt5_protocol_adapter_test_on_subscription_event,
.incoming_publish_callback = s_rr_mqtt5_protocol_adapter_test_on_incoming_publish,
.terminate_callback = s_rr_mqtt5_protocol_adapter_test_on_terminate_callback,
.connection_event_callback = s_rr_mqtt5_protocol_adapter_test_on_connection_event,
.session_event_callback = s_rr_mqtt5_protocol_adapter_test_on_session_event,
.user_data = fixture};

fixture->protocol_adapter =
Expand All @@ -164,10 +156,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init(
10,
sizeof(struct request_response_protocol_adapter_incoming_publish_event_record));
aws_array_list_init_dynamic(
&fixture->connection_events,
allocator,
10,
sizeof(struct request_response_protocol_adapter_connection_event_record));
&fixture->session_events, allocator, 10, sizeof(struct aws_protocol_adapter_session_event));
aws_array_list_init_dynamic(
&fixture->subscription_events,
allocator,
Expand Down Expand Up @@ -220,7 +209,7 @@ static void s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(
}
aws_array_list_clean_up(&fixture->incoming_publish_events);

aws_array_list_clean_up(&fixture->connection_events);
aws_array_list_clean_up(&fixture->session_events);
aws_array_list_clean_up(&fixture->publish_results);

aws_mutex_clean_up(&fixture->lock);
Expand Down Expand Up @@ -272,44 +261,43 @@ static void s_wait_for_subscription_events_contains(
aws_mutex_unlock(&fixture->lock);
}

struct test_connection_event_wait_context {
struct request_response_protocol_adapter_connection_event_record *expected_event;
struct test_session_event_wait_context {
struct aws_protocol_adapter_session_event *expected_event;
size_t expected_count;
struct aws_request_response_mqtt5_adapter_test_fixture *fixture;
};

static bool s_do_connection_events_contain(void *context) {
struct test_connection_event_wait_context *wait_context = context;
static bool s_do_session_events_contain(void *context) {
struct test_session_event_wait_context *wait_context = context;

size_t found = 0;

size_t num_events = aws_array_list_length(&wait_context->fixture->connection_events);
size_t num_events = aws_array_list_length(&wait_context->fixture->session_events);
for (size_t i = 0; i < num_events; ++i) {
struct request_response_protocol_adapter_connection_event_record record;
aws_array_list_get_at(&wait_context->fixture->connection_events, &record, i);
struct aws_protocol_adapter_session_event record;
aws_array_list_get_at(&wait_context->fixture->session_events, &record, i);

if (record.event_type == wait_context->expected_event->event_type &&
record.rejoined_session == wait_context->expected_event->rejoined_session) {
if (record.joined_session == wait_context->expected_event->joined_session) {
++found;
}
}

return found >= wait_context->expected_count;
}

static void s_wait_for_connection_events_contains(
static void s_wait_for_session_events_contains(
struct aws_request_response_mqtt5_adapter_test_fixture *fixture,
struct request_response_protocol_adapter_connection_event_record *expected_event,
struct aws_protocol_adapter_session_event *expected_event,
size_t expected_count) {

struct test_connection_event_wait_context context = {
struct test_session_event_wait_context context = {
.expected_event = expected_event,
.expected_count = expected_count,
.fixture = fixture,
};

aws_mutex_lock(&fixture->lock);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_connection_events_contain, &context);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_session_events_contain, &context);
aws_mutex_unlock(&fixture->lock);
}

Expand Down Expand Up @@ -818,19 +806,17 @@ AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_publish_failure_error_code,
s_request_response_mqtt5_protocol_adapter_publish_failure_error_code_fn)

static int s_request_response_mqtt5_protocol_adapter_connection_event_sequence_fn(
static int s_do_request_response_mqtt5_protocol_adapter_session_event_test(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

bool rejoin_session) {
aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] =
aws_mqtt5_mock_server_handle_connect_honor_session_unconditional;
test_options.client_options.session_behavior = AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS;
test_options.client_options.session_behavior = rejoin_session ? AWS_MQTT5_CSBT_REJOIN_ALWAYS : AWS_MQTT5_CSBT_CLEAN;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options mqtt5_test_fixture_options = {
.client_options = &test_options.client_options,
Expand All @@ -843,31 +829,15 @@ static int s_request_response_mqtt5_protocol_adapter_connection_event_sequence_f

struct aws_mqtt5_client *client = fixture.mqtt5_fixture.client;

struct request_response_protocol_adapter_connection_event_record online_record1 = {
.event_type = AWS_PACET_ONLINE,
.rejoined_session = false,
};

ASSERT_SUCCESS(aws_mqtt5_client_start(client));
s_wait_for_connection_events_contains(&fixture, &online_record1, 1);

struct request_response_protocol_adapter_connection_event_record offline_record = {
.event_type = AWS_PACET_OFFLINE,
};

ASSERT_SUCCESS(aws_mqtt5_client_stop(client, NULL, NULL));
s_wait_for_connection_events_contains(&fixture, &offline_record, 1);

struct request_response_protocol_adapter_connection_event_record online_record2 = {
.event_type = AWS_PACET_ONLINE,
.rejoined_session = true,
struct aws_protocol_adapter_session_event expected_session_record = {
.joined_session = rejoin_session,
};

ASSERT_SUCCESS(aws_mqtt5_client_start(client));
s_wait_for_connection_events_contains(&fixture, &online_record2, 1);
s_wait_for_session_events_contains(&fixture, &expected_session_record, 1);

ASSERT_SUCCESS(aws_mqtt5_client_stop(client, NULL, NULL));
s_wait_for_connection_events_contains(&fixture, &offline_record, 2);
aws_wait_for_stopped_lifecycle_event(&fixture.mqtt5_fixture);

s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(&fixture);

Expand All @@ -876,9 +846,29 @@ static int s_request_response_mqtt5_protocol_adapter_connection_event_sequence_f
return AWS_OP_SUCCESS;
}

static int s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, false);
}

AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_session_event_no_rejoin,
s_request_response_mqtt5_protocol_adapter_session_event_no_rejoin_fn)

static int s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;

return s_do_request_response_mqtt5_protocol_adapter_session_event_test(allocator, true);
}

AWS_TEST_CASE(
request_response_mqtt5_protocol_adapter_connection_event_sequence,
s_request_response_mqtt5_protocol_adapter_connection_event_sequence_fn)
request_response_mqtt5_protocol_adapter_session_event_rejoin,
s_request_response_mqtt5_protocol_adapter_session_event_rejoin_fn)

static int s_request_response_mqtt5_protocol_adapter_incoming_publish_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down

0 comments on commit e5f93e1

Please sign in to comment.