Skip to content

Commit

Permalink
Merge branch 'ProtocolAdapterSkeleton' into 311Listener
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Jan 24, 2024
2 parents 397b463 + b1fe1f9 commit 00a0e68
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 20 deletions.
4 changes: 2 additions & 2 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ struct aws_mqtt_client_connection;
enum aws_mqtt311_impl_type {

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_311_impl` */
AWS_MQTT311_IT_311_CONNECTION_IMPL,
AWS_MQTT311_IT_311_CONNECTION,

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_5_impl`*/
AWS_MQTT311_IT_5_ADAPTER_IMPL,
AWS_MQTT311_IT_5_ADAPTER,
};

struct aws_mqtt_client_connection_vtable {
Expand Down
6 changes: 3 additions & 3 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct aws_mqtt5_client;
* Valid protocol clients include the CRT MQTT5 client, the CRT MQTT311 client, and an eventstream RPC connection
* that belongs to a Greengrass IPC client. Each of these protocol clients has a different (or even implicit)
* contract for carrying out pub-sub operations. The protocol adapter abstracts these details with a simple,
* minimal interface based on the requirements identigied in the request-response design documents.
* minimal interface based on the requirements identified in the request-response design documents.
*/

/*
Expand Down Expand Up @@ -132,7 +132,7 @@ struct aws_mqtt_protocol_adapter_options {

struct aws_mqtt_protocol_adapter_vtable {

void (*aws_mqtt_protocol_adapter_delete_fn)(void *);
void (*aws_mqtt_protocol_adapter_destroy_fn)(void *);

int (*aws_mqtt_protocol_adapter_subscribe_fn)(void *, struct aws_protocol_adapter_subscribe_options *);

Expand Down Expand Up @@ -168,7 +168,7 @@ AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_fro
* Destroys a request-response protocol adapter. Destruction is an asynchronous process and the caller must
* wait for the termination callback to be invoked before assuming that no further callbacks will be invoked.
*/
AWS_MQTT_API void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter);
AWS_MQTT_API void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter);

/*
* Asks the adapted protocol client to perform an MQTT subscribe operation
Expand Down
3 changes: 3 additions & 0 deletions include/aws/mqtt/v5/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ struct aws_mqtt5_publish_completion_options {
aws_mqtt5_publish_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand All @@ -351,6 +352,7 @@ struct aws_mqtt5_subscribe_completion_options {
aws_mqtt5_subscribe_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand All @@ -361,6 +363,7 @@ struct aws_mqtt5_unsubscribe_completion_options {
aws_mqtt5_unsubscribe_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand Down
2 changes: 1 addition & 1 deletion source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -3226,7 +3226,7 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) {
enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(const void *impl) {
(void)impl;

return AWS_MQTT311_IT_311_CONNECTION_IMPL;
return AWS_MQTT311_IT_311_CONNECTION;
}

static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311_vtable = {
Expand Down
30 changes: 18 additions & 12 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct aws_mqtt_protocol_adapter_5_impl {
struct aws_mqtt5_listener *listener;
};

static void s_aws_mqtt_protocol_adapter_5_delete(void *impl) {
static void s_aws_mqtt_protocol_adapter_5_destroy(void *impl) {
struct aws_mqtt_protocol_adapter_5_impl *adapter = impl;

// all the real cleanup is done in the listener termination callback
Expand Down Expand Up @@ -87,7 +87,7 @@ static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_proto
return subscribe_data;
}

static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(
static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_destroy(
struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data) {
aws_weak_ref_release(subscribe_data->callback_ref);
aws_byte_buf_clean_up(&subscribe_data->topic_filter);
Expand Down Expand Up @@ -120,7 +120,7 @@ static void s_protocol_adapter_5_subscribe_completion(

done:

s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(subscribe_data);
s_aws_mqtt_protocol_adapter_5_subscription_op_data_destroy(subscribe_data);
}

int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) {
Expand Down Expand Up @@ -154,7 +154,7 @@ int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adap

error:

s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(subscribe_data);
s_aws_mqtt_protocol_adapter_5_subscription_op_data_destroy(subscribe_data);

return AWS_OP_ERR;
}
Expand Down Expand Up @@ -184,7 +184,7 @@ static void s_protocol_adapter_5_unsubscribe_completion(

done:

s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(unsubscribe_data);
s_aws_mqtt_protocol_adapter_5_subscription_op_data_destroy(unsubscribe_data);
}

int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) {
Expand Down Expand Up @@ -213,7 +213,7 @@ int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_ad

error:

s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(unsubscribe_data);
s_aws_mqtt_protocol_adapter_5_subscription_op_data_destroy(unsubscribe_data);

return AWS_OP_ERR;
}
Expand Down Expand Up @@ -243,7 +243,7 @@ static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_a
return publish_data;
}

static void s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(
static void s_aws_mqtt_protocol_adapter_5_publish_op_data_destroy(
struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data) {
aws_weak_ref_release(publish_data->callback_ref);

Expand Down Expand Up @@ -274,7 +274,7 @@ static void s_protocol_adapter_5_publish_completion(

done:

s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(publish_data);
s_aws_mqtt_protocol_adapter_5_publish_op_data_destroy(publish_data);
}

int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) {
Expand All @@ -299,7 +299,7 @@ int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapte

error:

s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(publish_data);
s_aws_mqtt_protocol_adapter_5_publish_op_data_destroy(publish_data);

return AWS_OP_ERR;
}
Expand Down Expand Up @@ -360,7 +360,7 @@ static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_da
}

static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = {
.aws_mqtt_protocol_adapter_delete_fn = s_aws_mqtt_protocol_adapter_5_delete,
.aws_mqtt_protocol_adapter_destroy_fn = s_aws_mqtt_protocol_adapter_5_destroy,
.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_5_subscribe,
.aws_mqtt_protocol_adapter_unsubscribe_fn = s_aws_mqtt_protocol_adapter_5_unsubscribe,
.aws_mqtt_protocol_adapter_publish_fn = s_aws_mqtt_protocol_adapter_5_publish,
Expand All @@ -370,6 +370,12 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt5_client *client) {

if (options == NULL || client == NULL) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}

struct aws_mqtt_protocol_adapter_5_impl *adapter =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl));

Expand Down Expand Up @@ -399,8 +405,8 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
return &adapter->base;
}

void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter) {
(*adapter->vtable->aws_mqtt_protocol_adapter_delete_fn)(adapter->impl);
void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter) {
(*adapter->vtable->aws_mqtt_protocol_adapter_destroy_fn)(adapter->impl);
}

int aws_mqtt_protocol_adapter_subscribe(
Expand Down
2 changes: 1 addition & 1 deletion source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -2857,7 +2857,7 @@ static uint16_t s_aws_mqtt_5_resubscribe_existing_topics(
enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) {
(void)impl;

return AWS_MQTT311_IT_5_ADAPTER_IMPL;
return AWS_MQTT311_IT_5_ADAPTER;
}

static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_vtable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static bool s_is_adapter_terminated(void *context) {
static void s_aws_request_response_mqtt5_adapter_test_fixture_destroy_adapters(
struct aws_request_response_mqtt5_adapter_test_fixture *fixture) {
if (fixture->protocol_adapter != NULL) {
aws_mqtt_protocol_adapter_delete(fixture->protocol_adapter);
aws_mqtt_protocol_adapter_destroy(fixture->protocol_adapter);

aws_mutex_lock(&fixture->lock);
aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_is_adapter_terminated, fixture);
Expand Down

0 comments on commit 00a0e68

Please sign in to comment.