Skip to content

Commit

Permalink
success/failure -> error code
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 26, 2024
1 parent abbcdc1 commit 426878f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 48 deletions.
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT,
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
8 changes: 3 additions & 5 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct aws_protocol_adapter_publish_options {
* Invoked on success/failure of the publish itself. Our implementations use QoS1 which means that success
* will be on puback receipt.
*/
void (*completion_callback_fn)(bool, void *);
void (*completion_callback_fn)(int, void *);

/*
* User data to pass in when invoking the completion callback
Expand All @@ -64,10 +64,8 @@ struct aws_protocol_adapter_publish_options {
* Describes the type of subscription event (relative to a topic filter)
*/
enum aws_protocol_adapter_subscription_event_type {
AWS_PASET_SUBSCRIBE_SUCCESS,
AWS_PASET_SUBSCRIBE_FAILURE,
AWS_PASET_UNSUBSCRIBE_SUCCESS,
AWS_PASET_UNSUBSCRIBE_FAILURE,
AWS_PASET_SUBSCRIBE,
AWS_PASET_UNSUBSCRIBE,
};

/*
Expand Down
3 changes: 3 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
"MQTT subscribe operation failed"),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
"MQTT operation returned a failing reason code"),
};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
Expand Down
27 changes: 16 additions & 11 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,15 @@ static void s_protocol_adapter_5_subscribe_completion(
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 &&
suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_2;
if (error_code == AWS_ERROR_SUCCESS) {
if (suback == NULL || suback->reason_code_count != 1 || suback->reason_codes[0] >= 128) {
error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
}
}

struct aws_protocol_adapter_subscription_event subscribe_event = {
.topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter),
.event_type = success ? AWS_PASET_SUBSCRIBE_SUCCESS : AWS_PASET_SUBSCRIBE_FAILURE,
.event_type = AWS_PASET_SUBSCRIBE,
.error_code = error_code,
};

Expand Down Expand Up @@ -171,12 +174,15 @@ static void s_protocol_adapter_5_unsubscribe_completion(
goto done;
}

bool success = error_code == AWS_ERROR_SUCCESS && unsuback != NULL && unsuback->reason_code_count == 1 &&
unsuback->reason_codes[0] < 128;
if (error_code == AWS_ERROR_SUCCESS) {
if (unsuback == NULL || unsuback->reason_code_count != 1 || unsuback->reason_codes[0] >= 128) {
error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
}
}

struct aws_protocol_adapter_subscription_event unsubscribe_event = {
.topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->topic_filter),
.event_type = success ? AWS_PASET_UNSUBSCRIBE_SUCCESS : AWS_PASET_UNSUBSCRIBE_FAILURE,
.event_type = AWS_PASET_UNSUBSCRIBE,
.error_code = error_code,
};

Expand Down Expand Up @@ -224,7 +230,7 @@ struct aws_mqtt_protocol_adapter_5_publish_op_data {
struct aws_allocator *allocator;
struct aws_weak_ref *callback_ref;

void (*completion_callback_fn)(bool, void *);
void (*completion_callback_fn)(int, void *);
void *user_data;
};

Expand Down Expand Up @@ -262,15 +268,14 @@ static void s_protocol_adapter_5_publish_completion(
goto done;
}

bool success = false;
if (error_code == AWS_ERROR_SUCCESS && packet_type == AWS_MQTT5_PT_PUBACK) {
const struct aws_mqtt5_packet_puback_view *puback = packet;
if (puback->reason_code < 128) {
success = true;
if (puback->reason_code >= 128) {
error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
}
}

(*publish_data->completion_callback_fn)(success, publish_data->user_data);
(*publish_data->completion_callback_fn)(error_code, publish_data->user_data);

done:

Expand Down
101 changes: 69 additions & 32 deletions tests/request-response/request_response_protocol_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ static void s_request_response_protocol_adapter_incoming_publish_event_record_cl
struct request_response_protocol_adapter_subscription_event_record {
enum aws_protocol_adapter_subscription_event_type event_type;
struct aws_byte_buf topic_filter;
int error_code;
};

static void s_request_response_protocol_adapter_subscription_event_record_init(
struct request_response_protocol_adapter_subscription_event_record *record,
struct aws_allocator *allocator,
struct aws_byte_cursor topic_filter) {
enum aws_protocol_adapter_subscription_event_type event_type,
struct aws_byte_cursor topic_filter,
int error_code) {

AWS_ZERO_STRUCT(*record);

record->event_type = event_type;
record->error_code = error_code;
aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter);
}

Expand Down Expand Up @@ -71,9 +78,9 @@ static void s_rr_mqtt5_protocol_adapter_test_on_subscription_event(
void *user_data) {
struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data;

struct request_response_protocol_adapter_subscription_event_record record = {.event_type = event->event_type};
struct request_response_protocol_adapter_subscription_event_record record;
s_request_response_protocol_adapter_subscription_event_record_init(
&record, fixture->allocator, event->topic_filter);
&record, fixture->allocator, event->event_type, event->topic_filter, event->error_code);

aws_mutex_lock(&fixture->lock);
aws_array_list_push_back(&fixture->subscription_events, &record);
Expand Down Expand Up @@ -117,11 +124,11 @@ static void s_rr_mqtt5_protocol_adapter_test_on_session_event(
aws_condition_variable_notify_all(&fixture->signal);
}

static void s_rr_mqtt5_protocol_adapter_test_on_publish_result(bool success, void *user_data) {
static void s_rr_mqtt5_protocol_adapter_test_on_publish_result(int error_code, void *user_data) {
struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data;

aws_mutex_lock(&fixture->lock);
aws_array_list_push_back(&fixture->publish_results, &success);
aws_array_list_push_back(&fixture->publish_results, &error_code);
aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
}
Expand Down Expand Up @@ -162,7 +169,7 @@ static int s_aws_request_response_mqtt5_adapter_test_fixture_init(
allocator,
10,
sizeof(struct request_response_protocol_adapter_subscription_event_record));
aws_array_list_init_dynamic(&fixture->publish_results, allocator, 10, sizeof(bool));
aws_array_list_init_dynamic(&fixture->publish_results, allocator, 10, sizeof(int));

aws_mutex_init(&fixture->lock);
aws_condition_variable_init(&fixture->signal);
Expand Down Expand Up @@ -232,14 +239,22 @@ static bool s_do_subscription_events_contain(void *context) {
struct request_response_protocol_adapter_subscription_event_record record;
aws_array_list_get_at(&wait_context->fixture->subscription_events, &record, i);

if (record.event_type == wait_context->expected_event->event_type) {
struct aws_byte_cursor record_topic_filter = aws_byte_cursor_from_buf(&record.topic_filter);
struct aws_byte_cursor expected_topic_filter =
aws_byte_cursor_from_buf(&wait_context->expected_event->topic_filter);
if (aws_byte_cursor_eq(&record_topic_filter, &expected_topic_filter)) {
++found;
}
if (record.event_type != wait_context->expected_event->event_type) {
continue;
}

if (record.error_code != wait_context->expected_event->error_code) {
continue;
}

struct aws_byte_cursor record_topic_filter = aws_byte_cursor_from_buf(&record.topic_filter);
struct aws_byte_cursor expected_topic_filter =
aws_byte_cursor_from_buf(&wait_context->expected_event->topic_filter);
if (!aws_byte_cursor_eq(&record_topic_filter, &expected_topic_filter)) {
continue;
}

++found;
}

return found >= wait_context->expected_count;
Expand Down Expand Up @@ -277,9 +292,11 @@ static bool s_do_session_events_contain(void *context) {
struct aws_protocol_adapter_session_event record;
aws_array_list_get_at(&wait_context->fixture->session_events, &record, i);

if (record.joined_session == wait_context->expected_event->joined_session) {
++found;
if (record.joined_session != wait_context->expected_event->joined_session) {
continue;
}

++found;
}

return found >= wait_context->expected_count;
Expand Down Expand Up @@ -352,7 +369,7 @@ static void s_wait_for_incoming_publish_events_contains(
}

struct test_publish_result_wait_context {
bool expected_success;
int expected_error_code;
size_t expected_count;
struct aws_request_response_mqtt5_adapter_test_fixture *fixture;
};
Expand All @@ -364,10 +381,10 @@ static bool s_do_publish_results_contain(void *context) {

size_t num_events = aws_array_list_length(&wait_context->fixture->publish_results);
for (size_t i = 0; i < num_events; ++i) {
bool success = false;
aws_array_list_get_at(&wait_context->fixture->publish_results, &success, i);
int error_code = AWS_ERROR_SUCCESS;
aws_array_list_get_at(&wait_context->fixture->publish_results, &error_code, i);

if (success == wait_context->expected_success) {
if (error_code == wait_context->expected_error_code) {
++found;
}
}
Expand All @@ -377,11 +394,11 @@ static bool s_do_publish_results_contain(void *context) {

static void s_wait_for_publish_results_contains(
struct aws_request_response_mqtt5_adapter_test_fixture *fixture,
bool success,
int expected_error_code,
size_t expected_count) {

struct test_publish_result_wait_context context = {
.expected_success = success,
.expected_error_code = expected_error_code,
.expected_count = expected_count,
.fixture = fixture,
};
Expand Down Expand Up @@ -420,6 +437,19 @@ static int s_aws_mqtt5_server_send_failed_suback_on_subscribe(
return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view);
}

static int s_test_type_to_expected_error_code(enum protocol_adapter_operation_test_type test_type) {
switch (test_type) {
case PAOTT_FAILURE_TIMEOUT:
return AWS_ERROR_MQTT_TIMEOUT;
case PAOTT_FAILURE_REASON_CODE:
return AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
case PAOTT_FAILURE_ERROR_CODE:
return AWS_ERROR_MQTT5_OPERATION_FAILED_DUE_TO_OFFLINE_QUEUE_POLICY;
default:
return AWS_ERROR_SUCCESS;
}
}

static int s_do_request_response_mqtt5_protocol_adapter_subscribe_test(
struct aws_allocator *allocator,
enum protocol_adapter_operation_test_type test_type) {
Expand Down Expand Up @@ -457,12 +487,15 @@ static int s_do_request_response_mqtt5_protocol_adapter_subscribe_test(
aws_wait_for_connected_lifecycle_event(&fixture.mqtt5_fixture);
}

struct request_response_protocol_adapter_subscription_event_record expected_outcome = {
.event_type = (test_type == PAOTT_SUCCESS) ? AWS_PASET_SUBSCRIBE_SUCCESS : AWS_PASET_SUBSCRIBE_FAILURE,
};
int expected_error_code = s_test_type_to_expected_error_code(test_type);

aws_byte_buf_init_copy_from_cursor(
&expected_outcome.topic_filter, allocator, aws_byte_cursor_from_c_str("hello/world"));
struct request_response_protocol_adapter_subscription_event_record expected_outcome;
s_request_response_protocol_adapter_subscription_event_record_init(
&expected_outcome,
allocator,
AWS_PASET_SUBSCRIBE,
aws_byte_cursor_from_c_str("hello/world"),
expected_error_code);

struct aws_protocol_adapter_subscribe_options subscribe_options = {
.topic_filter = aws_byte_cursor_from_buf(&expected_outcome.topic_filter),
Expand Down Expand Up @@ -595,12 +628,15 @@ static int s_do_request_response_mqtt5_protocol_adapter_unsubscribe_test(
aws_wait_for_connected_lifecycle_event(&fixture.mqtt5_fixture);
}

struct request_response_protocol_adapter_subscription_event_record expected_outcome = {
.event_type = (test_type == PAOTT_SUCCESS) ? AWS_PASET_UNSUBSCRIBE_SUCCESS : AWS_PASET_UNSUBSCRIBE_FAILURE,
};
int expected_error_code = s_test_type_to_expected_error_code(test_type);

aws_byte_buf_init_copy_from_cursor(
&expected_outcome.topic_filter, allocator, aws_byte_cursor_from_c_str("hello/world"));
struct request_response_protocol_adapter_subscription_event_record expected_outcome;
s_request_response_protocol_adapter_subscription_event_record_init(
&expected_outcome,
allocator,
AWS_PASET_UNSUBSCRIBE,
aws_byte_cursor_from_c_str("hello/world"),
expected_error_code);

struct aws_protocol_adapter_unsubscribe_options unsubscribe_options = {
.topic_filter = aws_byte_cursor_from_buf(&expected_outcome.topic_filter),
Expand Down Expand Up @@ -743,7 +779,8 @@ static int s_do_request_response_mqtt5_protocol_adapter_publish_test(

aws_mqtt_protocol_adapter_publish(fixture.protocol_adapter, &publish_options);

s_wait_for_publish_results_contains(&fixture, test_type == PAOTT_SUCCESS, 1);
int expected_error_code = s_test_type_to_expected_error_code(test_type);
s_wait_for_publish_results_contains(&fixture, expected_error_code, 1);

s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(&fixture);

Expand Down

0 comments on commit 426878f

Please sign in to comment.