Skip to content

Commit

Permalink
Temporary stuff to trigger PublishOut.ClientError on IoTCore
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Mar 26, 2024
1 parent aabe34d commit 5b466ec
Showing 1 changed file with 138 additions and 7 deletions.
145 changes: 138 additions & 7 deletions bin/elastipubsub5/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

#include <inttypes.h>

#include <aws/common/uuid.h>

#ifdef _MSC_VER
# pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */
# pragma warning(disable : 4204) /* Declared initializers */
Expand Down Expand Up @@ -55,6 +57,9 @@ struct app_ctx {

const char *log_filename;
enum aws_log_level log_level;

size_t publishes_received;
size_t publish_bytes_received;
};

static void s_usage(int exit_code) {
Expand Down Expand Up @@ -198,6 +203,8 @@ static void s_on_publish_complete_fn(
int error_code,
void *complete_ctx) {
(void)complete_ctx;
(void)packet_type;
(void)packet;

switch (error_code) {
case AWS_ERROR_MQTT5_OPERATION_FAILED_DUE_TO_OFFLINE_QUEUE_POLICY:
Expand All @@ -207,12 +214,12 @@ static void s_on_publish_complete_fn(
printf("PUBLISH FAILED due to MQTT Timeout");
break;
case AWS_ERROR_SUCCESS:
printf("PUBLISH SUCCESS\n");
//printf("PUBLISH SUCCESS\n");
break;
default:
break;
}

/*
if (packet_type == AWS_MQTT5_PT_PUBACK) {
const struct aws_mqtt5_packet_puback_view *puback = packet;
printf("PUBACK received!\n");
Expand All @@ -222,17 +229,26 @@ static void s_on_publish_complete_fn(
}
fflush(stdout);
*/
}

static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) {
(void)publish;
(void)user_data;

struct app_ctx *context = user_data;
++context->publishes_received;
context->publish_bytes_received += publish->payload.len;

if (context->publishes_received % 100 == 0) {
printf("%zu publishes received\n", context->publishes_received);
}
/*
printf("PUBLISH received!\n");
printf(
"Publish received to topic:'" PRInSTR "' payload '" PRInSTR "'\n",
"Publish received to topic:'" PRInSTR "' payload: %zu bytes\n",
AWS_BYTE_CURSOR_PRI(publish->topic),
AWS_BYTE_CURSOR_PRI(publish->payload));
publish->payload.len);*/
}

static void s_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) {
Expand Down Expand Up @@ -487,6 +503,85 @@ static void s_handle_stop(
}
}

static int s_int_from_args(struct aws_array_list *arguments, size_t arg_index) {
struct aws_byte_cursor value_cursor;
AWS_ZERO_STRUCT(value_cursor);
aws_array_list_get_at(arguments, &value_cursor, arg_index);
struct aws_string *arg_string = aws_string_new_from_cursor(arguments->alloc, &value_cursor);
return atoi((const char *)arg_string->bytes);
}

static void s_handle_spam(
struct aws_mqtt5_client *client,
struct aws_allocator *allocator,
struct aws_array_list *arguments) {

size_t argument_count = aws_array_list_length(arguments);
if (argument_count != 5) {
printf("invalid spam call:\n");
printf(" spam <topic> <payload-size> <count> <delay-in-ms>\n");
return;
}

struct aws_byte_cursor topic_cursor;
AWS_ZERO_STRUCT(topic_cursor);
aws_array_list_get_at(arguments, &topic_cursor, 1);

int payload_size = s_int_from_args(arguments, 2);
if (payload_size < 0) {
printf("Invalid payload size parameter\n");
return;
}

int publish_count = s_int_from_args(arguments, 3);
if (publish_count <= 0) {
printf("Invalid publish count parameter\n");
return;
}

int publish_delay_ms = s_int_from_args(arguments, 4);
if (publish_delay_ms < 0) {
printf("Invalid publish delay parameter\n");
return;
}

uint64_t sleep_nanos = aws_timestamp_convert((uint64_t) publish_delay_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);

struct aws_mqtt5_publish_completion_options publish_completion_options = {
.completion_callback = &s_on_publish_complete_fn,
.completion_user_data = NULL,
};

struct aws_byte_buf payload;
aws_byte_buf_init(&payload, allocator, (size_t)payload_size);

struct aws_byte_cursor payload_cursor = aws_byte_cursor_from_buf(&payload);
payload_cursor.len = payload.capacity;

for (size_t i = 0; i < (size_t)publish_count; ++i) {

struct aws_mqtt5_packet_publish_view packet_publish_view = {
.qos = AWS_MQTT5_QOS_AT_LEAST_ONCE,
.topic = topic_cursor,
.retain = false,
.duplicate = false,
.payload = payload_cursor,
};

aws_mqtt5_client_publish(client, &packet_publish_view, &publish_completion_options);

if (sleep_nanos > 0) {
aws_thread_current_sleep(sleep_nanos);
}

if ((i + 1) % 100 == 0) {
printf("Publish count: %zu\n", i + 1);
}
}

aws_byte_buf_clean_up(&payload);
}

static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator *allocator, const char *input_line) {

struct aws_byte_cursor quit_cursor = aws_byte_cursor_from_c_str("quit");
Expand All @@ -495,6 +590,7 @@ static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator
struct aws_byte_cursor subscribe_cursor = aws_byte_cursor_from_c_str("subscribe");
struct aws_byte_cursor unsubscribe_cursor = aws_byte_cursor_from_c_str("unsubscribe");
struct aws_byte_cursor publish_cursor = aws_byte_cursor_from_c_str("publish");
struct aws_byte_cursor spam_cursor = aws_byte_cursor_from_c_str("spam");

struct aws_array_list words;
aws_array_list_init_dynamic(&words, allocator, 10, sizeof(struct aws_byte_cursor));
Expand Down Expand Up @@ -528,6 +624,8 @@ static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator
s_handle_unsubscribe(client, &words);
} else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &publish_cursor)) {
s_handle_publish(client, allocator, &words, &line_cursor);
} else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &spam_cursor)) {
s_handle_spam(client, allocator, &words);
} else {
printf("Unknown command: " PRInSTR "\n", AWS_BYTE_CURSOR_PRI(command_cursor));
}
Expand All @@ -549,7 +647,6 @@ static void s_aws_mqtt5_transform_websocket_handshake_fn(
(*complete_fn)(request, AWS_ERROR_SUCCESS, complete_ctx);
}

AWS_STATIC_STRING_FROM_LITERAL(s_client_id, "HelloWorld");

int main(int argc, char **argv) {
struct aws_allocator *allocator = aws_mem_tracer_new(aws_default_allocator(), NULL, AWS_MEMTRACE_STACKS, 15);
Expand Down Expand Up @@ -598,6 +695,7 @@ int main(int argc, char **argv) {
struct aws_tls_connection_options tls_connection_options;
AWS_ZERO_STRUCT(tls_connection_options);

use_tls = true;
if (app_ctx.cert && app_ctx.key) {
if (aws_tls_ctx_options_init_client_mtls_from_path(&tls_ctx_options, allocator, app_ctx.cert, app_ctx.key)) {
fprintf(
Expand Down Expand Up @@ -634,8 +732,31 @@ int main(int argc, char **argv) {
fprintf(stderr, "Failed to set servername with error %s.", aws_error_debug_str(aws_last_error()));
exit(1);
}
} else {
aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator);

if (app_ctx.cacert) {
if (aws_tls_ctx_options_override_default_trust_store_from_path(&tls_ctx_options, NULL, app_ctx.cacert)) {
fprintf(
stderr, "Failed to load %s with error %s", app_ctx.cacert, aws_error_debug_str(aws_last_error()));
exit(1);
}
}

aws_tls_ctx_options_set_verify_peer(&tls_ctx_options, false);

tls_ctx = aws_tls_client_ctx_new(allocator, &tls_ctx_options);

if (!tls_ctx) {
fprintf(stderr, "Failed to initialize TLS context with error %s.", aws_error_debug_str(aws_last_error()));
exit(1);
}

use_tls = true;
aws_tls_connection_options_init_from_ctx(&tls_connection_options, tls_ctx);
if (aws_tls_connection_options_set_server_name(&tls_connection_options, allocator, &app_ctx.uri.host_name)) {
fprintf(stderr, "Failed to set servername with error %s.", aws_error_debug_str(aws_last_error()));
exit(1);
}
}

struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(allocator, 2, NULL);
Expand Down Expand Up @@ -665,9 +786,18 @@ int main(int argc, char **argv) {
uint16_t receive_maximum = 9;
uint32_t maximum_packet_size = 128 * 1024;

char client_id[128];
struct aws_byte_buf client_id_buf = aws_byte_buf_from_empty_array(client_id, AWS_ARRAY_SIZE(client_id));

struct aws_uuid uuid;
aws_uuid_init(&uuid);
aws_uuid_to_str(&uuid, &client_id_buf);

struct aws_byte_cursor client_id_cur = aws_byte_cursor_from_buf(&client_id_buf);

struct aws_mqtt5_packet_connect_view connect_options = {
.keep_alive_interval_seconds = 30,
.client_id = aws_byte_cursor_from_string(s_client_id),
.client_id = client_id_cur,
.clean_start = true,
.maximum_packet_size_bytes = &maximum_packet_size,
.receive_maximum = &receive_maximum,
Expand Down Expand Up @@ -697,6 +827,7 @@ int main(int argc, char **argv) {
.websocket_handshake_transform = websocket_handshake_transform,
.websocket_handshake_transform_user_data = websocket_handshake_transform_user_data,
.publish_received_handler = s_on_publish_received,
.publish_received_handler_user_data = &app_ctx,
};

struct aws_mqtt5_client *client = aws_mqtt5_client_new(allocator, &client_options);
Expand Down

0 comments on commit 5b466ec

Please sign in to comment.