Skip to content

Commit

Permalink
create destroy tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Mar 1, 2024
1 parent 0da55e2 commit c7d7cbe
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 141 deletions.
10 changes: 6 additions & 4 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,19 @@ struct aws_protocol_adapter_connection_event {
bool joined_session;
};

typedef void(
aws_protocol_adapter_subscription_event_fn)(const struct aws_protocol_adapter_subscription_event *event, void *user_data);
typedef void(aws_protocol_adapter_subscription_event_fn)(
const struct aws_protocol_adapter_subscription_event *event,
void *user_data);

typedef void(aws_protocol_adapter_incoming_publish_fn)(
const struct aws_protocol_adapter_incoming_publish_event *publish,
void *user_data);

typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);

typedef void(
aws_protocol_adapter_connection_event_fn)(const struct aws_protocol_adapter_connection_event *event, void *user_data);
typedef void(aws_protocol_adapter_connection_event_fn)(
const struct aws_protocol_adapter_connection_event *event,
void *user_data);

/*
* Set of callbacks invoked by the protocol adapter. These must all be set.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ AWS_EXTERN_C_BEGIN
/*
* Initializes a subscription manager. Every native request-response client owns a single subscription manager.
*/
AWS_MQTT_API int aws_rr_subscription_manager_init(
AWS_MQTT_API void aws_rr_subscription_manager_init(
struct aws_rr_subscription_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter *protocol_adapter,
Expand Down Expand Up @@ -201,6 +201,12 @@ AWS_MQTT_API void aws_rr_subscription_manager_on_protocol_adapter_connection_eve
struct aws_rr_subscription_manager *manager,
const struct aws_protocol_adapter_connection_event *event);

/*
* Checks subscription manager options for validity.
*/
AWS_MQTT_API bool aws_rr_subscription_manager_are_options_valid(
const struct aws_rr_subscription_manager_options *options);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_MANAGER_H */
26 changes: 21 additions & 5 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,37 @@ struct aws_mqtt_request_response_client;
struct aws_mqtt_client_connection;
struct aws_mqtt5_client;

typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);

struct aws_mqtt_request_response_client_options {
size_t max_subscriptions;
uint32_t operation_timeout_seconds;

// Do not bind the initialized callback; it exists mostly for tests and should not be exposed
aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback;

aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback;
void *user_data;
};

AWS_EXTERN_C_BEGIN

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options);

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options);
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *client,
const struct aws_mqtt_request_response_client_options *options);

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client);
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(
struct aws_allocator *allocator,
struct aws_mqtt5_client *client,
const struct aws_mqtt_request_response_client_options *options);

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client);
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(
struct aws_mqtt_request_response_client *client);

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
struct aws_mqtt_request_response_client *client);

AWS_EXTERN_C_END

Expand Down
110 changes: 75 additions & 35 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
#include <aws/io/event_loop.h>
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/request-response/protocol_adapter.h>
#include <aws/mqtt/private/request-response/request_response_client.h>
#include <aws/mqtt/private/request-response/subscription_manager.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>

enum aws_request_response_client_state {
// cross-thread initialization has not completed and all protocol adapter callbacks are ignored
AWS_RRCS_UNINITIALIZED,

AWS_RRCS_ACTIVE,

// asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored
Expand All @@ -35,6 +37,7 @@ struct aws_mqtt_request_response_client {

struct aws_event_loop *loop;

struct aws_task initialize_task;
struct aws_task external_shutdown_task;
struct aws_task internal_shutdown_task;

Expand All @@ -54,7 +57,14 @@ static void s_aws_rr_client_on_zero_external_ref_count(void *context) {
}

static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request_response_client *client) {
aws_mqtt_request_response_client_terminated_callback_fn *terminate_callback = client->config.terminated_callback;
void *user_data = client->config.user_data;

aws_mem_release(client->allocator, client);

if (terminate_callback != NULL) {
(*terminate_callback)(user_data);
}
}

static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
Expand Down Expand Up @@ -153,12 +163,23 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback(const str
}

static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) {
struct aws_rr_subscription_manager_options sm_options = {
.max_subscriptions = options->max_subscriptions,
.operation_timeout_seconds = options->operation_timeout_seconds,
};

// we can't initialize the subscription manager until we're running on the event loop, so make sure that
// initialize can't fail by checking its options for validity now.
if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) {
return NULL;
}

struct aws_mqtt_request_response_client *rr_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client));

rr_client->allocator = allocator;
rr_client->config = *options;
rr_client->loop = loop;
rr_client->state = AWS_RRCS_ACTIVE;
rr_client->state = AWS_RRCS_UNINITIALIZED;

aws_task_init(&rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown");
aws_task_init(&rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown");
Expand All @@ -172,55 +193,80 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
return rr_client;
}

static int s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) {
static void s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) {
struct aws_rr_subscription_manager_options subscription_manager_options = {
.operation_timeout_seconds = rr_client->config.operation_timeout_seconds,
.max_subscriptions = rr_client->config.max_subscriptions,
.subscription_status_callback = s_aws_rr_client_subscription_status_event_callback,
.userdata = rr_client,
};

return aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
}

static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt311_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) {
static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
(void)task;

struct aws_mqtt_client_connection *client = context;
AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED);

return aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, adapter_options, client);
}
struct aws_mqtt_request_response_client *client = arg;

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) {
if (client->state == AWS_RRCS_UNINITIALIZED) {
s_aws_rr_client_init_subscription_manager(client, client->allocator);

struct aws_protocol_adapter_factory_options mqtt311_factory_options = {
.loop = aws_mqtt_client_connection_get_event_loop(client),
.creation_context = client,
.mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt311_client,
};
client->state = AWS_RRCS_ACTIVE;
}

return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt311_factory_options, options);
}
if (client->config.initialized_callback != NULL) {
(*client->config.initialized_callback)(client->config.user_data);
}

static struct aws_mqtt_protocol_adapter *s_mqtt_protocol_adaptor_factory_from_mqtt5_client(struct aws_mqtt_request_response_client *rr_client, struct aws_mqtt_protocol_adapter_options *adapter_options, void *context) {
// give up the internal ref we held while the task was pending
aws_ref_count_release(&client->internal_ref_count);
}

struct aws_mqtt5_client *client = context;
static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client * rr_client) {
// now that it exists, 1 internal ref belongs to protocol adapter termination
aws_ref_count_acquire(&rr_client->internal_ref_count);

return aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, adapter_options, client);
// 1 internal ref belongs to the initialize task until it runs
aws_ref_count_acquire(&rr_client->internal_ref_count);
aws_task_init(&rr_client->initialize_task, s_mqtt_request_response_client_initialize_task_fn, rr_client, "mqtt_rr_client_initialize");
aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task);
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) {
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) {

struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client));

struct aws_protocol_adapter_factory_options mqtt5_factory_options = {
.loop = client->loop,
.creation_context = client,
.mqtt_protocol_adaptor_factory_fn = s_mqtt_protocol_adaptor_factory_from_mqtt5_client,
struct aws_mqtt_protocol_adapter_options adapter_options = {
.subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
.incoming_publish_callback = s_aws_rr_client_protocol_adapter_incoming_publish_callback,
.terminate_callback = s_aws_rr_client_protocol_adapter_terminate_callback,
.connection_event_callback = s_aws_rr_client_protocol_adapter_connection_event_callback,
.user_data = rr_client,
};

return aws_mqtt_request_response_client_new_from_adaptor_factory(allocator, &mqtt5_factory_options, options);
rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_311(rr_client->allocator, &adapter_options, client);
if (rr_client->client_adapter == NULL) {
goto error;
}

s_setup_cross_thread_initialization(rr_client);

return rr_client;

error:

// even on construction failures we still need to walk through the async shutdown process
aws_mqtt_request_response_client_release(rr_client);

return NULL;
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_adaptor_factory(struct aws_allocator *allocator, const struct aws_protocol_adapter_factory_options *factory_options, const struct aws_mqtt_request_response_client_options *client_options) {
struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, client_options, factory_options->loop);
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) {

struct aws_mqtt_request_response_client * rr_client = s_aws_mqtt_request_response_client_new(allocator, options, client->loop);

struct aws_mqtt_protocol_adapter_options adapter_options = {
.subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
Expand All @@ -230,17 +276,12 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr
.user_data = rr_client,
};

rr_client->client_adapter = (*factory_options->mqtt_protocol_adaptor_factory_fn)(rr_client, &adapter_options, factory_options->creation_context);
rr_client->client_adapter = aws_mqtt_protocol_adapter_new_from_5(rr_client->allocator, &adapter_options, client);
if (rr_client->client_adapter == NULL) {
goto error;
}

// now that it exists, 1 internal ref belongs to protocol adapter termination
aws_ref_count_acquire(&rr_client->internal_ref_count);

if (s_aws_rr_client_init_subscription_manager(rr_client, allocator)) {
goto error;
}
s_setup_cross_thread_initialization(rr_client);

return rr_client;

Expand All @@ -252,7 +293,6 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr
return NULL;
}


struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client) {
if (client != NULL) {
aws_ref_count_acquire(&client->external_ref_count);
Expand Down
16 changes: 10 additions & 6 deletions source/request-response/subscription_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -589,16 +589,22 @@ void aws_rr_subscription_manager_on_protocol_adapter_connection_event(
}
}

int aws_rr_subscription_manager_init(
bool aws_rr_subscription_manager_are_options_valid(const struct aws_rr_subscription_manager_options *options) {
if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) {
return false;
}

return true;
}

void aws_rr_subscription_manager_init(
struct aws_rr_subscription_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter *protocol_adapter,
const struct aws_rr_subscription_manager_options *options) {
AWS_ZERO_STRUCT(*manager);

if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) {
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}
AWS_FATAL_ASSERT(aws_rr_subscription_manager_are_options_valid(options));

manager->allocator = allocator;
manager->config = *options;
Expand All @@ -614,8 +620,6 @@ int aws_rr_subscription_manager_init(
s_aws_rr_subscription_record_destroy);

manager->is_protocol_client_connected = aws_mqtt_protocol_adapter_is_connected(protocol_adapter);

return AWS_OP_SUCCESS;
}

void aws_rr_subscription_manager_clean_up(struct aws_rr_subscription_manager *manager) {
Expand Down
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ add_test_case(rrsm_offline_acquire_pending_clean_up_unsubscribe_override)
add_test_case(rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire)
add_test_case(rrsm_subscription_lost_while_unsubscribing)

# "rrc" = request response client
add_test_case(rrc_mqtt5_create_destroy)
add_test_case(rrc_mqtt311_create_destroy)

generate_test_driver(${PROJECT_NAME}-tests)

set(TEST_PAHO_CLIENT_BINARY_NAME ${PROJECT_NAME}-paho-client)
Expand Down
Loading

0 comments on commit c7d7cbe

Please sign in to comment.