Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapter connection result callbacks #306

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ struct aws_mqtt_client_connection_5_impl {
aws_mqtt_client_on_connection_closed_fn *on_closed;
void *on_closed_user_data;

aws_mqtt_client_on_connection_success_fn *on_connection_success;
void *on_connection_success_user_data;

aws_mqtt_client_on_connection_failure_fn *on_connection_failure;
void *on_connection_failure_user_data;

aws_mqtt_client_publish_received_fn *on_any_publish;
void *on_any_publish_user_data;

Expand Down
170 changes: 134 additions & 36 deletions source/v5/mqtt3_to_mqtt5_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,31 +338,38 @@ static int s_aws_mqtt3_to_mqtt5_adapter_safe_lifecycle_handler(
switch (event->event_type) {

case AWS_MQTT5_CLET_CONNECTION_SUCCESS:
if (adapter->adapter_state == AWS_MQTT_AS_FIRST_CONNECT) {
/*
* If the 311 view is that this is an initial connection attempt, then invoke the completion callback
* and move to the stay-connected state.
*/
if (adapter->on_connection_complete != NULL) {
(*adapter->on_connection_complete)(
&adapter->base,
event->error_code,
0,
event->settings->rejoined_session,
adapter->on_connection_complete_user_data);

adapter->on_connection_complete = NULL;
adapter->on_connection_complete_user_data = NULL;
if (adapter->adapter_state != AWS_MQTT_AS_STAY_DISCONNECTED) {
if (adapter->on_connection_success != NULL) {
(*adapter->on_connection_success)(
&adapter->base, 0, event->settings->rejoined_session, adapter->on_connection_success_user_data);
}
adapter->adapter_state = AWS_MQTT_AS_STAY_CONNECTED;
} else if (adapter->adapter_state == AWS_MQTT_AS_STAY_CONNECTED) {
/*
* If the 311 view is that we're in the stay-connected state (ie we've successfully done or simulated
* an initial connection), then invoke the connection resumption callback.
*/
if (adapter->on_resumed != NULL) {
(*adapter->on_resumed)(
&adapter->base, 0, event->settings->rejoined_session, adapter->on_resumed_user_data);

if (adapter->adapter_state == AWS_MQTT_AS_FIRST_CONNECT) {
/*
* If the 311 view is that this is an initial connection attempt, then invoke the completion
* callback and move to the stay-connected state.
*/
if (adapter->on_connection_complete != NULL) {
(*adapter->on_connection_complete)(
&adapter->base,
event->error_code,
0,
event->settings->rejoined_session,
adapter->on_connection_complete_user_data);

adapter->on_connection_complete = NULL;
adapter->on_connection_complete_user_data = NULL;
}
adapter->adapter_state = AWS_MQTT_AS_STAY_CONNECTED;
} else if (adapter->adapter_state == AWS_MQTT_AS_STAY_CONNECTED) {
/*
* If the 311 view is that we're in the stay-connected state (ie we've successfully done or
* simulated an initial connection), then invoke the connection resumption callback.
*/
if (adapter->on_resumed != NULL) {
(*adapter->on_resumed)(
&adapter->base, 0, event->settings->rejoined_session, adapter->on_resumed_user_data);
}
}
}
break;
Expand All @@ -380,19 +387,27 @@ static int s_aws_mqtt3_to_mqtt5_adapter_safe_lifecycle_handler(
* put the adapter into the "disconnected" state, simulating the way the 311 client stops after an
* initial connection failure.
*/
if (event->error_code != AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT &&
adapter->adapter_state == AWS_MQTT_AS_FIRST_CONNECT) {

if (adapter->on_connection_complete != NULL) {
(*adapter->on_connection_complete)(
&adapter->base, event->error_code, 0, false, adapter->on_connection_complete_user_data);

adapter->on_connection_complete = NULL;
adapter->on_connection_complete_user_data = NULL;
if (event->error_code != AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT) {
if (adapter->adapter_state != AWS_MQTT_AS_STAY_DISCONNECTED) {
if (adapter->on_connection_failure != NULL) {
(*adapter->on_connection_failure)(
&adapter->base, event->error_code, adapter->on_connection_failure_user_data);
}

if (adapter->adapter_state == AWS_MQTT_AS_FIRST_CONNECT) {
if (adapter->on_connection_complete != NULL) {
(*adapter->on_connection_complete)(
&adapter->base, event->error_code, 0, false, adapter->on_connection_complete_user_data);

adapter->on_connection_complete = NULL;
adapter->on_connection_complete_user_data = NULL;
}

adapter->adapter_state = AWS_MQTT_AS_STAY_DISCONNECTED;
}
}

adapter->adapter_state = AWS_MQTT_AS_STAY_DISCONNECTED;
}

break;

case AWS_MQTT5_CLET_DISCONNECTION:
Expand Down Expand Up @@ -745,6 +760,89 @@ static int s_aws_mqtt_client_connection_5_set_interruption_handlers(
return AWS_OP_SUCCESS;
}

struct aws_mqtt_set_connection_result_handlers_task {
struct aws_task task;
struct aws_allocator *allocator;
struct aws_mqtt_client_connection_5_impl *adapter;

aws_mqtt_client_on_connection_success_fn *on_connection_success;
void *on_connection_success_user_data;

aws_mqtt_client_on_connection_failure_fn *on_connection_failure;
void *on_connection_failure_user_data;
};

static void s_set_connection_result_handlers_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;

struct aws_mqtt_set_connection_result_handlers_task *set_task = arg;
struct aws_mqtt_client_connection_5_impl *adapter = set_task->adapter;

if (status != AWS_TASK_STATUS_RUN_READY) {
goto done;
}

adapter->on_connection_success = set_task->on_connection_success;
adapter->on_connection_success_user_data = set_task->on_connection_success_user_data;
adapter->on_connection_failure = set_task->on_connection_failure;
adapter->on_connection_failure_user_data = set_task->on_connection_failure_user_data;

done:

aws_ref_count_release(&adapter->internal_refs);

aws_mem_release(set_task->allocator, set_task);
}

static struct aws_mqtt_set_connection_result_handlers_task *s_aws_mqtt_set_connection_result_handlers_task_new(
struct aws_allocator *allocator,
struct aws_mqtt_client_connection_5_impl *adapter,
aws_mqtt_client_on_connection_success_fn *on_connection_success,
void *on_connection_success_user_data,
aws_mqtt_client_on_connection_failure_fn *on_connection_failure,
void *on_connection_failure_user_data) {

struct aws_mqtt_set_connection_result_handlers_task *set_task =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_set_connection_result_handlers_task));

aws_task_init(
&set_task->task, s_set_connection_result_handlers_task_fn, (void *)set_task, "SetConnectionResultHandlersTask");
set_task->allocator = adapter->allocator;
set_task->adapter = (struct aws_mqtt_client_connection_5_impl *)aws_ref_count_acquire(&adapter->internal_refs);
set_task->on_connection_success = on_connection_success;
set_task->on_connection_success_user_data = on_connection_success_user_data;
set_task->on_connection_failure = on_connection_failure;
set_task->on_connection_failure_user_data = on_connection_failure_user_data;

return set_task;
}

static int s_aws_mqtt_client_connection_5_set_connection_result_handlers(
void *impl,
aws_mqtt_client_on_connection_success_fn *on_connection_success,
void *on_connection_success_user_data,
aws_mqtt_client_on_connection_failure_fn *on_connection_failure,
void *on_connection_failure_user_data) {
struct aws_mqtt_client_connection_5_impl *adapter = impl;

struct aws_mqtt_set_connection_result_handlers_task *task = s_aws_mqtt_set_connection_result_handlers_task_new(
adapter->allocator,
adapter,
on_connection_success,
on_connection_success_user_data,
on_connection_failure,
on_connection_failure_user_data);
if (task == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT, "id=%p: failed to create set connection result handlers task", (void *)adapter);
return AWS_OP_ERR;
}

aws_event_loop_schedule_task_now(adapter->loop, &task->task);

return AWS_OP_SUCCESS;
}

struct aws_mqtt_set_on_closed_handler_task {
struct aws_task task;
struct aws_allocator *allocator;
Expand Down Expand Up @@ -2415,7 +2513,7 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_v
.set_http_proxy_options_fn = s_aws_mqtt_client_connection_5_set_http_proxy_options,
.set_host_resolution_options_fn = s_aws_mqtt_client_connection_5_set_host_resolution_options,
.set_reconnect_timeout_fn = s_aws_mqtt_client_connection_5_set_reconnect_timeout,
.set_connection_result_handlers = NULL, // TODO: Need update with introduction of mqtt5 lifeCycleEventCallback
.set_connection_result_handlers = s_aws_mqtt_client_connection_5_set_connection_result_handlers,
.set_connection_interruption_handlers_fn = s_aws_mqtt_client_connection_5_set_interruption_handlers,
.set_connection_closed_handler_fn = s_aws_mqtt_client_connection_5_set_on_closed_handler,
.set_on_any_publish_handler_fn = s_aws_mqtt_client_connection_5_set_on_any_publish_handler,
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ add_test_case(mqtt3to5_adapter_connect_bad_connectivity)
add_test_case(mqtt3to5_adapter_connect_bad_connectivity_with_mqtt5_restart)
add_test_case(mqtt3to5_adapter_connect_failure_connect_success_via_mqtt5)
add_test_case(mqtt3to5_adapter_connect_failure_bad_config_success_good_config)
add_test_case(mqtt3to5_adapter_connect_reconnect_failures)
add_test_case(mqtt3to5_adapter_connect_success_disconnect_connect)
add_test_case(mqtt3to5_adapter_connect_success_stop_mqtt5_disconnect_success)
add_test_case(mqtt3to5_adapter_disconnect_success)
Expand Down
Loading