diff --git a/bin/elastipubsub5/main.c b/bin/elastipubsub5/main.c index 2c84e796..2336ce4e 100644 --- a/bin/elastipubsub5/main.c +++ b/bin/elastipubsub5/main.c @@ -27,6 +27,8 @@ #include +#include + #ifdef _MSC_VER # pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */ # pragma warning(disable : 4204) /* Declared initializers */ @@ -55,6 +57,9 @@ struct app_ctx { const char *log_filename; enum aws_log_level log_level; + + size_t publishes_received; + size_t publish_bytes_received; }; static void s_usage(int exit_code) { @@ -198,6 +203,8 @@ static void s_on_publish_complete_fn( int error_code, void *complete_ctx) { (void)complete_ctx; + (void)packet_type; + (void)packet; switch (error_code) { case AWS_ERROR_MQTT5_OPERATION_FAILED_DUE_TO_OFFLINE_QUEUE_POLICY: @@ -207,12 +214,12 @@ static void s_on_publish_complete_fn( printf("PUBLISH FAILED due to MQTT Timeout"); break; case AWS_ERROR_SUCCESS: - printf("PUBLISH SUCCESS\n"); + //printf("PUBLISH SUCCESS\n"); break; default: break; } - +/* if (packet_type == AWS_MQTT5_PT_PUBACK) { const struct aws_mqtt5_packet_puback_view *puback = packet; printf("PUBACK received!\n"); @@ -222,17 +229,26 @@ static void s_on_publish_complete_fn( } fflush(stdout); + */ } static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) { (void)publish; (void)user_data; + struct app_ctx *context = user_data; + ++context->publishes_received; + context->publish_bytes_received += publish->payload.len; + + if (context->publishes_received % 100 == 0) { + printf("%zu publishes received\n", context->publishes_received); + } + /* printf("PUBLISH received!\n"); printf( - "Publish received to topic:'" PRInSTR "' payload '" PRInSTR "'\n", + "Publish received to topic:'" PRInSTR "' payload: %zu bytes\n", AWS_BYTE_CURSOR_PRI(publish->topic), - AWS_BYTE_CURSOR_PRI(publish->payload)); + publish->payload.len);*/ } static void s_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) { @@ -487,6 +503,85 @@ static void s_handle_stop( } } +static int s_int_from_args(struct aws_array_list *arguments, size_t arg_index) { + struct aws_byte_cursor value_cursor; + AWS_ZERO_STRUCT(value_cursor); + aws_array_list_get_at(arguments, &value_cursor, arg_index); + struct aws_string *arg_string = aws_string_new_from_cursor(arguments->alloc, &value_cursor); + return atoi((const char *)arg_string->bytes); +} + +static void s_handle_spam( + struct aws_mqtt5_client *client, + struct aws_allocator *allocator, + struct aws_array_list *arguments) { + + size_t argument_count = aws_array_list_length(arguments); + if (argument_count != 5) { + printf("invalid spam call:\n"); + printf(" spam \n"); + return; + } + + struct aws_byte_cursor topic_cursor; + AWS_ZERO_STRUCT(topic_cursor); + aws_array_list_get_at(arguments, &topic_cursor, 1); + + int payload_size = s_int_from_args(arguments, 2); + if (payload_size < 0) { + printf("Invalid payload size parameter\n"); + return; + } + + int publish_count = s_int_from_args(arguments, 3); + if (publish_count <= 0) { + printf("Invalid publish count parameter\n"); + return; + } + + int publish_delay_ms = s_int_from_args(arguments, 4); + if (publish_delay_ms < 0) { + printf("Invalid publish delay parameter\n"); + return; + } + + uint64_t sleep_nanos = aws_timestamp_convert((uint64_t) publish_delay_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + + struct aws_mqtt5_publish_completion_options publish_completion_options = { + .completion_callback = &s_on_publish_complete_fn, + .completion_user_data = NULL, + }; + + struct aws_byte_buf payload; + aws_byte_buf_init(&payload, allocator, (size_t)payload_size); + + struct aws_byte_cursor payload_cursor = aws_byte_cursor_from_buf(&payload); + payload_cursor.len = payload.capacity; + + for (size_t i = 0; i < (size_t)publish_count; ++i) { + + struct aws_mqtt5_packet_publish_view packet_publish_view = { + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .topic = topic_cursor, + .retain = false, + .duplicate = false, + .payload = payload_cursor, + }; + + aws_mqtt5_client_publish(client, &packet_publish_view, &publish_completion_options); + + if (sleep_nanos > 0) { + aws_thread_current_sleep(sleep_nanos); + } + + if ((i + 1) % 100 == 0) { + printf("Publish count: %zu\n", i + 1); + } + } + + aws_byte_buf_clean_up(&payload); +} + static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator *allocator, const char *input_line) { struct aws_byte_cursor quit_cursor = aws_byte_cursor_from_c_str("quit"); @@ -495,6 +590,7 @@ static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator struct aws_byte_cursor subscribe_cursor = aws_byte_cursor_from_c_str("subscribe"); struct aws_byte_cursor unsubscribe_cursor = aws_byte_cursor_from_c_str("unsubscribe"); struct aws_byte_cursor publish_cursor = aws_byte_cursor_from_c_str("publish"); + struct aws_byte_cursor spam_cursor = aws_byte_cursor_from_c_str("spam"); struct aws_array_list words; aws_array_list_init_dynamic(&words, allocator, 10, sizeof(struct aws_byte_cursor)); @@ -528,6 +624,8 @@ static bool s_handle_input(struct aws_mqtt5_client *client, struct aws_allocator s_handle_unsubscribe(client, &words); } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &publish_cursor)) { s_handle_publish(client, allocator, &words, &line_cursor); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &spam_cursor)) { + s_handle_spam(client, allocator, &words); } else { printf("Unknown command: " PRInSTR "\n", AWS_BYTE_CURSOR_PRI(command_cursor)); } @@ -549,7 +647,6 @@ static void s_aws_mqtt5_transform_websocket_handshake_fn( (*complete_fn)(request, AWS_ERROR_SUCCESS, complete_ctx); } -AWS_STATIC_STRING_FROM_LITERAL(s_client_id, "HelloWorld"); int main(int argc, char **argv) { struct aws_allocator *allocator = aws_mem_tracer_new(aws_default_allocator(), NULL, AWS_MEMTRACE_STACKS, 15); @@ -598,6 +695,7 @@ int main(int argc, char **argv) { struct aws_tls_connection_options tls_connection_options; AWS_ZERO_STRUCT(tls_connection_options); + use_tls = true; if (app_ctx.cert && app_ctx.key) { if (aws_tls_ctx_options_init_client_mtls_from_path(&tls_ctx_options, allocator, app_ctx.cert, app_ctx.key)) { fprintf( @@ -634,8 +732,31 @@ int main(int argc, char **argv) { fprintf(stderr, "Failed to set servername with error %s.", aws_error_debug_str(aws_last_error())); exit(1); } + } else { + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + + if (app_ctx.cacert) { + if (aws_tls_ctx_options_override_default_trust_store_from_path(&tls_ctx_options, NULL, app_ctx.cacert)) { + fprintf( + stderr, "Failed to load %s with error %s", app_ctx.cacert, aws_error_debug_str(aws_last_error())); + exit(1); + } + } + + aws_tls_ctx_options_set_verify_peer(&tls_ctx_options, false); + + tls_ctx = aws_tls_client_ctx_new(allocator, &tls_ctx_options); + + if (!tls_ctx) { + fprintf(stderr, "Failed to initialize TLS context with error %s.", aws_error_debug_str(aws_last_error())); + exit(1); + } - use_tls = true; + aws_tls_connection_options_init_from_ctx(&tls_connection_options, tls_ctx); + if (aws_tls_connection_options_set_server_name(&tls_connection_options, allocator, &app_ctx.uri.host_name)) { + fprintf(stderr, "Failed to set servername with error %s.", aws_error_debug_str(aws_last_error())); + exit(1); + } } struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(allocator, 2, NULL); @@ -665,9 +786,18 @@ int main(int argc, char **argv) { uint16_t receive_maximum = 9; uint32_t maximum_packet_size = 128 * 1024; + char client_id[128]; + struct aws_byte_buf client_id_buf = aws_byte_buf_from_empty_array(client_id, AWS_ARRAY_SIZE(client_id)); + + struct aws_uuid uuid; + aws_uuid_init(&uuid); + aws_uuid_to_str(&uuid, &client_id_buf); + + struct aws_byte_cursor client_id_cur = aws_byte_cursor_from_buf(&client_id_buf); + struct aws_mqtt5_packet_connect_view connect_options = { .keep_alive_interval_seconds = 30, - .client_id = aws_byte_cursor_from_string(s_client_id), + .client_id = client_id_cur, .clean_start = true, .maximum_packet_size_bytes = &maximum_packet_size, .receive_maximum = &receive_maximum, @@ -697,6 +827,7 @@ int main(int argc, char **argv) { .websocket_handshake_transform = websocket_handshake_transform, .websocket_handshake_transform_user_data = websocket_handshake_transform_user_data, .publish_received_handler = s_on_publish_received, + .publish_received_handler_user_data = &app_ctx, }; struct aws_mqtt5_client *client = aws_mqtt5_client_new(allocator, &client_options);