diff --git a/CMakeLists.txt b/CMakeLists.txt index 56a98381..c4c33241 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,5 +112,7 @@ if (BUILD_TESTING) add_subdirectory(bin/elastipubsub5) add_subdirectory(bin/elastishadow) add_subdirectory(bin/mqtt5canary) + add_subdirectory(bin/mqtt3_client_app) + add_subdirectory(bin/mqtt5_client_app) endif() endif () diff --git a/WORKSPACE b/WORKSPACE new file mode 100644 index 00000000..e69de29b diff --git a/bin/elastipubsub/main.c b/bin/elastipubsub/main.c index 1745fe9a..96d3ad1c 100644 --- a/bin/elastipubsub/main.c +++ b/bin/elastipubsub/main.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -20,11 +21,16 @@ #include #include #include +#include +#include #include #include #include +#include +#include +#include #ifdef _MSC_VER # pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */ @@ -32,6 +38,120 @@ # pragma warning(disable : 4221) /* Local var in declared initializer */ #endif +struct socks5_proxy_settings { + char *host; + char *username; + char *password; + uint16_t port; + bool resolve_host_with_proxy; +}; + +static void s_socks5_proxy_settings_clean_up( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator) { + if (!settings) { + return; + } + if (settings->host) { + aws_mem_release(allocator, settings->host); + } + if (settings->username) { + aws_mem_release(allocator, settings->username); + } + if (settings->password) { + aws_mem_release(allocator, settings->password); + } + AWS_ZERO_STRUCT(*settings); +} + +static int s_socks5_proxy_settings_init_from_uri( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator, + const char *proxy_uri) { + + if (!settings || !allocator || !proxy_uri) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + s_socks5_proxy_settings_clean_up(settings, allocator); + + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri); + struct aws_uri uri; + AWS_ZERO_STRUCT(uri); + + if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) { + fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error())); + goto on_error; + } + + const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri); + if (!scheme || !scheme->len) { + fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri); + goto on_error; + } + + if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) { + settings->resolve_host_with_proxy = true; + } else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) { + settings->resolve_host_with_proxy = false; + } else { + fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri); + goto on_error; + } + + const struct aws_byte_cursor *host = aws_uri_host_name(&uri); + if (!host || host->len == 0) { + fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri); + goto on_error; + } + + settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char)); + if (!settings->host) { + fprintf(stderr, "Failed to allocate memory for proxy host\n"); + goto on_error; + } + memcpy(settings->host, host->ptr, host->len); + settings->host[host->len] = '\0'; + + uint32_t parsed_port = aws_uri_port(&uri); + if (parsed_port == 0) { + parsed_port = 1080; + } + if (parsed_port > UINT16_MAX) { + fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port); + goto on_error; + } + settings->port = (uint16_t)parsed_port; + + if (uri.user.len > 0) { + settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char)); + if (!settings->username) { + fprintf(stderr, "Failed to allocate memory for proxy username\n"); + goto on_error; + } + memcpy(settings->username, uri.user.ptr, uri.user.len); + settings->username[uri.user.len] = '\0'; + } + + if (uri.password.len > 0) { + settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char)); + if (!settings->password) { + fprintf(stderr, "Failed to allocate memory for proxy password\n"); + goto on_error; + } + memcpy(settings->password, uri.password.ptr, uri.password.len); + settings->password[uri.password.len] = '\0'; + } + + aws_uri_clean_up(&uri); + return AWS_OP_SUCCESS; + +on_error: + aws_uri_clean_up(&uri); + s_socks5_proxy_settings_clean_up(settings, allocator); + return AWS_OP_ERR; +} + struct app_ctx { struct aws_allocator *allocator; struct aws_mutex lock; @@ -53,6 +173,10 @@ struct app_ctx { int received_messages; struct aws_tls_connection_options tls_connection_options; + struct socks5_proxy_settings proxy; + bool use_proxy; + struct aws_socks5_proxy_options socks5_options; + bool socks5_options_initialized; struct aws_linked_list pending_connection_list; struct aws_linked_list established_connection_list; @@ -71,6 +195,7 @@ static void s_usage(int exit_code) { fprintf(stderr, " --key FILE: Path to a PEM encoded private key that matches cert.\n"); fprintf(stderr, " --cops INT: target control (connect, subscribe) operations per second\n"); fprintf(stderr, " --connect-timeout INT: time in milliseconds to wait for a connection.\n"); + fprintf(stderr, " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n"); fprintf(stderr, " -i, --iterations INT: number of independent iterations to run the test for\n"); fprintf(stderr, " -k, --connections INT: number of independent connections to make.\n"); fprintf(stderr, " -l, --log FILE: dumps logs to FILE instead of stderr.\n"); @@ -94,6 +219,7 @@ static struct aws_cli_option s_long_options[] = { {"messages", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'n'}, {"pops", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'p'}, {"verbose", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'v'}, + {"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'}, {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, {"endpoint", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'E'}, /* Per getopt(3) the last element of the array has to be filled with all zeros */ @@ -104,7 +230,7 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { bool uri_found = false; while (true) { int option_index = 0; - int c = aws_cli_getopt_long(argc, argv, "a:c:C:e:f:i:k:l:n:p:v:h:E", s_long_options, &option_index); + int c = aws_cli_getopt_long(argc, argv, "a:c:C:e:f:i:k:l:n:p:v:h:E:x:", s_long_options, &option_index); if (c == -1) { break; } @@ -157,6 +283,12 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { s_usage(1); } break; + case 'x': + if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) { + s_usage(1); + } + ctx->use_proxy = true; + break; case 'h': s_usage(0); break; @@ -324,6 +456,18 @@ static void s_establish_connections( .clean_session = true, }; + if (app_ctx->use_proxy && app_ctx->socks5_options_initialized) { + if (aws_mqtt_client_connection_set_socks5_proxy_options( + connection_data->connection, &app_ctx->socks5_options)) { + fprintf( + stderr, + "Failed to set SOCKS5 proxy options: %s\n", + aws_error_debug_str(aws_last_error())); + s_final_destroy_connection_data(connection_data); + continue; + } + } + if (aws_mqtt_client_connection_connect(connection_data->connection, &connection_options)) { s_final_destroy_connection_data(connection_data); } @@ -728,6 +872,46 @@ int main(int argc, char **argv) { aws_mem_calloc(allocator, app_ctx.connection_count, sizeof(struct connection_user_data)); AWS_FATAL_ASSERT(connections != NULL); + if (app_ctx.use_proxy) { + if (!app_ctx.proxy.host) { + fprintf(stderr, "Proxy URI was requested but no host was parsed.\n"); + exit(1); + } + + if (app_ctx.socks5_options_initialized) { + aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options); + AWS_ZERO_STRUCT(app_ctx.socks5_options); + app_ctx.socks5_options_initialized = false; + } + + struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host); + if (aws_socks5_proxy_options_init(&app_ctx.socks5_options, allocator, proxy_host, app_ctx.proxy.port)) { + fprintf( + stderr, + "Failed to initialize SOCKS5 proxy options: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + aws_socks5_proxy_options_set_host_resolution_mode( + &app_ctx.socks5_options, + app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY + : AWS_SOCKS5_HOST_RESOLUTION_CLIENT); + + if (app_ctx.proxy.username && app_ctx.proxy.password) { + struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username); + struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password); + if (aws_socks5_proxy_options_set_auth(&app_ctx.socks5_options, allocator, username, password)) { + fprintf( + stderr, + "Failed to set SOCKS5 auth: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + } + + app_ctx.socks5_options_initialized = true; + } + s_establish_connections(&app_ctx, connections, bootstrap, &tls_connection_options); s_establish_subscriptions(&app_ctx); @@ -736,6 +920,12 @@ int main(int argc, char **argv) { s_teardown_connections(&app_ctx); + if (app_ctx.socks5_options_initialized) { + aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options); + AWS_ZERO_STRUCT(app_ctx.socks5_options); + app_ctx.socks5_options_initialized = false; + } + aws_mem_release(allocator, connections); aws_client_bootstrap_release(bootstrap); @@ -762,6 +952,10 @@ int main(int argc, char **argv) { aws_logger_clean_up(&logger); } + if (app_ctx.socks5_options_initialized) { + aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options); + } + s_socks5_proxy_settings_clean_up(&app_ctx.proxy, app_ctx.allocator); aws_uri_clean_up(&app_ctx.uri); aws_mqtt_library_clean_up(); diff --git a/bin/elastipubsub5/main.c b/bin/elastipubsub5/main.c index 2c84e796..25d3d449 100644 --- a/bin/elastipubsub5/main.c +++ b/bin/elastipubsub5/main.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -20,12 +21,17 @@ #include #include #include +#include +#include #include #include #include #include +#include +#include +#include #ifdef _MSC_VER # pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */ @@ -39,6 +45,120 @@ # include #endif +struct socks5_proxy_settings { + char *host; + char *username; + char *password; + uint16_t port; + bool resolve_host_with_proxy; +}; + +static void s_socks5_proxy_settings_clean_up( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator) { + if (!settings) { + return; + } + if (settings->host) { + aws_mem_release(allocator, settings->host); + } + if (settings->username) { + aws_mem_release(allocator, settings->username); + } + if (settings->password) { + aws_mem_release(allocator, settings->password); + } + AWS_ZERO_STRUCT(*settings); +} + +static int s_socks5_proxy_settings_init_from_uri( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator, + const char *proxy_uri) { + + if (!settings || !allocator || !proxy_uri) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + s_socks5_proxy_settings_clean_up(settings, allocator); + + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri); + struct aws_uri uri; + AWS_ZERO_STRUCT(uri); + + if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) { + fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error())); + goto on_error; + } + + const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri); + if (!scheme || !scheme->len) { + fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri); + goto on_error; + } + + if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) { + settings->resolve_host_with_proxy = true; + } else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) { + settings->resolve_host_with_proxy = false; + } else { + fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri); + goto on_error; + } + + const struct aws_byte_cursor *host = aws_uri_host_name(&uri); + if (!host || host->len == 0) { + fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri); + goto on_error; + } + + settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char)); + if (!settings->host) { + fprintf(stderr, "Failed to allocate memory for proxy host\n"); + goto on_error; + } + memcpy(settings->host, host->ptr, host->len); + settings->host[host->len] = '\0'; + + uint32_t parsed_port = aws_uri_port(&uri); + if (parsed_port == 0) { + parsed_port = 1080; + } + if (parsed_port > UINT16_MAX) { + fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port); + goto on_error; + } + settings->port = (uint16_t)parsed_port; + + if (uri.user.len > 0) { + settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char)); + if (!settings->username) { + fprintf(stderr, "Failed to allocate memory for proxy username\n"); + goto on_error; + } + memcpy(settings->username, uri.user.ptr, uri.user.len); + settings->username[uri.user.len] = '\0'; + } + + if (uri.password.len > 0) { + settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char)); + if (!settings->password) { + fprintf(stderr, "Failed to allocate memory for proxy password\n"); + goto on_error; + } + memcpy(settings->password, uri.password.ptr, uri.password.len); + settings->password[uri.password.len] = '\0'; + } + + aws_uri_clean_up(&uri); + return AWS_OP_SUCCESS; + +on_error: + aws_uri_clean_up(&uri); + s_socks5_proxy_settings_clean_up(settings, allocator); + return AWS_OP_ERR; +} + struct app_ctx { struct aws_allocator *allocator; struct aws_mutex lock; @@ -55,6 +175,8 @@ struct app_ctx { const char *log_filename; enum aws_log_level log_level; + struct socks5_proxy_settings proxy; + bool use_proxy; }; static void s_usage(int exit_code) { @@ -66,6 +188,7 @@ static void s_usage(int exit_code) { fprintf(stderr, " --cert FILE: path to a PEM encoded certificate to use with mTLS\n"); fprintf(stderr, " --key FILE: Path to a PEM encoded private key that matches cert.\n"); fprintf(stderr, " --connect-timeout INT: time in milliseconds to wait for a connection.\n"); + fprintf(stderr, " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n"); fprintf(stderr, " -l, --log FILE: dumps logs to FILE instead of stderr.\n"); fprintf(stderr, " -v, --verbose: ERROR|INFO|DEBUG|TRACE: log level to configure. Default is none.\n"); fprintf(stderr, " -w, --websockets: use mqtt-over-websockets rather than direct mqtt\n"); @@ -81,6 +204,7 @@ static struct aws_cli_option s_long_options[] = { {"connect-timeout", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'f'}, {"log", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'l'}, {"verbose", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'v'}, + {"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'}, {"websockets", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'w'}, {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, /* Per getopt(3) the last element of the array has to be filled with all zeros */ @@ -92,7 +216,7 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { while (true) { int option_index = 0; - int c = aws_cli_getopt_long(argc, argv, "a:c:e:f:l:v:wh", s_long_options, &option_index); + int c = aws_cli_getopt_long(argc, argv, "a:c:e:f:l:v:whx:", s_long_options, &option_index); if (c == -1) { break; } @@ -130,6 +254,12 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { s_usage(1); } break; + case 'x': + if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) { + s_usage(1); + } + ctx->use_proxy = true; + break; case 'h': s_usage(0); break; @@ -592,6 +722,9 @@ int main(int argc, char **argv) { } bool use_tls = false; + bool socks5_options_valid = false; + struct aws_socks5_proxy_options socks5_options; + AWS_ZERO_STRUCT(socks5_options); struct aws_tls_ctx *tls_ctx = NULL; struct aws_tls_ctx_options tls_ctx_options; AWS_ZERO_STRUCT(tls_ctx_options); @@ -662,6 +795,40 @@ int main(int argc, char **argv) { .keep_alive_interval_sec = 0, }; + if (app_ctx.use_proxy) { + if (!app_ctx.proxy.host) { + fprintf(stderr, "Proxy URI was requested but no host was parsed.\n"); + exit(1); + } + + struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host); + if (aws_socks5_proxy_options_init(&socks5_options, allocator, proxy_host, app_ctx.proxy.port) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to initialize SOCKS5 proxy options: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + aws_socks5_proxy_options_set_host_resolution_mode( + &socks5_options, + app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY + : AWS_SOCKS5_HOST_RESOLUTION_CLIENT); + + if (app_ctx.proxy.username && app_ctx.proxy.password) { + struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username); + struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password); + if (aws_socks5_proxy_options_set_auth(&socks5_options, allocator, username, password) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to set SOCKS5 auth: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + } + + socks5_options_valid = true; + } + uint16_t receive_maximum = 9; uint32_t maximum_packet_size = 128 * 1024; @@ -686,6 +853,7 @@ int main(int argc, char **argv) { .socket_options = &socket_options, .tls_options = (use_tls) ? &tls_connection_options : NULL, .connect_options = &connect_options, + .socks5_proxy_options = socks5_options_valid ? &socks5_options : NULL, .session_behavior = AWS_MQTT5_CSBT_CLEAN, .lifecycle_event_handler = s_lifecycle_event_callback, .lifecycle_event_handler_user_data = NULL, @@ -717,6 +885,10 @@ int main(int argc, char **argv) { aws_mqtt5_client_release(client); + if (socks5_options_valid) { + aws_socks5_proxy_options_clean_up(&socks5_options); + } + aws_client_bootstrap_release(bootstrap); aws_host_resolver_release(resolver); aws_event_loop_group_release(el_group); @@ -738,6 +910,7 @@ int main(int argc, char **argv) { aws_logger_clean_up(&logger); } + s_socks5_proxy_settings_clean_up(&app_ctx.proxy, app_ctx.allocator); aws_uri_clean_up(&app_ctx.uri); aws_mqtt_library_clean_up(); diff --git a/bin/mqtt3_client_app/CMakeLists.txt b/bin/mqtt3_client_app/CMakeLists.txt new file mode 100644 index 00000000..e026dc21 --- /dev/null +++ b/bin/mqtt3_client_app/CMakeLists.txt @@ -0,0 +1,28 @@ +project(mqtt3_client_app C) + +list(APPEND CMAKE_MODULE_PATH "${CMAKE_INSTALL_PREFIX}/lib/cmake") + +file(GLOB MQTT3_CLIENT_APP_SRC + "*.c" + ) + +set(MQTT3_CLIENT_APP_PROJECT_NAME mqtt3_client_app) +add_executable(${MQTT3_CLIENT_APP_PROJECT_NAME} ${MQTT3_CLIENT_APP_SRC}) +aws_set_common_properties(${MQTT3_CLIENT_APP_PROJECT_NAME}) + +target_include_directories(${MQTT3_CLIENT_APP_PROJECT_NAME} PUBLIC + $ + $) + +target_link_libraries(${MQTT3_CLIENT_APP_PROJECT_NAME} PRIVATE aws-c-mqtt) + +if (BUILD_SHARED_LIBS AND NOT WIN32) + message(INFO " mqtt3_client_app will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application") +endif() + +install(TARGETS ${MQTT3_CLIENT_APP_PROJECT_NAME} + EXPORT ${MQTT3_CLIENT_APP_PROJECT_NAME}-targets + COMPONENT Runtime + RUNTIME + DESTINATION bin + COMPONENT Runtime) \ No newline at end of file diff --git a/bin/mqtt3_client_app/main.c b/bin/mqtt3_client_app/main.c new file mode 100644 index 00000000..17f13449 --- /dev/null +++ b/bin/mqtt3_client_app/main.c @@ -0,0 +1,1280 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +struct socks5_proxy_settings { + char *host; + char *username; + char *password; + uint16_t port; + bool resolve_host_with_proxy; +}; + +static void s_socks5_proxy_settings_clean_up( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator) { + if (!settings) { + return; + } + if (settings->host) { + aws_mem_release(allocator, settings->host); + } + if (settings->username) { + aws_mem_release(allocator, settings->username); + } + if (settings->password) { + aws_mem_release(allocator, settings->password); + } + AWS_ZERO_STRUCT(*settings); +} + +static int s_socks5_proxy_settings_init_from_uri( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator, + const char *proxy_uri) { + + if (!settings || !allocator || !proxy_uri) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + s_socks5_proxy_settings_clean_up(settings, allocator); + + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri); + struct aws_uri uri; + AWS_ZERO_STRUCT(uri); + + if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) { + fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error())); + goto on_error; + } + + const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri); + if (!scheme || !scheme->len) { + fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri); + goto on_error; + } + + if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) { + settings->resolve_host_with_proxy = true; + } else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) { + settings->resolve_host_with_proxy = false; + } else { + fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri); + goto on_error; + } + + const struct aws_byte_cursor *host = aws_uri_host_name(&uri); + if (!host || host->len == 0) { + fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri); + goto on_error; + } + + settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char)); + if (!settings->host) { + fprintf(stderr, "Failed to allocate memory for proxy host\n"); + goto on_error; + } + memcpy(settings->host, host->ptr, host->len); + settings->host[host->len] = '\0'; + + uint32_t parsed_port = aws_uri_port(&uri); + if (parsed_port == 0) { + parsed_port = 1080; + } + if (parsed_port > UINT16_MAX) { + fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port); + goto on_error; + } + settings->port = (uint16_t)parsed_port; + + if (uri.user.len > 0) { + settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char)); + if (!settings->username) { + fprintf(stderr, "Failed to allocate memory for proxy username\n"); + goto on_error; + } + memcpy(settings->username, uri.user.ptr, uri.user.len); + settings->username[uri.user.len] = '\0'; + } + + if (uri.password.len > 0) { + settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char)); + if (!settings->password) { + fprintf(stderr, "Failed to allocate memory for proxy password\n"); + goto on_error; + } + memcpy(settings->password, uri.password.ptr, uri.password.len); + settings->password[uri.password.len] = '\0'; + } + + aws_uri_clean_up(&uri); + return AWS_OP_SUCCESS; + +on_error: + aws_uri_clean_up(&uri); + s_socks5_proxy_settings_clean_up(settings, allocator); + return AWS_OP_ERR; +} + +/* ANSI Color codes for terminal output */ +#define COLOR_RED "\x1b[31m" +#define COLOR_GREEN "\x1b[32m" +#define COLOR_YELLOW "\x1b[33m" +#define COLOR_BLUE "\x1b[34m" +#define COLOR_RESET "\x1b[0m" + +/* + * This example demonstrates how to establish an MQTT connection through a SOCKS5 proxy + * or directly to the MQTT broker. It supports various features including: + * + * - MQTT connection via SOCKS5 proxy or direct connection + * - TLS support for secure MQTT (MQTTS) + * - WebSocket transport support (MQTT over WebSockets) + * - Basic MQTT operations (subscribe, publish, receive) + * - SOCKS5 proxy authentication + * + * The various transport options can be combined to support: + * - Direct MQTT + * - MQTT over SOCKS5 + * - MQTT over TLS + * - MQTT over TLS over SOCKS5 + * - MQTT over WebSocket + * - MQTT over WebSocket over SOCKS5 + * - MQTT over WebSocket over TLS + * - MQTT over WebSocket over TLS over SOCKS5 + */ + +struct app_ctx { + struct aws_allocator *allocator; + struct aws_event_loop_group *event_loop_group; + struct aws_host_resolver *host_resolver; + struct aws_client_bootstrap *bootstrap; + + struct aws_mqtt_client *mqtt_client; + struct aws_mqtt_client_connection *mqtt_connection; + + struct aws_mutex lock; + struct aws_condition_variable signal; + + /* MQTT broker settings */ + const char *broker_host; + uint16_t broker_port; + + /* MQTT client settings */ + const char *client_id; + bool clean_session; + uint16_t keep_alive_secs; + + /* SOCKS5 proxy settings */ + struct socks5_proxy_settings proxy; + + /* WebSocket settings */ + const char *ws_path; + + /* Certificate settings */ + const char *cert_file; + const char *key_file; + const char *ca_file; + const char *ca_dir; + + /* Topic settings */ + const char *pub_topic; + const char *sub_topic; + const char *pub_message; + + /* Connection options */ + bool use_proxy; + bool use_tls; + bool use_websocket; + bool verbose; + + /* State tracking */ + bool connection_complete; + int connection_error_code; + bool subscription_complete; + int subscription_error_code; + bool publish_complete; + int publish_error_code; + bool message_received; + + /* Packet IDs */ + uint16_t sub_packet_id; + uint16_t pub_packet_id; + + /* Message buffer */ + struct aws_byte_buf received_message; +}; + +static void s_usage(int exit_code) { + fprintf(stderr, "usage: mqtt_socks5_example [options]\n"); + fprintf(stderr, " --broker-host HOST: MQTT broker hostname (default: test.mosquitto.org)\n"); + fprintf(stderr, " --broker-port PORT: MQTT broker port (default: 1883 for MQTT, 8883 for MQTTS, 8080/8443 for WebSockets)\n"); + fprintf(stderr, " --client-id ID: MQTT client ID (default: aws-mqtt-socks5-example)\n"); + fprintf(stderr, " --clean-session: Use clean session (default: true)\n"); + fprintf(stderr, " --keep-alive SECS: MQTT keep-alive in seconds (default: 60)\n"); + fprintf( + stderr, + " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n"); + + fprintf(stderr, " --cert FILE: Client certificate file path (PEM format)\n"); + fprintf(stderr, " --key FILE: Private key file path (PEM format)\n"); + fprintf(stderr, " --ca-file FILE: CA certificate file path (PEM format)\n"); + fprintf(stderr, " --ca-dir DIR: Directory containing CA certificates\n"); + + fprintf(stderr, " --ws-path PATH: WebSocket path (default: /mqtt)\n"); + + fprintf(stderr, " --pub-topic TOPIC: Topic to publish to (default: test/mqtt-socks5-example)\n"); + fprintf(stderr, " --sub-topic TOPIC: Topic to subscribe to (default: same as pub-topic)\n"); + fprintf(stderr, " --message MSG: Message to publish (default: \"Hello from MQTT SOCKS5 example\")\n"); + fprintf(stderr, " --websocket: Use WebSockets for transport\n"); + fprintf(stderr, " --verbose: Print detailed logging\n"); + fprintf(stderr, " --help: Display this message and exit\n"); + + exit(exit_code); +} + +static struct aws_cli_option s_long_options[] = { + {"broker-host", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'b'}, + {"broker-port", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'p'}, + {"client-id", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'i'}, + {"clean-session", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'c'}, + {"keep-alive", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'k'}, + + {"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'}, + + {"ws-path", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'P'}, + + {"cert", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'C'}, + {"key", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'K'}, + {"ca-file", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'A'}, + {"ca-dir", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'D'}, + + {"pub-topic", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 't'}, + {"sub-topic", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 's'}, + {"message", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'm'}, + {"websocket", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'W'}, + {"verbose", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'v'}, + {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, + {NULL, 0, NULL, 0}, +}; + +static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { + /* Default values */ + ctx->broker_host = "test.mosquitto.org"; + ctx->broker_port = 0; /* Will be set later based on options */ + ctx->client_id = "aws-mqtt-socks5-example"; + ctx->clean_session = true; + ctx->keep_alive_secs = 60; + + ctx->cert_file = NULL; + ctx->key_file = NULL; + ctx->ca_file = NULL; + ctx->ca_dir = NULL; + + ctx->ws_path = "/mqtt"; + + ctx->pub_topic = "test/mqtt-socks5-example"; + ctx->sub_topic = NULL; /* Default to same as pub_topic */ + ctx->pub_message = "Hello from MQTT SOCKS5 example"; + + ctx->use_proxy = false; + ctx->use_tls = false; + ctx->use_websocket = false; + ctx->verbose = false; + + while (true) { + int option_index = 0; + int c = aws_cli_getopt_long(argc, argv, "b:p:i:ck:x:P:C:K:A:D:t:s:m:Wvh", + s_long_options, &option_index); + if (c == -1) { + break; + } + + switch (c) { + case 'b': + ctx->broker_host = aws_cli_optarg; + break; + case 'p': + ctx->broker_port = (uint16_t)atoi(aws_cli_optarg); + break; + case 'i': + ctx->client_id = aws_cli_optarg; + break; + case 'c': + ctx->clean_session = true; + break; + case 'k': + ctx->keep_alive_secs = (uint16_t)atoi(aws_cli_optarg); + break; + + case 'x': + if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) { + s_usage(1); + } + ctx->use_proxy = true; + break; + + case 'P': + ctx->ws_path = aws_cli_optarg; + break; + + case 'C': + ctx->cert_file = aws_cli_optarg; + break; + case 'K': + ctx->key_file = aws_cli_optarg; + break; + case 'A': + ctx->ca_file = aws_cli_optarg; + break; + case 'D': + ctx->ca_dir = aws_cli_optarg; + break; + + case 't': + ctx->pub_topic = aws_cli_optarg; + break; + case 's': + ctx->sub_topic = aws_cli_optarg; + break; + case 'm': + ctx->pub_message = aws_cli_optarg; + break; + case 'W': + ctx->use_websocket = true; + break; + case 'v': + ctx->verbose = true; + break; + case 'h': + s_usage(0); + break; + default: + fprintf(stderr, "Unknown option: %c\n", c); + s_usage(1); + break; + } + } + + if (!ctx->use_tls) { + ctx->use_tls = ctx->ca_file || ctx->cert_file || ctx->key_file; + } + + /* If sub_topic not specified, use the same as pub_topic */ + if (ctx->sub_topic == NULL) { + ctx->sub_topic = ctx->pub_topic; + } + + /* Set default port if not specified */ + if (ctx->broker_port == 0) { + if (ctx->use_websocket) { + ctx->broker_port = ctx->use_tls ? 443 : 80; + } else { + ctx->broker_port = ctx->use_tls ? 8883 : 1883; + } + } +} + +/* Structure for user data passed to the socket channel creation callback */ +/* Helper function to print SOCKS5 proxy configuration details */ +static void print_socks5_config(struct aws_socks5_proxy_options *socks5_options) { + printf("SOCKS5 proxy configuration:\n"); + struct aws_string * proxy_host = socks5_options->host; + printf(" Proxy host: %.*s\n", (int)proxy_host->len, (const char *)proxy_host->bytes); + printf(" Proxy port: %u\n", socks5_options->port); + size_t username_len = socks5_options->username->len; + size_t password_len = socks5_options->password->len; + if (username_len > 0 && password_len > 0) { + printf(" Authentication: Username/Password\n"); + } else { + printf(" Authentication: None\n"); + } +} + +/* Predicate functions for condition variables */ +static bool s_connection_completed_predicate(void *arg) { + struct app_ctx *ctx = arg; + return ctx->connection_complete; +} + +static bool s_subscription_completed_predicate(void *arg) { + struct app_ctx *ctx = arg; + return ctx->subscription_complete; +} + +static bool s_publish_completed_predicate(void *arg) { + struct app_ctx *ctx = arg; + return ctx->publish_complete; +} + +static void s_app_ctx_init(struct app_ctx *ctx, struct aws_allocator *allocator) { + AWS_ZERO_STRUCT(*ctx); + ctx->allocator = allocator; + aws_mutex_init(&ctx->lock); + aws_condition_variable_init(&ctx->signal); + aws_byte_buf_init(&ctx->received_message, allocator, 1024); /* Initial buffer capacity */ +} + +static void s_app_ctx_clean_up(struct app_ctx *ctx) { + if (ctx->mqtt_connection) { + aws_mqtt_client_connection_release(ctx->mqtt_connection); + } + + if (ctx->mqtt_client) { + aws_mqtt_client_release(ctx->mqtt_client); + } + + if (ctx->bootstrap) { + aws_client_bootstrap_release(ctx->bootstrap); + } + + if (ctx->host_resolver) { + aws_host_resolver_release(ctx->host_resolver); + } + + if (ctx->event_loop_group) { + aws_event_loop_group_release(ctx->event_loop_group); + } + + aws_byte_buf_clean_up(&ctx->received_message); + s_socks5_proxy_settings_clean_up(&ctx->proxy, ctx->allocator); + aws_condition_variable_clean_up(&ctx->signal); + aws_mutex_clean_up(&ctx->lock); +} + +/* MQTT Connection callbacks */ +static void s_on_connection_complete( + struct aws_mqtt_client_connection *connection, + int error_code, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *userdata) { + + struct app_ctx *ctx = userdata; + (void)connection; + (void)return_code; + (void)session_present; + + aws_mutex_lock(&ctx->lock); + + ctx->connection_complete = true; + ctx->connection_error_code = error_code; + + if (error_code == AWS_ERROR_SUCCESS) { + if (ctx->verbose) { + printf(COLOR_GREEN "MQTT connection established successfully" COLOR_RESET "\n"); + printf("Session present: %s\n", session_present ? "true" : "false"); + } + } else { + fprintf(stderr, COLOR_RED "MQTT connection failed: %s (error code: %d)" COLOR_RESET "\n", + aws_error_debug_str(error_code), error_code); + fprintf(stderr, COLOR_YELLOW "Connection details: %s:%d (proxy: %s)" COLOR_RESET "\n", + ctx->broker_host, ctx->broker_port, + ctx->use_proxy ? "yes" : "no"); + } + + aws_condition_variable_notify_one(&ctx->signal); + aws_mutex_unlock(&ctx->lock); +} + +/* MQTT Subscription callbacks */ +static void s_on_suback( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, + int error_code, + void *userdata) { + + struct app_ctx *ctx = userdata; + (void)connection; + (void)topic; + (void)qos; + + aws_mutex_lock(&ctx->lock); + + if (packet_id == ctx->sub_packet_id) { + ctx->subscription_complete = true; + ctx->subscription_error_code = error_code; + + if (ctx->verbose) { + if (error_code == AWS_ERROR_SUCCESS) { + printf(COLOR_GREEN "Subscription succeeded for topic: %.*s with QoS %d" COLOR_RESET "\n", + (int)topic->len, topic->ptr, qos); + } else { + printf(COLOR_RED "Subscription failed: %s" COLOR_RESET "\n", aws_error_debug_str(error_code)); + } + } + + aws_condition_variable_notify_one(&ctx->signal); + } + + aws_mutex_unlock(&ctx->lock); +} + +/* MQTT Message received callback */ +static void s_on_publish_received( + struct aws_mqtt_client_connection *connection, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain, + void *userdata) { + + struct app_ctx *ctx = userdata; + (void)connection; + (void)dup; + (void)qos; + (void)retain; + + aws_mutex_lock(&ctx->lock); + + ctx->message_received = true; + + /* Clear previous message if any */ + aws_byte_buf_reset(&ctx->received_message, 0); + + /* Store the received message */ + aws_byte_buf_append_dynamic(&ctx->received_message, payload); + + if (ctx->verbose) { + printf(COLOR_GREEN "Message received on topic '%.*s': %.*s" COLOR_RESET "\n", + (int)topic->len, topic->ptr, + (int)payload->len, payload->ptr); + } + + aws_mutex_unlock(&ctx->lock); +} + +/* MQTT Publish completion callback */ +static void s_on_publish_complete( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + int error_code, + void *userdata) { + + struct app_ctx *ctx = userdata; + (void)connection; + + aws_mutex_lock(&ctx->lock); + + if (packet_id == ctx->pub_packet_id) { + ctx->publish_complete = true; + ctx->publish_error_code = error_code; + + if (error_code == AWS_ERROR_SUCCESS) { + if (ctx->verbose) { + printf(COLOR_GREEN "Publish completed successfully" COLOR_RESET "\n"); + } + } else { + fprintf(stderr, COLOR_RED "Publish failed: %s" COLOR_RESET "\n", aws_error_debug_str(error_code)); + } + + aws_condition_variable_notify_one(&ctx->signal); + } + + aws_mutex_unlock(&ctx->lock); +} + +static int s_subscribe_to_topic(struct app_ctx *ctx) { + if (!ctx->mqtt_connection) { + fprintf(stderr, "Cannot subscribe: No active MQTT connection\n"); + return AWS_OP_ERR; + } + + struct aws_byte_cursor topic_cur = aws_byte_cursor_from_c_str(ctx->sub_topic); + + /* Subscribe to the topic with QoS 1 */ + ctx->sub_packet_id = aws_mqtt_client_connection_subscribe( + ctx->mqtt_connection, + &topic_cur, + AWS_MQTT_QOS_AT_LEAST_ONCE, + s_on_publish_received, + ctx, + NULL, + s_on_suback, + ctx); + + if (ctx->sub_packet_id == 0) { + fprintf(stderr, COLOR_RED "Failed to subscribe: %s" COLOR_RESET "\n", aws_error_debug_str(aws_last_error())); + return AWS_OP_ERR; + } + + if (ctx->verbose) { + printf("Subscription request sent for topic: %s\n", ctx->sub_topic); + } + + return AWS_OP_SUCCESS; +} + +static int s_publish_message(struct app_ctx *ctx) { + if (!ctx->mqtt_connection) { + fprintf(stderr, "Cannot publish: No active MQTT connection\n"); + return AWS_OP_ERR; + } + + struct aws_byte_cursor topic_cur = aws_byte_cursor_from_c_str(ctx->pub_topic); + struct aws_byte_cursor message_cur = aws_byte_cursor_from_c_str(ctx->pub_message); + + /* Publish with QoS 1 */ + ctx->pub_packet_id = aws_mqtt_client_connection_publish( + ctx->mqtt_connection, + &topic_cur, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, /* retain */ + &message_cur, + s_on_publish_complete, + ctx); + + if (ctx->pub_packet_id == 0) { + fprintf(stderr, COLOR_RED "Failed to publish: %s" COLOR_RESET "\n", aws_error_debug_str(aws_last_error())); + return AWS_OP_ERR; + } + + if (ctx->verbose) { + printf("Publish request sent to topic: %s\n", ctx->pub_topic); + } + + return AWS_OP_SUCCESS; +} + +int main(int argc, char **argv) { + + int main_error_code = 0; + + struct aws_allocator *allocator = aws_default_allocator(); + aws_common_library_init(allocator); + aws_io_library_init(allocator); + aws_mqtt_library_init(allocator); + + struct app_ctx app_ctx; + s_app_ctx_init(&app_ctx, allocator); + + /* Parse command line arguments */ + s_parse_options(argc, argv, &app_ctx); + + struct aws_tls_connection_options *tls_connection_options = NULL; + struct aws_tls_ctx *tls_ctx = NULL; + struct aws_socks5_proxy_options *socks5_options = NULL; + + /* Initialize AWS CRT logger */ + struct aws_logger logger; + struct aws_logger_standard_options logger_options = { + /* Force debug level output for this run to diagnose the TLS issue */ + .level = app_ctx.verbose ? AWS_LL_TRACE : AWS_LL_ERROR, + .file = stderr, + }; + + bool logger_initialized = false; + if (aws_logger_init_standard(&logger, allocator, &logger_options) == AWS_OP_SUCCESS) { + aws_logger_set(&logger); + logger_initialized = true; + + if (app_ctx.verbose) { + printf("Verbose mode enabled, using TRACE log level\n"); + } + } else { + fprintf(stderr, "[WARN] Failed to initialize AWS logger, logs will not be shown.\n"); + } + + /* Log the configuration */ + printf("MQTT%s connection to %s:%d\n", + app_ctx.use_tls ? "S" : "", + app_ctx.broker_host, + app_ctx.broker_port); + + printf("Client ID: %s\n", app_ctx.client_id); + printf("Clean session: %s\n", app_ctx.clean_session ? "yes" : "no"); + printf("Keep alive: %d seconds\n", app_ctx.keep_alive_secs); + + if (app_ctx.use_websocket) { + printf("Using WebSocket transport\n"); + } + + if (app_ctx.use_proxy && app_ctx.proxy.host) { + printf( + "Using SOCKS5 proxy at %s:%" PRIu16 " (%s DNS)\n", + app_ctx.proxy.host, + app_ctx.proxy.port, + app_ctx.proxy.resolve_host_with_proxy ? "proxy" : "client-side"); + if (app_ctx.proxy.username) { + printf("With proxy authentication: username=%s\n", app_ctx.proxy.username); + } + } else { + printf("Using direct connection (no proxy)\n"); + } + + /* Create event loop group */ + app_ctx.event_loop_group = aws_event_loop_group_new_default(allocator, 1, NULL); + if (!app_ctx.event_loop_group) { + fprintf(stderr, "Failed to create event loop group: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 1; + goto cleanup; + } + + /* Create host resolver */ + struct aws_host_resolver_default_options resolver_options = { + .max_entries = 8, + .el_group = app_ctx.event_loop_group + }; + app_ctx.host_resolver = aws_host_resolver_new_default(allocator, &resolver_options); + if (!app_ctx.host_resolver) { + fprintf(stderr, "Failed to create host resolver: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 2; + goto cleanup; + } + + /* Create client bootstrap */ + struct aws_client_bootstrap_options bootstrap_options = { + .event_loop_group = app_ctx.event_loop_group, + .host_resolver = app_ctx.host_resolver + }; + app_ctx.bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options); + if (!app_ctx.bootstrap) { + fprintf(stderr, "Failed to create client bootstrap: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 3; + goto cleanup; + } + + /* Create MQTT client */ + app_ctx.mqtt_client = aws_mqtt_client_new(allocator, app_ctx.bootstrap); + if (!app_ctx.mqtt_client) { + fprintf(stderr, "Failed to create MQTT client: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 4; + goto cleanup; + } + + /* Create TLS connection options if using TLS */ + if (app_ctx.use_tls) { + /* Initialize TLS context options */ + struct aws_tls_ctx_options tls_ctx_options; + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + + /* If certificate files are provided, use them */ + if (app_ctx.cert_file && app_ctx.key_file) { + printf("Using certificate (%s) and key (%s) files\n", app_ctx.cert_file, app_ctx.key_file); + + /* Clean up any previous options and reinitialize with client certificates */ + aws_tls_ctx_options_clean_up(&tls_ctx_options); + + if (aws_tls_ctx_options_init_client_mtls_from_path( + &tls_ctx_options, allocator, app_ctx.cert_file, app_ctx.key_file) != AWS_OP_SUCCESS) { + int err = aws_last_error(); + fprintf(stderr, COLOR_RED "Failed to load client certificates: %s (%d)" COLOR_RESET "\n", + aws_error_debug_str(err), err); + main_error_code = 5; + goto cleanup; + } + printf(COLOR_GREEN "Successfully loaded client certificates" COLOR_RESET "\n"); + } + + /* If CA file is provided, override the default trust store */ + if (app_ctx.ca_file) { + printf("Using CA certificate file: %s\n", app_ctx.ca_file); + + /* Print some debug info about the CA certificate file */ + FILE *ca_file = fopen(app_ctx.ca_file, "r"); + if (ca_file) { + char buf[128]; + size_t bytes_read = fread(buf, 1, sizeof(buf) - 1, ca_file); + buf[bytes_read] = '\0'; + printf("CA file exists and is readable. First bytes: %.40s...\n", buf); + fclose(ca_file); + } else { + printf("WARNING: CA file cannot be opened\n"); + fprintf(stderr, "Error opening CA file: %s\n", strerror(errno)); + goto cleanup; + } + + /* The AWS CRT AWS_IO_TLS_CTX_ERROR often means the certificate is malformed */ + printf("Setting CA certificate (file: %s, dir: %s)...\n", + app_ctx.ca_file, app_ctx.ca_dir ? app_ctx.ca_dir : "NULL"); + + /* Clean up any previous TLS context options before reconfiguring */ + aws_tls_ctx_options_clean_up(&tls_ctx_options); + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + + /* Method 1: Load CA certificate directly from PEM file content */ + /* Read CA file into memory first */ + FILE *cert_file = fopen(app_ctx.ca_file, "rb"); + if (!cert_file) { + fprintf(stderr, "Failed to open CA certificate file: %s\n", strerror(errno)); + goto cleanup; + } + + /* Get file size */ + fseek(cert_file, 0, SEEK_END); + size_t file_size = ftell(cert_file); + fseek(cert_file, 0, SEEK_SET); + + /* Allocate memory for certificate content */ + char *cert_content = aws_mem_acquire(allocator, file_size + 1); + if (!cert_content) { + fprintf(stderr, "Failed to allocate memory for certificate\n"); + fclose(cert_file); + main_error_code = 6; + goto cleanup; + } + + /* Read certificate content */ + size_t bytes_read = fread(cert_content, 1, file_size, cert_file); + fclose(cert_file); + + if (bytes_read != file_size) { + fprintf(stderr, "Failed to read certificate file completely\n"); + aws_mem_release(allocator, cert_content); + main_error_code = 7; + goto cleanup; + } + + cert_content[bytes_read] = '\0'; + + /* Load CA certificate from memory */ + printf("Loading CA certificate from memory (size: %zu bytes)\n", bytes_read); + + struct aws_byte_cursor ca_file_cursor = aws_byte_cursor_from_array(cert_content, bytes_read); + if (aws_tls_ctx_options_override_default_trust_store(&tls_ctx_options, &ca_file_cursor) != AWS_OP_SUCCESS) { + int err = aws_last_error(); + fprintf(stderr, "Failed to load CA certificate directly: %s (%d)\n", + aws_error_debug_str(err), err); + + /* Try alternative method: Using system path-based trust store override */ + aws_tls_ctx_options_clean_up(&tls_ctx_options); + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + + if (aws_tls_ctx_options_override_default_trust_store_from_path( + &tls_ctx_options, app_ctx.ca_file, app_ctx.ca_dir) != AWS_OP_SUCCESS) { + err = aws_last_error(); + fprintf(stderr, "Failed to load CA certificate from path: %s (%d)\n", + aws_error_debug_str(err), err); + main_error_code = 8; + goto cleanup; + } else { + printf("Successfully loaded CA certificate from path\n"); + } + } else { + printf(COLOR_GREEN "Successfully loaded CA certificate directly" COLOR_RESET "\n"); + } + + /* Free the certificate content */ + aws_mem_release(allocator, cert_content); + + /* Always explicitly enable peer verification */ + aws_tls_ctx_options_set_verify_peer(&tls_ctx_options, true); + } else { + /* When no CA file specified, still ensure peer verification is enabled */ + printf("No CA certificate file specified, using system defaults\n"); + aws_tls_ctx_options_set_verify_peer(&tls_ctx_options, true); + } + + /* Create a new TLS context */ + printf("Creating TLS context...\n"); + + /* Set minimum TLS version to TLS 1.2 to avoid compatibility issues */ + tls_ctx_options.minimum_tls_version = AWS_IO_TLSv1_2; + + tls_ctx = aws_tls_client_ctx_new(allocator, &tls_ctx_options); + if (!tls_ctx) { + int err = aws_last_error(); + fprintf(stderr, COLOR_RED "Failed to create TLS context: %s (%d)" COLOR_RESET "\n", + aws_error_debug_str(err), err); + aws_tls_ctx_options_clean_up(&tls_ctx_options); + main_error_code = 9; + goto cleanup; + } + + printf(COLOR_GREEN "TLS context created successfully" COLOR_RESET "\n"); + + /* Initialize TLS connection options */ + tls_connection_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_tls_connection_options)); + if (!tls_connection_options) { + fprintf(stderr, "Failed to allocate memory for TLS connection options\n"); + aws_tls_ctx_options_clean_up(&tls_ctx_options); + aws_tls_ctx_release(tls_ctx); + main_error_code = 10; + goto cleanup; + } + + /* Initialize TLS connection options from context */ + aws_tls_connection_options_init_from_ctx(tls_connection_options, tls_ctx); + + /* Set server name for SNI */ + struct aws_byte_cursor server_name = aws_byte_cursor_from_c_str(app_ctx.broker_host); + if (aws_tls_connection_options_set_server_name(tls_connection_options, allocator, + &server_name) != AWS_OP_SUCCESS) { + fprintf(stderr, "Failed to set server name: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 11; + goto cleanup; + } + + /* Set options for AWS IoT Core endpoints */ + if (strstr(app_ctx.broker_host, "iot.") && strstr(app_ctx.broker_host, ".amazonaws.com")) { + /* In AWS CRT, we need to use the exact certificate that's registered in AWS IoT Core */ + printf("Connecting to AWS IoT Core endpoint: %s\n", app_ctx.broker_host); + printf("Make sure the certificate is registered in AWS IoT Core for this endpoint\n"); + printf("The certificate should have an IoT policy attached that allows connect, publish, and subscribe\n"); + + /* Override the default certificate path */ + if (app_ctx.verbose) { + printf("Setting ALPN protocol to 'mqtt' for AWS IoT Core connection\n"); + printf("Note: ALPN protocol 'mqtt' would be applied for AWS IoT Core\n"); + } + } + + printf("TLS enabled for connection to %s\n", app_ctx.broker_host); + } + + /* Configure SOCKS5 proxy options if using proxy */ + + if (app_ctx.use_proxy) { + if (!app_ctx.proxy.host) { + fprintf(stderr, "Proxy URI was requested but no host was parsed.\n"); + main_error_code = 12; + goto cleanup; + } + + printf("Configuring SOCKS5 proxy %s:%" PRIu16 "\n", app_ctx.proxy.host, app_ctx.proxy.port); + + /* Allocate and initialize the SOCKS5 options structure */ + socks5_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_socks5_proxy_options)); + if (!socks5_options) { + fprintf(stderr, "Failed to allocate memory for SOCKS5 proxy options\n"); + main_error_code = 12; + goto cleanup; + } + + /* Set up SOCKS5-specific options */ + struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host); + + /* Use the standard AWS SOCKS5 initialization function */ + int result = aws_socks5_proxy_options_init(socks5_options, allocator, proxy_host, app_ctx.proxy.port); + if (result != AWS_OP_SUCCESS) { + int error_code = aws_last_error(); + fprintf( + stderr, + "Failed to initialize SOCKS5 proxy options: %s (code: %d)\n", + aws_error_debug_str(error_code), + error_code); + main_error_code = 13; + goto cleanup; + } + aws_socks5_proxy_options_set_host_resolution_mode( + socks5_options, + app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY + : AWS_SOCKS5_HOST_RESOLUTION_CLIENT); + + /* Setup auth if provided */ + if (app_ctx.proxy.username && app_ctx.proxy.password) { + struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username); + struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password); + if (aws_socks5_proxy_options_set_auth(socks5_options, allocator, username, password) != AWS_OP_SUCCESS) { + int error_code = aws_last_error(); + fprintf( + stderr, + "Failed to set SOCKS5 auth: %s (code: %d)\n", + aws_error_debug_str(error_code), + error_code); + main_error_code = 14; + goto cleanup; + } + } + + /* Set connection timeout */ + socks5_options->connection_timeout_ms = 5000; + + if (app_ctx.verbose) { + printf( + "SOCKS5 proxy configured with target %s:%d%s (destination resolved by %s)\n", + app_ctx.broker_host, + app_ctx.broker_port, + app_ctx.use_tls ? " (with TLS)" : "", + app_ctx.proxy.resolve_host_with_proxy ? "proxy" : "client"); + } + } else { + /* Ensure we're using a direct connection with no proxy options */ + if (app_ctx.verbose) { + printf("Using direct connection without SOCKS5 proxy\n"); + } + } + + /* Create MQTT connection */ + app_ctx.mqtt_connection = aws_mqtt_client_connection_new(app_ctx.mqtt_client); + if (!app_ctx.mqtt_connection) { + fprintf(stderr, "Failed to create MQTT connection: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 16; + goto cleanup; + } + + /* Configure MQTT connection options */ + struct aws_mqtt_connection_options mqtt_conn_options = { + .host_name = aws_byte_cursor_from_c_str(app_ctx.broker_host), + .port = app_ctx.broker_port, + .client_id = aws_byte_cursor_from_c_str(app_ctx.client_id), + .keep_alive_time_secs = app_ctx.keep_alive_secs, + .ping_timeout_ms = 5000, + .protocol_operation_timeout_ms = 5000, + .clean_session = app_ctx.clean_session, + .on_connection_complete = s_on_connection_complete, + .user_data = &app_ctx, + .tls_options = app_ctx.use_tls ? tls_connection_options : NULL + }; + + /* Set up socket options for the connection */ + struct aws_socket_options socket_options = { + .type = AWS_SOCKET_STREAM, + .domain = AWS_SOCKET_IPV4, /* Use IPv4 for better compatibility */ + .connect_timeout_ms = 10000, /* Allow enough time for connection */ + }; + + mqtt_conn_options.socket_options = &socket_options; + + /* Configure WebSocket transport if needed */ + if (app_ctx.use_websocket) { + if (aws_mqtt_client_connection_use_websockets( + app_ctx.mqtt_connection, NULL, NULL, NULL, NULL) != AWS_OP_SUCCESS) { + fprintf(stderr, "Failed to configure WebSocket transport: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 17; + goto cleanup; + } + + printf("Using WebSocket transport with path '%s'\n", app_ctx.ws_path); + } + + /* Configure proxy options */ + if (app_ctx.use_proxy && app_ctx.proxy.host) { + printf( + "Using SOCKS5 proxy at %s:%" PRIu16 " (%s DNS)\n", + app_ctx.proxy.host, + app_ctx.proxy.port, + app_ctx.proxy.resolve_host_with_proxy ? "proxy" : "client-side"); + + /* SOCKS5 options were already configured above */ + if (app_ctx.verbose) { + printf("SOCKS5 proxy options have already been configured.\n"); + } + } else { + printf("Not using any proxy, connecting directly.\n"); + /* Ensure socks5_options is NULL for direct connections */ + if (socks5_options) { + aws_socks5_proxy_options_clean_up(socks5_options); + aws_mem_release(allocator, socks5_options); + socks5_options = NULL; + } + } + + /* Connect to MQTT broker */ + printf("Connecting to MQTT broker %s:%d...\n", app_ctx.broker_host, app_ctx.broker_port); + + if (app_ctx.verbose) { + printf("Connection details:\n"); + printf(" Transport: %s%s\n", + app_ctx.use_tls ? "TLS" : "TCP", + app_ctx.use_websocket ? " over WebSocket" : ""); + printf(" Client ID: %s\n", app_ctx.client_id); + printf(" Clean session: %s\n", app_ctx.clean_session ? "yes" : "no"); + if (app_ctx.use_proxy && app_ctx.proxy.host) { + printf(" Via SOCKS5 proxy: %s:%" PRIu16 "\n", app_ctx.proxy.host, app_ctx.proxy.port); + } else { + printf(" Direct connection (no proxy)\n"); + } + } + + int connect_result; + if (app_ctx.use_proxy && app_ctx.proxy.host && socks5_options) { + /* Using SOCKS5 proxy for MQTT connection */ + printf( + "Connecting via SOCKS5 proxy %s:%" PRIu16 " to %s:%d\n", + app_ctx.proxy.host, + app_ctx.proxy.port, + app_ctx.broker_host, + app_ctx.broker_port); + + /* Print the SOCKS5 configuration */ + if (app_ctx.verbose) { + print_socks5_config(socks5_options); + } + + /* IMPORTANT: For SOCKS5 with TLS, make sure TLS options are properly set BEFORE setting SOCKS5 options */ + if (app_ctx.use_tls) { + if (app_ctx.verbose) { + printf("TLS with SOCKS5: Ensuring TLS options are properly configured\n"); + printf(" CA file: %s\n", app_ctx.ca_file ? app_ctx.ca_file : "(system default)"); + if (app_ctx.cert_file && app_ctx.key_file) { + printf(" Client certificate: %s\n", app_ctx.cert_file); + } else { + printf(" Using one-way TLS (server authentication only)\n"); + } + } + + /* Critical: Ensure TLS options are set in the MQTT connection options */ + if (tls_connection_options) { + mqtt_conn_options.tls_options = tls_connection_options; + } else if (app_ctx.verbose) { + fprintf(stderr, "WARNING: TLS is enabled but TLS options are NULL!\n"); + } + } + + /* Set the SOCKS5 proxy options in the MQTT client connection AFTER TLS is configured */ + if (aws_mqtt_client_connection_set_socks5_proxy_options(app_ctx.mqtt_connection, socks5_options) != AWS_OP_SUCCESS) { + fprintf(stderr, "Failed to set SOCKS5 proxy options: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 18; + goto cleanup; + } + + /* Proceed with connection using the MQTT connection API */ + connect_result = aws_mqtt_client_connection_connect(app_ctx.mqtt_connection, &mqtt_conn_options); + } else { + /* Use the standard connection function for direct connections */ + connect_result = aws_mqtt_client_connection_connect(app_ctx.mqtt_connection, &mqtt_conn_options); + } + + if (connect_result != AWS_OP_SUCCESS) { + int error_code = aws_last_error(); + fprintf(stderr, "Failed to start MQTT connection: %s (error code: %d)\n", + aws_error_debug_str(error_code), error_code); + main_error_code = 19; + goto cleanup; + } + + /* Wait for connection to complete */ + aws_mutex_lock(&app_ctx.lock); + aws_condition_variable_wait_pred( + &app_ctx.signal, &app_ctx.lock, s_connection_completed_predicate, &app_ctx); + aws_mutex_unlock(&app_ctx.lock); + + if (app_ctx.connection_error_code != AWS_ERROR_SUCCESS) { + fprintf(stderr, COLOR_RED "Failed to connect to MQTT broker: %s" COLOR_RESET "\n", + aws_error_debug_str(app_ctx.connection_error_code)); + main_error_code = 20; + goto cleanup; + } + + printf(COLOR_GREEN "Connected to MQTT broker successfully" COLOR_RESET "\n"); + + /* Subscribe to topic */ + printf("Subscribing to topic: %s\n", app_ctx.sub_topic); + if (s_subscribe_to_topic(&app_ctx) != AWS_OP_SUCCESS) { + main_error_code = 21; + goto cleanup; + } + + /* Wait for subscription to complete */ + aws_mutex_lock(&app_ctx.lock); + aws_condition_variable_wait_pred( + &app_ctx.signal, &app_ctx.lock, s_subscription_completed_predicate, &app_ctx); + aws_mutex_unlock(&app_ctx.lock); + + if (app_ctx.subscription_error_code != AWS_ERROR_SUCCESS) { + fprintf(stderr, COLOR_RED "Failed to subscribe: %s" COLOR_RESET "\n", + aws_error_debug_str(app_ctx.subscription_error_code)); + main_error_code = 22; + goto cleanup; + } + + printf(COLOR_GREEN "Subscribed to topic: %s" COLOR_RESET "\n", app_ctx.sub_topic); + + /* Publish a message */ + printf("Publishing message to topic: %s\n", app_ctx.pub_topic); + if (s_publish_message(&app_ctx) != AWS_OP_SUCCESS) { + main_error_code = 23; + goto cleanup; + } + + /* Wait for publish to complete */ + aws_mutex_lock(&app_ctx.lock); + aws_condition_variable_wait_pred( + &app_ctx.signal, &app_ctx.lock, s_publish_completed_predicate, &app_ctx); + aws_mutex_unlock(&app_ctx.lock); + + if (app_ctx.publish_error_code != AWS_ERROR_SUCCESS) { + fprintf(stderr, COLOR_RED "Failed to publish: %s" COLOR_RESET "\n", aws_error_debug_str(app_ctx.publish_error_code)); + main_error_code = 24; + goto cleanup; + } + + printf(COLOR_GREEN "Published message successfully" COLOR_RESET "\n"); + + /* Wait a short time to potentially receive the message we just published */ + printf("Waiting to receive message...\n"); + aws_thread_current_sleep(3000000000ULL); /* 3 seconds */ + + aws_mutex_lock(&app_ctx.lock); + if (app_ctx.message_received) { + printf(COLOR_GREEN "Received message: %.*s" COLOR_RESET "\n", + (int)app_ctx.received_message.len, + (char *)app_ctx.received_message.buffer); + } else { + printf(COLOR_YELLOW "No message received within timeout" COLOR_RESET "\n"); + } + aws_mutex_unlock(&app_ctx.lock); + + /* Disconnect from MQTT broker */ + printf("Disconnecting from MQTT broker...\n"); + aws_mqtt_client_connection_disconnect(app_ctx.mqtt_connection, NULL, NULL); + + /* Wait a bit for the disconnect to complete */ + aws_thread_current_sleep(1 * 1000000000); /* 1 second */ + +cleanup: + /* Clean up TLS resources if used */ + if (tls_connection_options != NULL) { + aws_tls_connection_options_clean_up(tls_connection_options); + aws_mem_release(allocator, tls_connection_options); + } + + if (tls_ctx != NULL) { + aws_tls_ctx_release(tls_ctx); + } + + /* Clean up proxy options if used */ + if (socks5_options) { + aws_socks5_proxy_options_clean_up(socks5_options); + aws_mem_release(allocator, socks5_options); + } + + /* Clean up the app context */ + s_app_ctx_clean_up(&app_ctx); + + /* Clean up libraries */ + aws_mqtt_library_clean_up(); + aws_io_library_clean_up(); + aws_common_library_clean_up(); + + /* Clean up logger before exit */ + if (logger_initialized) { + aws_logger_clean_up(&logger); + } + + return main_error_code; +} diff --git a/bin/mqtt5_client_app/CMakeLists.txt b/bin/mqtt5_client_app/CMakeLists.txt new file mode 100644 index 00000000..f9b18751 --- /dev/null +++ b/bin/mqtt5_client_app/CMakeLists.txt @@ -0,0 +1,28 @@ +project(mqtt5_client_app C) + +list(APPEND CMAKE_MODULE_PATH "${CMAKE_INSTALL_PREFIX}/lib/cmake") + +file(GLOB MQTT5_CLIENT_APP_SRC + "*.c" + ) + +set(MQTT5_CLIENT_APP_PROJECT_NAME mqtt5_client_app) +add_executable(${MQTT5_CLIENT_APP_PROJECT_NAME} ${MQTT5_CLIENT_APP_SRC}) +aws_set_common_properties(${MQTT5_CLIENT_APP_PROJECT_NAME}) + +target_include_directories(${MQTT5_CLIENT_APP_PROJECT_NAME} PUBLIC + $ + $) + +target_link_libraries(${MQTT5_CLIENT_APP_PROJECT_NAME} PRIVATE aws-c-mqtt) + +if (BUILD_SHARED_LIBS AND NOT WIN32) + message(INFO " mqtt5_client_app will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application") +endif() + +install(TARGETS ${MQTT5_CLIENT_APP_PROJECT_NAME} + EXPORT ${MQTT5_CLIENT_APP_PROJECT_NAME}-targets + COMPONENT Runtime + RUNTIME + DESTINATION bin + COMPONENT Runtime) \ No newline at end of file diff --git a/bin/mqtt5_client_app/main.c b/bin/mqtt5_client_app/main.c new file mode 100644 index 00000000..5a17d36c --- /dev/null +++ b/bin/mqtt5_client_app/main.c @@ -0,0 +1,971 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +struct socks5_proxy_settings { + char *host; + char *username; + char *password; + uint16_t port; + bool resolve_host_with_proxy; +}; + +static void s_socks5_proxy_settings_clean_up( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator) { + if (!settings) { + return; + } + if (settings->host) { + aws_mem_release(allocator, settings->host); + } + if (settings->username) { + aws_mem_release(allocator, settings->username); + } + if (settings->password) { + aws_mem_release(allocator, settings->password); + } + AWS_ZERO_STRUCT(*settings); +} + +static int s_socks5_proxy_settings_init_from_uri( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator, + const char *proxy_uri) { + + if (!settings || !allocator || !proxy_uri) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + s_socks5_proxy_settings_clean_up(settings, allocator); + + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri); + struct aws_uri uri; + AWS_ZERO_STRUCT(uri); + + if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) { + fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error())); + goto on_error; + } + + const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri); + if (!scheme || !scheme->len) { + fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri); + goto on_error; + } + + if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) { + settings->resolve_host_with_proxy = true; + } else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) { + settings->resolve_host_with_proxy = false; + } else { + fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri); + goto on_error; + } + + const struct aws_byte_cursor *host = aws_uri_host_name(&uri); + if (!host || host->len == 0) { + fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri); + goto on_error; + } + + settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char)); + if (!settings->host) { + fprintf(stderr, "Failed to allocate memory for proxy host\n"); + goto on_error; + } + memcpy(settings->host, host->ptr, host->len); + settings->host[host->len] = '\0'; + + uint32_t parsed_port = aws_uri_port(&uri); + if (parsed_port == 0) { + parsed_port = 1080; + } + if (parsed_port > UINT16_MAX) { + fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port); + goto on_error; + } + settings->port = (uint16_t)parsed_port; + + if (uri.user.len > 0) { + settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char)); + if (!settings->username) { + fprintf(stderr, "Failed to allocate memory for proxy username\n"); + goto on_error; + } + memcpy(settings->username, uri.user.ptr, uri.user.len); + settings->username[uri.user.len] = '\0'; + } + + if (uri.password.len > 0) { + settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char)); + if (!settings->password) { + fprintf(stderr, "Failed to allocate memory for proxy password\n"); + goto on_error; + } + memcpy(settings->password, uri.password.ptr, uri.password.len); + settings->password[uri.password.len] = '\0'; + } + + aws_uri_clean_up(&uri); + return AWS_OP_SUCCESS; + +on_error: + aws_uri_clean_up(&uri); + s_socks5_proxy_settings_clean_up(settings, allocator); + return AWS_OP_ERR; +} + +/* ANSI Color codes for terminal output */ +#define COLOR_RED "\x1b[31m" +#define COLOR_GREEN "\x1b[32m" +#define COLOR_YELLOW "\x1b[33m" +#define COLOR_BLUE "\x1b[34m" +#define COLOR_RESET "\x1b[0m" + +/* + * This example demonstrates how to establish an MQTT connection through a SOCKS5 proxy + * or directly to the MQTT broker. It supports various features including: + * + * - MQTT connection via SOCKS5 proxy or direct connection + * - TLS support for secure MQTT (MQTTS) + * - WebSocket transport support (MQTT over WebSockets) + * - Basic MQTT operations (subscribe, publish, receive) + * - SOCKS5 proxy authentication + * + * The various transport options can be combined to support: + * - Direct MQTT + * - MQTT over SOCKS5 + * - MQTT over TLS + * - MQTT over TLS over SOCKS5 + * - MQTT over WebSocket + * - MQTT over WebSocket over SOCKS5 + * - MQTT over WebSocket over TLS + * - MQTT over WebSocket over TLS over SOCKS5 + */ + +struct app_ctx { + struct aws_allocator *allocator; + struct aws_event_loop_group *event_loop_group; + struct aws_host_resolver *host_resolver; + struct aws_client_bootstrap *bootstrap; + + struct aws_mqtt5_client *mqtt_client; + struct aws_mqtt5_client_connection *mqtt_connection; + + struct aws_mutex lock; + struct aws_condition_variable signal; + + /* MQTT broker settings */ + const char *broker_host; + uint16_t broker_port; + + /* MQTT client settings */ + const char *client_id; + bool clean_session; + uint16_t keep_alive_secs; + + /* SOCKS5 proxy settings */ + struct socks5_proxy_settings proxy; + + /* WebSocket settings */ + const char *ws_path; + + /* Certificate settings */ + const char *cert_file; + const char *key_file; + const char *ca_file; + const char *ca_dir; + + /* Topic settings */ + const char *pub_topic; + const char *sub_topic; + const char *pub_message; + + /* Connection options */ + bool use_proxy; + bool use_tls; + bool use_websocket; + bool verbose; + + /* State tracking */ + bool connection_complete; + int connection_error_code; + bool subscription_complete; + int subscription_error_code; + bool publish_complete; + int publish_error_code; + bool message_received; + + /* Packet IDs */ + uint16_t sub_packet_id; + uint16_t pub_packet_id; + + /* Message buffer */ + struct aws_byte_buf received_message; +}; + + +static void s_aws_mqtt5_transform_websocket_handshake_fn( + struct aws_http_message *request, + void *user_data, + aws_mqtt5_transform_websocket_handshake_complete_fn *complete_fn, + void *complete_ctx) { + + (void)user_data; + + (*complete_fn)(request, AWS_ERROR_SUCCESS, complete_ctx); +} + +static bool s_subscription_completed_pred(void *user_data) { + struct app_ctx *ctx = user_data; + return ctx->subscription_complete; +} + +static bool s_message_received_pred(void *user_data) { + struct app_ctx *ctx = user_data; + return ctx->message_received; +} + +static void s_usage(int exit_code) { + fprintf(stderr, "usage: mqtt_socks5_example [options]\n"); + fprintf(stderr, " --broker-host HOST: MQTT broker hostname (default: test.mosquitto.org)\n"); + fprintf(stderr, " --broker-port PORT: MQTT broker port (default: 1883 for MQTT, 8883 for MQTTS, 8080/8443 for WebSockets)\n"); + fprintf(stderr, " --client-id ID: MQTT client ID (default: aws-mqtt-socks5-example)\n"); + fprintf(stderr, " --clean-session: Use clean session (default: true)\n"); + fprintf(stderr, " --keep-alive SECS: MQTT keep-alive in seconds (default: 60)\n"); + + fprintf( + stderr, + " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n"); + + fprintf(stderr, " --cert FILE: Client certificate file path (PEM format)\n"); + fprintf(stderr, " --key FILE: Private key file path (PEM format)\n"); + fprintf(stderr, " --ca-file FILE: CA certificate file path (PEM format)\n"); + fprintf(stderr, " --ca-dir DIR: Directory containing CA certificates\n"); + + fprintf(stderr, " --ws-path PATH: WebSocket path (default: /mqtt)\n"); + + fprintf(stderr, " --pub-topic TOPIC: Topic to publish to (default: test/mqtt-socks5-example)\n"); + fprintf(stderr, " --sub-topic TOPIC: Topic to subscribe to (default: same as pub-topic)\n"); + fprintf(stderr, " --message MSG: Message to publish (default: \"Hello from MQTT SOCKS5 example\")\n"); + fprintf(stderr, " --websocket: Use WebSockets for transport\n"); + fprintf(stderr, " --verbose: Print detailed logging\n"); + fprintf(stderr, " --help: Display this message and exit\n"); + + exit(exit_code); +} + +static struct aws_cli_option s_long_options[] = { + {"broker-host", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'b'}, + {"broker-port", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'p'}, + {"client-id", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'i'}, + {"clean-session", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'c'}, + {"keep-alive", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'k'}, + + {"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'}, + + {"ws-path", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'P'}, + + {"cert", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'C'}, + {"key", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'K'}, + {"ca-file", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'A'}, + {"ca-dir", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'D'}, + + {"pub-topic", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 't'}, + {"sub-topic", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 's'}, + {"message", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'm'}, + {"websocket", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'W'}, + {"verbose", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'v'}, + {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, + {NULL, 0, NULL, 0}, +}; + +static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) { + /* Default values */ + ctx->broker_host = "test.mosquitto.org"; + ctx->broker_port = 0; /* Will be set later based on options */ + ctx->client_id = "aws-mqtt-socks5-example"; + ctx->clean_session = true; + ctx->keep_alive_secs = 60; + + ctx->cert_file = NULL; + ctx->key_file = NULL; + ctx->ca_file = NULL; + ctx->ca_dir = NULL; + + ctx->ws_path = "/mqtt"; + + ctx->pub_topic = "test/mqtt-socks5-example"; + ctx->sub_topic = NULL; /* Default to same as pub_topic */ + ctx->pub_message = "Hello from MQTT SOCKS5 example"; + + ctx->use_proxy = false; + ctx->use_tls = false; + ctx->use_websocket = false; + ctx->verbose = false; + + while (true) { + int option_index = 0; + int c = aws_cli_getopt_long(argc, argv, "b:p:i:ck:x:P:C:K:A:D:t:s:m:Wvh", + s_long_options, &option_index); + if (c == -1) { + break; + } + + switch (c) { + case 'b': + ctx->broker_host = aws_cli_optarg; + break; + case 'p': + ctx->broker_port = (uint16_t)atoi(aws_cli_optarg); + break; + case 'i': + ctx->client_id = aws_cli_optarg; + break; + case 'c': + ctx->clean_session = true; + break; + case 'k': + ctx->keep_alive_secs = (uint16_t)atoi(aws_cli_optarg); + break; + + case 'x': + if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) { + s_usage(1); + } + ctx->use_proxy = true; + break; + + case 'P': + ctx->ws_path = aws_cli_optarg; + break; + + case 'C': + ctx->cert_file = aws_cli_optarg; + break; + case 'K': + ctx->key_file = aws_cli_optarg; + break; + case 'A': + ctx->ca_file = aws_cli_optarg; + break; + case 'D': + ctx->ca_dir = aws_cli_optarg; + break; + + case 't': + ctx->pub_topic = aws_cli_optarg; + break; + case 's': + ctx->sub_topic = aws_cli_optarg; + break; + case 'm': + ctx->pub_message = aws_cli_optarg; + break; + case 'W': + ctx->use_websocket = true; + break; + case 'v': + ctx->verbose = true; + break; + case 'h': + s_usage(0); + break; + default: + fprintf(stderr, "Unknown option: %c\n", c); + s_usage(1); + break; + } + } + + if (!ctx->use_tls) { + ctx->use_tls = ctx->ca_file || ctx->cert_file || ctx->key_file; + } + + /* If sub_topic not specified, use the same as pub_topic */ + if (ctx->sub_topic == NULL) { + ctx->sub_topic = ctx->pub_topic; + } + + /* Set default port if not specified */ + if (ctx->broker_port == 0) { + if (ctx->use_websocket) { + ctx->broker_port = ctx->use_tls ? 443 : 80; + } else { + ctx->broker_port = ctx->use_tls ? 8883 : 1883; + } + } +} + +/* Structure for user data passed to the socket channel creation callback */ +static void s_app_ctx_init(struct app_ctx *ctx, struct aws_allocator *allocator) { + AWS_ZERO_STRUCT(*ctx); + ctx->allocator = allocator; + aws_mutex_init(&ctx->lock); + aws_condition_variable_init(&ctx->signal); + aws_byte_buf_init(&ctx->received_message, allocator, 1024); /* Initial buffer capacity */ +} + +static void s_app_ctx_clean_up(struct app_ctx *ctx) { + if (ctx->mqtt_connection) { + //aws_mqtt5_client_connection_release(ctx->mqtt_connection); + } + + if (ctx->mqtt_client) { + //aws_mqtt5_client_release(ctx->mqtt_client); + } + + if (ctx->bootstrap) { + aws_client_bootstrap_release(ctx->bootstrap); + } + + if (ctx->host_resolver) { + aws_host_resolver_release(ctx->host_resolver); + } + + if (ctx->event_loop_group) { + aws_event_loop_group_release(ctx->event_loop_group); + } + + aws_byte_buf_clean_up(&ctx->received_message); + s_socks5_proxy_settings_clean_up(&ctx->proxy, ctx->allocator); + aws_condition_variable_clean_up(&ctx->signal); + aws_mutex_clean_up(&ctx->lock); +} + +/* Define a lifecycle event handler that signals main thread to exit on connection failure */ +static void s_lifecycle_event_handler(const struct aws_mqtt5_client_lifecycle_event *event) { + struct app_ctx *ctx = (struct app_ctx *)event->user_data; + if (event->event_type == AWS_MQTT5_CLET_CONNECTION_SUCCESS) { + printf("MQTT5 connection established successfully.\n"); + aws_mutex_lock(&ctx->lock); + ctx->connection_complete = true; + aws_condition_variable_notify_one(&ctx->signal); + aws_mutex_unlock(&ctx->lock); + } else if (event->event_type == AWS_MQTT5_CLET_CONNECTION_FAILURE) { + fprintf(stderr, "MQTT5 connection failed: %s\n", aws_error_debug_str(event->error_code)); + if (ctx) { + aws_mutex_lock(&ctx->lock); + ctx->connection_complete = true; + ctx->connection_error_code = event->error_code; + aws_condition_variable_notify_one(&ctx->signal); + aws_mutex_unlock(&ctx->lock); + } + } else if (event->event_type == AWS_MQTT5_CLET_DISCONNECTION) { + fprintf(stderr, "MQTT5 client disconnected: %s\n", aws_error_debug_str(event->error_code)); + } +} + +/* Define a publish received handler */ +static void s_publish_received_handler(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) { + printf("Message received on topic '%.*s': %.*s\n", + (int)publish->topic.len, publish->topic.ptr, + (int)publish->payload.len, publish->payload.ptr); + + struct app_ctx *ctx = user_data; + if (ctx == NULL) { + return; + } + + aws_mutex_lock(&ctx->lock); + ctx->message_received = true; + aws_condition_variable_notify_one(&ctx->signal); + aws_mutex_unlock(&ctx->lock); +} + +static void s_on_subscribe_complete( + const struct aws_mqtt5_packet_suback_view *suback, + int error_code, + void *user_data) { + + (void)suback; + + struct app_ctx *ctx = user_data; + if (ctx == NULL) { + return; + } + + aws_mutex_lock(&ctx->lock); + ctx->subscription_complete = true; + ctx->subscription_error_code = error_code; + aws_condition_variable_notify_one(&ctx->signal); + aws_mutex_unlock(&ctx->lock); +} + +// Subscribe to a topic +static int s_subscribe_to_topic(struct app_ctx *ctx) { + if (!ctx->mqtt_client) { + fprintf(stderr, "Cannot subscribe: No active MQTT client\n"); + return AWS_OP_ERR; + } + + struct aws_mqtt5_subscription_view subscription = { + .topic_filter = aws_byte_cursor_from_c_str(ctx->sub_topic), + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + }; + + struct aws_mqtt5_packet_subscribe_view subscribe_view; + AWS_ZERO_STRUCT(subscribe_view); + subscribe_view.subscriptions = &subscription; + subscribe_view.subscription_count = 1; + + struct aws_mqtt5_subscribe_completion_options completion_options; + AWS_ZERO_STRUCT(completion_options); + completion_options.completion_callback = s_on_subscribe_complete; + completion_options.completion_user_data = ctx; + + aws_mutex_lock(&ctx->lock); + ctx->subscription_complete = false; + ctx->subscription_error_code = AWS_ERROR_SUCCESS; + aws_mutex_unlock(&ctx->lock); + + if (aws_mqtt5_client_subscribe(ctx->mqtt_client, &subscribe_view, &completion_options)) { + fprintf(stderr, COLOR_RED "Failed to subscribe: %s" COLOR_RESET "\n", aws_error_debug_str(aws_last_error())); + return AWS_OP_ERR; + } + + if (ctx->verbose) { + printf("Subscription request sent for topic: %s\n", ctx->sub_topic); + } + + aws_mutex_lock(&ctx->lock); + int wait_result = aws_condition_variable_wait_pred(&ctx->signal, &ctx->lock, s_subscription_completed_pred, ctx); + int sub_error_code = ctx->subscription_error_code; + aws_mutex_unlock(&ctx->lock); + + if (wait_result != AWS_OP_SUCCESS) { + fprintf( + stderr, + COLOR_RED "Subscription wait failed: %s" COLOR_RESET "\n", + aws_error_debug_str(aws_last_error())); + return AWS_OP_ERR; + } + + if (sub_error_code != AWS_ERROR_SUCCESS) { + fprintf( + stderr, + COLOR_RED "Subscription failed: %s" COLOR_RESET "\n", + aws_error_debug_str(sub_error_code)); + return AWS_OP_ERR; + } + + if (ctx->verbose) { + printf("Subscription acknowledged by broker\n"); + } + + return AWS_OP_SUCCESS; +} + +// Publish a message +static int s_publish_message(struct app_ctx *ctx) { + if (!ctx->mqtt_client) { + fprintf(stderr, "Cannot publish: No active MQTT client\n"); + return AWS_OP_ERR; + } + + struct aws_mqtt5_packet_publish_view publish_view = { + .topic = aws_byte_cursor_from_c_str(ctx->pub_topic), + .payload = aws_byte_cursor_from_c_str(ctx->pub_message), + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .retain = false, + }; + + if (aws_mqtt5_client_publish(ctx->mqtt_client, &publish_view, NULL)) { + fprintf(stderr, COLOR_RED "Failed to publish: %s" COLOR_RESET "\n", aws_error_debug_str(aws_last_error())); + return AWS_OP_ERR; + } + + if (ctx->verbose) { + printf("Publish request sent to topic: %s\n", ctx->pub_topic); + } + + return AWS_OP_SUCCESS; +} + +int main(int argc, char **argv) { + + int main_error_code = 0; + bool socks5_options_valid = false; + + struct aws_allocator *allocator = aws_default_allocator(); + aws_common_library_init(allocator); + aws_io_library_init(allocator); + aws_mqtt_library_init(allocator); + + struct aws_tls_ctx *tls_ctx = NULL; + struct app_ctx app_ctx; + s_app_ctx_init(&app_ctx, allocator); + + /* Parse command line arguments */ + s_parse_options(argc, argv, &app_ctx); + + /* Initialize AWS CRT logger */ + struct aws_logger logger; + struct aws_logger_standard_options logger_options = { + /* Force debug level output for this run to diagnose the TLS issue */ + .level = app_ctx.verbose ? AWS_LL_TRACE : AWS_LL_ERROR, + .file = stderr, + }; + + bool logger_initialized = false; + if (aws_logger_init_standard(&logger, allocator, &logger_options) == AWS_OP_SUCCESS) { + aws_logger_set(&logger); + logger_initialized = true; + + if (app_ctx.verbose) { + printf("Verbose mode enabled, using TRACE log level\n"); + } + } else { + fprintf(stderr, "[WARN] Failed to initialize AWS logger, logs will not be shown.\n"); + } + + /* Log the configuration */ + printf("MQTT%s connection to %s:%d\n", + app_ctx.use_tls ? "S" : "", + app_ctx.broker_host, + app_ctx.broker_port); + + printf("Client ID: %s\n", app_ctx.client_id); + printf("Clean session: %s\n", app_ctx.clean_session ? "yes" : "no"); + printf("Keep alive: %d seconds\n", app_ctx.keep_alive_secs); + + if (app_ctx.use_websocket) { + printf("Using WebSocket transport\n"); + } + + if (app_ctx.use_proxy && app_ctx.proxy.host) { + printf( + "Using SOCKS5 proxy at %s:%" PRIu16 " (%s DNS)\n", + app_ctx.proxy.host, + app_ctx.proxy.port, + app_ctx.proxy.resolve_host_with_proxy ? "proxy" : "client-side"); + if (app_ctx.proxy.username) { + printf("With proxy authentication: username=%s\n", app_ctx.proxy.username); + } + } else { + printf("Using direct connection (no proxy)\n"); + } + + /* Create event loop group */ + app_ctx.event_loop_group = aws_event_loop_group_new_default(allocator, 1, NULL); + if (!app_ctx.event_loop_group) { + fprintf(stderr, "Failed to create event loop group: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 1; + goto cleanup; + } + + /* Create host resolver */ + struct aws_host_resolver_default_options resolver_options = { + .max_entries = 8, + .el_group = app_ctx.event_loop_group + }; + app_ctx.host_resolver = aws_host_resolver_new_default(allocator, &resolver_options); + if (!app_ctx.host_resolver) { + fprintf(stderr, "Failed to create host resolver: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 2; + goto cleanup; + } + + /* Create client bootstrap */ + struct aws_client_bootstrap_options bootstrap_options = { + .event_loop_group = app_ctx.event_loop_group, + .host_resolver = app_ctx.host_resolver + }; + app_ctx.bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options); + if (!app_ctx.bootstrap) { + fprintf(stderr, "Failed to create client bootstrap: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 3; + goto cleanup; + } + + /* TLS configuration copied from mqtt5canary */ + struct aws_tls_connection_options tls_connection_options; + AWS_ZERO_STRUCT(tls_connection_options); + struct aws_socks5_proxy_options socks5_options; + AWS_ZERO_STRUCT(socks5_options); + + if (app_ctx.use_tls) { + struct aws_byte_cursor broker_host_cursor = aws_byte_cursor_from_c_str(app_ctx.broker_host); + struct aws_tls_ctx_options tls_ctx_options; + AWS_ZERO_STRUCT(tls_ctx_options); + + int tls_result = AWS_OP_SUCCESS; + if (app_ctx.cert_file && app_ctx.key_file) { + tls_result = aws_tls_ctx_options_init_client_mtls_from_path( + &tls_ctx_options, allocator, app_ctx.cert_file, app_ctx.key_file); + } else { + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + } + + if (tls_result != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to initialize TLS context options: %s\n", + aws_error_debug_str(aws_last_error())); + main_error_code = 9; + goto cleanup; + } + + if ((app_ctx.ca_dir || app_ctx.ca_file) && + aws_tls_ctx_options_override_default_trust_store_from_path( + &tls_ctx_options, app_ctx.ca_dir, app_ctx.ca_file)) { + fprintf( + stderr, + "Failed to configure TLS trust store (dir=%s, file=%s): %s\n", + app_ctx.ca_dir ? app_ctx.ca_dir : "NULL", + app_ctx.ca_file ? app_ctx.ca_file : "NULL", + aws_error_debug_str(aws_last_error())); + aws_tls_ctx_options_clean_up(&tls_ctx_options); + main_error_code = 10; + goto cleanup; + } + + if (aws_tls_ctx_options_set_alpn_list(&tls_ctx_options, "x-amzn-mqtt-ca")) { + fprintf( + stderr, + "Failed to set TLS ALPN list: %s\n", + aws_error_debug_str(aws_last_error())); + aws_tls_ctx_options_clean_up(&tls_ctx_options); + main_error_code = 11; + goto cleanup; + } + + tls_ctx = aws_tls_client_ctx_new(allocator, &tls_ctx_options); + aws_tls_ctx_options_clean_up(&tls_ctx_options); + if (!tls_ctx) { + fprintf(stderr, "Failed to create TLS context: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 12; + goto cleanup; + } + + aws_tls_connection_options_init_from_ctx(&tls_connection_options, tls_ctx); + if (aws_tls_connection_options_set_server_name(&tls_connection_options, allocator, &broker_host_cursor)) { + fprintf( + stderr, + "Failed to set TLS server name (%s): %s\n", + app_ctx.broker_host, + aws_error_debug_str(aws_last_error())); + main_error_code = 13; + goto cleanup; + } + } + + /* Correct socks5_options */ + if (app_ctx.use_proxy) { + if (!app_ctx.proxy.host) { + fprintf(stderr, "Proxy URI was requested but no host was parsed.\n"); + main_error_code = 14; + goto cleanup; + } + + struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host); + if (aws_socks5_proxy_options_init(&socks5_options, allocator, proxy_host, app_ctx.proxy.port) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to initialize SOCKS5 proxy options: %s\n", + aws_error_debug_str(aws_last_error())); + main_error_code = 14; + goto cleanup; + } + aws_socks5_proxy_options_set_host_resolution_mode( + &socks5_options, + app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY + : AWS_SOCKS5_HOST_RESOLUTION_CLIENT); + + if (app_ctx.proxy.username && app_ctx.proxy.password) { + struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username); + struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password); + if (aws_socks5_proxy_options_set_auth(&socks5_options, allocator, username, password) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to set SOCKS5 auth: %s\n", + aws_error_debug_str(aws_last_error())); + main_error_code = 14; + goto cleanup; + } + } + + socks5_options_valid = true; + } + + + /* Set up socket options for the connection */ + struct aws_socket_options socket_options = { + .type = AWS_SOCKET_STREAM, + .domain = AWS_SOCKET_IPV4, /* Use IPv4 for better compatibility */ + .connect_timeout_ms = 10000, /* Allow enough time for connection */ + }; + + /* Define mqtt5_options */ + struct aws_mqtt5_client_options mqtt5_options; + AWS_ZERO_STRUCT(mqtt5_options); + mqtt5_options.host_name = aws_byte_cursor_from_c_str(app_ctx.broker_host); + mqtt5_options.port = app_ctx.broker_port; + mqtt5_options.bootstrap = app_ctx.bootstrap; + mqtt5_options.socket_options = &socket_options; + mqtt5_options.tls_options = app_ctx.use_tls ? &tls_connection_options : NULL; + mqtt5_options.lifecycle_event_handler = s_lifecycle_event_handler; + mqtt5_options.lifecycle_event_handler_user_data = &app_ctx; + mqtt5_options.publish_received_handler = s_publish_received_handler; + mqtt5_options.publish_received_handler_user_data = &app_ctx; + mqtt5_options.socks5_proxy_options = socks5_options_valid ? &socks5_options : NULL; + + aws_mqtt5_transform_websocket_handshake_fn *websocket_handshake_transform = NULL; + void *websocket_handshake_transform_user_data = NULL; + + if (app_ctx.use_websocket) { + websocket_handshake_transform = &s_aws_mqtt5_transform_websocket_handshake_fn; + } + mqtt5_options.websocket_handshake_transform = websocket_handshake_transform; + mqtt5_options.websocket_handshake_transform_user_data = websocket_handshake_transform_user_data; + + /* Correct the initialization of connect options */ + struct aws_mqtt5_packet_connect_view connect_view = { + .client_id = aws_byte_cursor_from_c_str(app_ctx.client_id), + .keep_alive_interval_seconds = app_ctx.keep_alive_secs, + .clean_start = app_ctx.clean_session, + .will_delay_interval_seconds = 0, // Default value + .session_expiry_interval_seconds = 0, // Default value + }; + + /* Update mqtt5_options to include the corrected connect options */ + mqtt5_options.connect_options = &connect_view; + + /* Create MQTT5 client */ + app_ctx.mqtt_client = aws_mqtt5_client_new(allocator, &mqtt5_options); + if (!app_ctx.mqtt_client) { + fprintf(stderr, "Failed to create MQTT 5 client: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 4; + goto cleanup; + } + + + if (aws_mqtt5_client_start(app_ctx.mqtt_client) != AWS_OP_SUCCESS) { + fprintf(stderr, "Failed to start MQTT 5 client: %s\n", aws_error_debug_str(aws_last_error())); + main_error_code = 5; + goto cleanup; + } + + printf("MQTT 5 client started successfully\n"); + + // Wait for connection to complete (success or failure) + aws_mutex_lock(&app_ctx.lock); + app_ctx.connection_complete = false; + app_ctx.connection_error_code = AWS_ERROR_SUCCESS; + while (!app_ctx.connection_complete) { + aws_condition_variable_wait(&app_ctx.signal, &app_ctx.lock); + } + int connection_error = app_ctx.connection_error_code; + aws_mutex_unlock(&app_ctx.lock); + + if (connection_error != AWS_ERROR_SUCCESS) { + fprintf(stderr, COLOR_RED "Exiting due to connection failure.\n" COLOR_RESET); + main_error_code = 6; + goto cleanup; + } + + // Subscribe to topic + printf("Subscribing to topic: %s\n", app_ctx.sub_topic); + if (s_subscribe_to_topic(&app_ctx) != AWS_OP_SUCCESS) { + main_error_code = 21; + goto cleanup; + } + + aws_mutex_lock(&app_ctx.lock); + app_ctx.message_received = false; + aws_mutex_unlock(&app_ctx.lock); + + // Publish a message + printf("Publishing message to topic: %s\n", app_ctx.pub_topic); + if (s_publish_message(&app_ctx) != AWS_OP_SUCCESS) { + main_error_code = 23; + goto cleanup; + } + + // Wait for message reception or timeout + printf("Waiting to receive messages...\n"); + aws_mutex_lock(&app_ctx.lock); + uint64_t wait_time_ns = aws_timestamp_convert(5, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + int wait_for_result = aws_condition_variable_wait_for_pred( + &app_ctx.signal, &app_ctx.lock, (int64_t)wait_time_ns, s_message_received_pred, &app_ctx); + bool received = app_ctx.message_received; + aws_mutex_unlock(&app_ctx.lock); + printf("Done waiting for messages.\n"); + + int wait_error = AWS_ERROR_SUCCESS; + if (wait_for_result != AWS_OP_SUCCESS) { + wait_error = aws_last_error(); + if (wait_error != AWS_ERROR_COND_VARIABLE_TIMED_OUT) { + fprintf( + stderr, + COLOR_RED "Error while waiting for publish response: %s" COLOR_RESET "\n", + aws_error_debug_str(wait_error)); + main_error_code = 24; + goto cleanup; + } + } + + if (!received) { + fprintf(stderr, COLOR_YELLOW "Timed out waiting for message.\n" COLOR_RESET); + } else { + printf("Message received and processed.\n"); + } + +cleanup: + aws_mqtt5_client_release(app_ctx.mqtt_client); + + if (app_ctx.use_tls) { + aws_tls_connection_options_clean_up(&tls_connection_options); + if (tls_ctx != NULL) { + aws_tls_ctx_release(tls_ctx); + } + } + if (socks5_options_valid) { + aws_socks5_proxy_options_clean_up(&socks5_options); + } + + aws_tls_connection_options_clean_up(&tls_connection_options); + s_app_ctx_clean_up(&app_ctx); + aws_thread_join_all_managed(); + + aws_mqtt_library_clean_up(); + aws_io_library_clean_up(); + aws_common_library_clean_up(); + if (logger_initialized) { + aws_logger_clean_up(&logger); + } + return main_error_code; +} diff --git a/bin/mqtt5canary/main.c b/bin/mqtt5canary/main.c index 4431d8f0..0dac0baa 100644 --- a/bin/mqtt5canary/main.c +++ b/bin/mqtt5canary/main.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,8 @@ #include #include #include +#include +#include #include #include @@ -27,6 +30,9 @@ #include #include +#include +#include +#include #ifdef _MSC_VER # pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */ @@ -40,6 +46,120 @@ #define AWS_MQTT5_CANARY_CLIENT_MAX 50 #define AWS_MQTT5_CANARY_PAYLOAD_SIZE_MAX UINT16_MAX +struct socks5_proxy_settings { + char *host; + char *username; + char *password; + uint16_t port; + bool resolve_host_with_proxy; +}; + +static void s_socks5_proxy_settings_clean_up( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator) { + if (!settings) { + return; + } + if (settings->host) { + aws_mem_release(allocator, settings->host); + } + if (settings->username) { + aws_mem_release(allocator, settings->username); + } + if (settings->password) { + aws_mem_release(allocator, settings->password); + } + AWS_ZERO_STRUCT(*settings); +} + +static int s_socks5_proxy_settings_init_from_uri( + struct socks5_proxy_settings *settings, + struct aws_allocator *allocator, + const char *proxy_uri) { + + if (!settings || !allocator || !proxy_uri) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + s_socks5_proxy_settings_clean_up(settings, allocator); + + struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri); + struct aws_uri uri; + AWS_ZERO_STRUCT(uri); + + if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) { + fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error())); + goto on_error; + } + + const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri); + if (!scheme || !scheme->len) { + fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri); + goto on_error; + } + + if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) { + settings->resolve_host_with_proxy = true; + } else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) { + settings->resolve_host_with_proxy = false; + } else { + fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri); + goto on_error; + } + + const struct aws_byte_cursor *host = aws_uri_host_name(&uri); + if (!host || host->len == 0) { + fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri); + goto on_error; + } + + settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char)); + if (!settings->host) { + fprintf(stderr, "Failed to allocate memory for proxy host\n"); + goto on_error; + } + memcpy(settings->host, host->ptr, host->len); + settings->host[host->len] = '\0'; + + uint32_t parsed_port = aws_uri_port(&uri); + if (parsed_port == 0) { + parsed_port = 1080; + } + if (parsed_port > UINT16_MAX) { + fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port); + goto on_error; + } + settings->port = (uint16_t)parsed_port; + + if (uri.user.len > 0) { + settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char)); + if (!settings->username) { + fprintf(stderr, "Failed to allocate memory for proxy username\n"); + goto on_error; + } + memcpy(settings->username, uri.user.ptr, uri.user.len); + settings->username[uri.user.len] = '\0'; + } + + if (uri.password.len > 0) { + settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char)); + if (!settings->password) { + fprintf(stderr, "Failed to allocate memory for proxy password\n"); + goto on_error; + } + memcpy(settings->password, uri.password.ptr, uri.password.len); + settings->password[uri.password.len] = '\0'; + } + + aws_uri_clean_up(&uri); + return AWS_OP_SUCCESS; + +on_error: + aws_uri_clean_up(&uri); + s_socks5_proxy_settings_clean_up(settings, allocator); + return AWS_OP_ERR; +} + struct app_ctx { struct aws_allocator *allocator; struct aws_mutex lock; @@ -56,6 +176,8 @@ struct app_ctx { const char *log_filename; enum aws_log_level log_level; + struct socks5_proxy_settings proxy; + bool use_proxy; }; enum aws_mqtt5_canary_operations { @@ -94,6 +216,7 @@ static void s_usage(int exit_code) { fprintf(stderr, " --cert FILE: path to a PEM encoded certificate to use with mTLS\n"); fprintf(stderr, " --key FILE: Path to a PEM encoded private key that matches cert.\n"); fprintf(stderr, " --connect-timeout INT: time in milliseconds to wait for a connection.\n"); + fprintf(stderr, " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n"); fprintf(stderr, " -l, --log FILE: dumps logs to FILE instead of stderr.\n"); fprintf(stderr, " -v, --verbose: ERROR|INFO|DEBUG|TRACE: log level to configure. Default is none.\n"); fprintf(stderr, " -w, --websockets: use mqtt-over-websockets rather than direct mqtt\n"); @@ -115,6 +238,7 @@ static struct aws_cli_option s_long_options[] = { {"connect-timeout", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'f'}, {"log", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'l'}, {"verbose", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'v'}, + {"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'}, {"websockets", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'w'}, {"port", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'p'}, {"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'}, @@ -136,7 +260,7 @@ static void s_parse_options( while (true) { int option_index = 0; - int c = aws_cli_getopt_long(argc, argv, "a:c:e:f:l:v:wht:p:C:T:s:", s_long_options, &option_index); + int c = aws_cli_getopt_long(argc, argv, "a:c:e:f:l:v:wht:p:C:T:s:x:", s_long_options, &option_index); if (c == -1) { break; } @@ -174,6 +298,12 @@ static void s_parse_options( s_usage(1); } break; + case 'x': + if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) { + s_usage(1); + } + ctx->use_proxy = true; + break; case 'h': s_usage(0); break; @@ -831,6 +961,9 @@ int main(int argc, char **argv) { * TLS **********************************************************/ bool use_tls = false; + bool socks5_options_valid = false; + struct aws_socks5_proxy_options socks5_options; + AWS_ZERO_STRUCT(socks5_options); struct aws_tls_ctx *tls_ctx = NULL; struct aws_tls_ctx_options tls_ctx_options; AWS_ZERO_STRUCT(tls_ctx_options); @@ -905,6 +1038,40 @@ int main(int argc, char **argv) { .keep_alive_interval_sec = 0, }; + if (app_ctx.use_proxy) { + if (!app_ctx.proxy.host) { + fprintf(stderr, "Proxy URI was requested but no host was parsed.\n"); + exit(1); + } + + struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host); + if (aws_socks5_proxy_options_init(&socks5_options, allocator, proxy_host, app_ctx.proxy.port) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to initialize SOCKS5 proxy options: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + aws_socks5_proxy_options_set_host_resolution_mode( + &socks5_options, + app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY + : AWS_SOCKS5_HOST_RESOLUTION_CLIENT); + + if (app_ctx.proxy.username && app_ctx.proxy.password) { + struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username); + struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password); + if (aws_socks5_proxy_options_set_auth(&socks5_options, allocator, username, password) != AWS_OP_SUCCESS) { + fprintf( + stderr, + "Failed to set SOCKS5 auth: %s\n", + aws_error_debug_str(aws_last_error())); + exit(1); + } + } + + socks5_options_valid = true; + } + uint16_t receive_maximum = 9; uint32_t maximum_packet_size = 128 * 1024; @@ -932,6 +1099,7 @@ int main(int argc, char **argv) { .socket_options = &socket_options, .tls_options = (use_tls) ? &tls_connection_options : NULL, .connect_options = &connect_options, + .socks5_proxy_options = socks5_options_valid ? &socks5_options : NULL, .session_behavior = AWS_MQTT5_CSBT_CLEAN, .lifecycle_event_handler = s_lifecycle_event_callback, .retry_jitter_mode = AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, @@ -1013,6 +1181,10 @@ int main(int argc, char **argv) { aws_mqtt5_client_release(client); } + if (socks5_options_valid) { + aws_socks5_proxy_options_clean_up(&socks5_options); + } + aws_client_bootstrap_release(bootstrap); aws_host_resolver_release(resolver); aws_event_loop_group_release(el_group); @@ -1034,6 +1206,7 @@ int main(int argc, char **argv) { aws_logger_clean_up(&logger); } + s_socks5_proxy_settings_clean_up(&app_ctx.proxy, app_ctx.allocator); aws_uri_clean_up(&app_ctx.uri); aws_mqtt_library_clean_up(); diff --git a/include/aws/mqtt/client.h b/include/aws/mqtt/client.h index 8d85bfe8..d83b2921 100644 --- a/include/aws/mqtt/client.h +++ b/include/aws/mqtt/client.h @@ -26,6 +26,7 @@ struct aws_http_message; struct aws_http_proxy_options; struct aws_mqtt5_client; struct aws_socket_options; +struct aws_socks5_proxy_options; struct aws_tls_connection_options; /** @@ -425,6 +426,17 @@ int aws_mqtt_client_connection_set_http_proxy_options( struct aws_mqtt_client_connection *connection, struct aws_http_proxy_options *proxy_options); +/** + * Set SOCKS5 proxy options for the connection. + * + * \param[in] connection The connection object + * \param[in] socks5_proxy_options Configuration options for the SOCKS5 proxy connection + */ +AWS_MQTT_API +int aws_mqtt_client_connection_set_socks5_proxy_options( + struct aws_mqtt_client_connection *connection, + struct aws_socks5_proxy_options *socks5_proxy_options); + /** * Set host resolution ooptions for the connection. */ diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index ab9822c3..9f68948d 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -226,6 +226,7 @@ struct aws_mqtt_client_connection_311_impl { struct aws_tls_connection_options tls_options; struct aws_socket_options socket_options; struct aws_http_proxy_config *http_proxy_config; + struct aws_socks5_proxy_options *socks5_proxy_options; struct aws_event_loop *loop; struct aws_host_resolution_config host_resolution_config; diff --git a/include/aws/mqtt/private/client_impl_shared.h b/include/aws/mqtt/private/client_impl_shared.h index 248b4658..d69088f3 100644 --- a/include/aws/mqtt/private/client_impl_shared.h +++ b/include/aws/mqtt/private/client_impl_shared.h @@ -48,6 +48,8 @@ struct aws_mqtt_client_connection_vtable { int (*set_http_proxy_options_fn)(void *impl, struct aws_http_proxy_options *proxy_options); + int (*set_socks5_proxy_options_fn)(void *impl, struct aws_socks5_proxy_options *proxy_options); + int (*set_host_resolution_options_fn)(void *impl, const struct aws_host_resolution_config *host_resolution_config); int (*set_reconnect_timeout_fn)(void *impl, uint64_t min_timeout, uint64_t max_timeout); diff --git a/include/aws/mqtt/private/v5/mqtt5_options_storage.h b/include/aws/mqtt/private/v5/mqtt5_options_storage.h index ddcb0ab5..c7cbf282 100644 --- a/include/aws/mqtt/private/v5/mqtt5_options_storage.h +++ b/include/aws/mqtt/private/v5/mqtt5_options_storage.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -182,6 +183,8 @@ struct aws_mqtt5_client_options_storage { void *client_termination_handler_user_data; struct aws_host_resolution_config host_resolution_override; + + struct aws_socks5_proxy_options *socks5_proxy_options; }; AWS_EXTERN_C_BEGIN diff --git a/include/aws/mqtt/v5/mqtt5_client.h b/include/aws/mqtt/v5/mqtt5_client.h index 5cac371c..42fb185f 100644 --- a/include/aws/mqtt/v5/mqtt5_client.h +++ b/include/aws/mqtt/v5/mqtt5_client.h @@ -657,6 +657,9 @@ struct aws_mqtt5_client_options { * configuration but changes the refresh frequency to a value that prevents DNS pinging. */ struct aws_host_resolution_config *host_resolution_override; + + /** Options for SOCKS5 proxy */ + const struct aws_socks5_proxy_options * socks5_proxy_options; }; AWS_EXTERN_C_BEGIN diff --git a/integration-testing/mqtt3_client_test.bash b/integration-testing/mqtt3_client_test.bash new file mode 100755 index 00000000..d18b4a71 --- /dev/null +++ b/integration-testing/mqtt3_client_test.bash @@ -0,0 +1,89 @@ +#!/bin/bash + +# Print a summary of all test results + +BROKER_HOST=localhost +CA_FILE=/home/carlos.sanvicente/mosquitto/certs/ca.crt +PROXY_HOST=localhost +PROXY_PORT=1080 +PROXY_URI="socks5h://testuser:testpass@${PROXY_HOST}:${PROXY_PORT}" +EXECUTABLE=./bin/mqtt3_client_app/mqtt3_client_app + +declare -a TEST_NAMES +declare -a TEST_RESULTS +declare -a TEST_CODES + +run_case() { + echo "" + echo "" + local test_title="$1" + echo "===== $test_title =====" + shift + "$@" + local status=$? + TEST_NAMES+=("$test_title") + TEST_RESULTS+=("$status") + TEST_CODES+=("$status") +} + +print_summary() { + GREEN='\033[0;32m' + RED='\033[0;31m' + NC='\033[0m' # No Color + echo "====================" + echo "Test Summary:" + pass_count=0 + fail_count=0 + for i in "${!TEST_NAMES[@]}"; do + name="${TEST_NAMES[$i]}" + result="${TEST_RESULTS[$i]}" + if [ "$result" -eq 0 ]; then + echo -e "${GREEN}[PASS]${NC} $name" + ((pass_count++)) + else + echo -e "${RED}[FAIL]${NC} $name (exit code ${TEST_CODES[$i]})" + ((fail_count++)) + fi + done + echo "--------------------" + echo "Total: $((pass_count+fail_count)), Passed: $pass_count, Failed: $fail_count" + echo "====================" +} + + +# Test case functions +test_proxy_mqtt_unencrypted() { + run_case "Proxy, MQTT, unencrypted, authenticated (1883)" \ + $EXECUTABLE --broker-host $BROKER_HOST --broker-port 1883 \ + --proxy "$PROXY_URI" +} + +test_proxy_mqtt_encrypted() { + run_case "Proxy, MQTT, encrypted, authenticated (8883)" \ + $EXECUTABLE --broker-host $BROKER_HOST --broker-port 8883 \ + --proxy "$PROXY_URI" \ + --ca-file $CA_FILE +} + +test_proxy_mqtt_ws_unencrypted() { + run_case "Proxy, MQTT over WS, unencrypted, authenticated (8080)" \ + $EXECUTABLE --broker-host $BROKER_HOST --broker-port 8080 \ + --proxy "$PROXY_URI" \ + --websocket --ws-path /mqtt +} + +test_proxy_mqtt_ws_encrypted() { + run_case "Proxy, MQTT over WS, encrypted, authenticated (8081)" \ + $EXECUTABLE --broker-host $BROKER_HOST --broker-port 8081 \ + --proxy "$PROXY_URI" \ + --websocket --ws-path /mqtt \ + --ca-file $CA_FILE +} + +# Call all test cases +test_proxy_mqtt_unencrypted +test_proxy_mqtt_encrypted +test_proxy_mqtt_ws_unencrypted +test_proxy_mqtt_ws_encrypted + +print_summary diff --git a/source/client.c b/source/client.c index 5a3b309c..31acaf7a 100644 --- a/source/client.c +++ b/source/client.c @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include #include @@ -844,6 +846,12 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec connection->http_proxy_config = NULL; } + if (connection->socks5_proxy_options) { + aws_socks5_proxy_options_clean_up(connection->socks5_proxy_options); + aws_mem_release(connection->allocator, connection->socks5_proxy_options); + connection->socks5_proxy_options = NULL; + } + aws_mqtt_client_release(connection->client); /* Frees all allocated memory */ @@ -1235,6 +1243,35 @@ static int s_aws_mqtt_client_connection_311_set_http_proxy_options( return connection->http_proxy_config != NULL ? AWS_OP_SUCCESS : AWS_OP_ERR; } +static int s_aws_mqtt_client_connection_311_set_socks5_proxy_options( + void *impl, + struct aws_socks5_proxy_options *socks5_proxy_options) { + + struct aws_mqtt_client_connection_311_impl *connection = impl; + + if (connection->socks5_proxy_options) { + aws_socks5_proxy_options_clean_up(connection->socks5_proxy_options); + aws_mem_release(connection->allocator, connection->socks5_proxy_options); + connection->socks5_proxy_options = NULL; + } + + if (socks5_proxy_options) { + connection->socks5_proxy_options = + aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_socks5_proxy_options)); + if (!connection->socks5_proxy_options) { + return AWS_OP_ERR; + } + + if (aws_socks5_proxy_options_copy(connection->socks5_proxy_options, socks5_proxy_options) != AWS_OP_SUCCESS) { + aws_mem_release(connection->allocator, connection->socks5_proxy_options); + connection->socks5_proxy_options = NULL; + return AWS_OP_ERR; + } + } + + return AWS_OP_SUCCESS; +} + static int s_aws_mqtt_client_connection_311_set_host_resolution_options( void *impl, const struct aws_host_resolution_config *host_resolution_config) { @@ -1426,6 +1463,8 @@ static void s_websocket_handshake_transform_complete( if (connection->http_proxy_config != NULL) { aws_http_proxy_options_init_from_config(&proxy_options, connection->http_proxy_config); websocket_options.proxy_options = &proxy_options; + } else if (connection->socks5_proxy_options != NULL) { + websocket_options.socks5_proxy_options = connection->socks5_proxy_options; } if (aws_websocket_client_connect(&websocket_options)) { @@ -1695,14 +1734,17 @@ static int s_mqtt_client_connect( channel_options.requested_event_loop = connection->loop; channel_options.host_resolution_override_config = &connection->host_resolution_config; - if (connection->http_proxy_config == NULL) { - result = aws_client_bootstrap_new_socket_channel(&channel_options); - } else { + // SOCKS5 proxy takes precedence if both HTTP and SOCKS5 are configured + if (connection->socks5_proxy_options != NULL) { + result = aws_client_bootstrap_new_socket_channel_with_socks5( + connection->allocator, &channel_options, connection->socks5_proxy_options); + } else if (connection->http_proxy_config != NULL) { struct aws_http_proxy_options proxy_options; AWS_ZERO_STRUCT(proxy_options); - aws_http_proxy_options_init_from_config(&proxy_options, connection->http_proxy_config); result = aws_http_proxy_new_socket_channel(&channel_options, &proxy_options); + } else { + result = aws_client_bootstrap_new_socket_channel(&channel_options); } } @@ -3374,6 +3416,7 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311 .set_login_fn = s_aws_mqtt_client_connection_311_set_login, .use_websockets_fn = s_aws_mqtt_client_connection_311_use_websockets, .set_http_proxy_options_fn = s_aws_mqtt_client_connection_311_set_http_proxy_options, + .set_socks5_proxy_options_fn = s_aws_mqtt_client_connection_311_set_socks5_proxy_options, .set_host_resolution_options_fn = s_aws_mqtt_client_connection_311_set_host_resolution_options, .set_reconnect_timeout_fn = s_aws_mqtt_client_connection_311_set_reconnect_timeout, .set_connection_result_handlers = s_aws_mqtt_client_connection_311_set_connection_result_handlers, diff --git a/source/client_impl_shared.c b/source/client_impl_shared.c index 019adc5c..5d9d7efd 100644 --- a/source/client_impl_shared.c +++ b/source/client_impl_shared.c @@ -56,6 +56,13 @@ int aws_mqtt_client_connection_set_http_proxy_options( return (*connection->vtable->set_http_proxy_options_fn)(connection->impl, proxy_options); } +int aws_mqtt_client_connection_set_socks5_proxy_options( + struct aws_mqtt_client_connection *connection, + struct aws_socks5_proxy_options *proxy_options) { + + return (*connection->vtable->set_socks5_proxy_options_fn)(connection->impl, proxy_options); +} + int aws_mqtt_client_connection_set_host_resolution_options( struct aws_mqtt_client_connection *connection, const struct aws_host_resolution_config *host_resolution_config) { diff --git a/source/v5/mqtt5_client.c b/source/v5/mqtt5_client.c index 33bad2e6..ea289442 100644 --- a/source/v5/mqtt5_client.c +++ b/source/v5/mqtt5_client.c @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include #include #include @@ -888,7 +890,10 @@ void s_websocket_transform_complete_task_fn(struct aws_task *task, void *arg, en .requested_event_loop = client->loop, .host_resolution_config = &client->config->host_resolution_override}; - if (client->config->http_proxy_config != NULL) { + // SOCKS5 and HTTP proxy are mutually exclusive, prefer SOCKS5 if both are set + if (client->config->socks5_proxy_options != NULL) { + websocket_options.socks5_proxy_options = client->config->socks5_proxy_options; + } else if (client->config->http_proxy_config != NULL) { websocket_options.proxy_options = &client->config->http_proxy_options; } @@ -1007,11 +1012,15 @@ static void s_change_current_state_to_connecting(struct aws_mqtt5_client *client channel_options.requested_event_loop = client->loop; channel_options.host_resolution_override_config = &client->config->host_resolution_override; - if (client->config->http_proxy_config == NULL) { - result = (*client->vtable->client_bootstrap_new_socket_channel_fn)(&channel_options); - } else { + // SOCKS5 and HTTP proxy are mutually exclusive, prefer SOCKS5 if both are set + if (client->config->socks5_proxy_options != NULL) { + result = aws_client_bootstrap_new_socket_channel_with_socks5( + client->allocator, &channel_options, client->config->socks5_proxy_options); + } else if (client->config->http_proxy_config != NULL) { result = (*client->vtable->http_proxy_new_socket_channel_fn)( &channel_options, &client->config->http_proxy_options); + } else { + result = (*client->vtable->client_bootstrap_new_socket_channel_fn)(&channel_options); } } diff --git a/source/v5/mqtt5_options_storage.c b/source/v5/mqtt5_options_storage.c index 81f997b5..05ca60ea 100644 --- a/source/v5/mqtt5_options_storage.c +++ b/source/v5/mqtt5_options_storage.c @@ -3738,6 +3738,11 @@ void aws_mqtt5_client_options_storage_destroy(struct aws_mqtt5_client_options_st aws_tls_connection_options_clean_up(&options_storage->tls_options); aws_http_proxy_config_destroy(options_storage->http_proxy_config); + if (options_storage->socks5_proxy_options != NULL) { + aws_socks5_proxy_options_clean_up(options_storage->socks5_proxy_options); + aws_mem_release(options_storage->allocator, options_storage->socks5_proxy_options); + } + if (options_storage->connect != NULL) { aws_mqtt5_packet_connect_storage_clean_up(options_storage->connect); aws_mem_release(options_storage->connect->allocator, options_storage->connect); @@ -3844,6 +3849,17 @@ struct aws_mqtt5_client_options_storage *aws_mqtt5_client_options_storage_new( &options_storage->http_proxy_options, options_storage->http_proxy_config); } + if (options->socks5_proxy_options != NULL) { + options_storage->socks5_proxy_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_socks5_proxy_options)); + if (!options_storage->socks5_proxy_options) { + goto error; + } + + if (aws_socks5_proxy_options_copy(options_storage->socks5_proxy_options, options->socks5_proxy_options)) { + goto error; + } + } + options_storage->websocket_handshake_transform = options->websocket_handshake_transform; options_storage->websocket_handshake_transform_user_data = options->websocket_handshake_transform_user_data; diff --git a/source/v5/mqtt5_to_mqtt3_adapter.c b/source/v5/mqtt5_to_mqtt3_adapter.c index 691d2c9a..d4cdf879 100644 --- a/source/v5/mqtt5_to_mqtt3_adapter.c +++ b/source/v5/mqtt5_to_mqtt3_adapter.c @@ -13,6 +13,7 @@ #include #include #include +#include /* * A best-effort-but-not-100%-accurate translation from mqtt5 error codes to mqtt311 error codes. @@ -1218,6 +1219,107 @@ static void s_set_http_proxy_options_task_fn(struct aws_task *task, void *arg, e aws_mem_release(set_task->allocator, set_task); } +struct aws_mqtt_set_socks5_proxy_options_task { + struct aws_task task; + struct aws_allocator *allocator; + struct aws_mqtt_client_connection_5_impl *adapter; + struct aws_allocator *proxy_allocator; + struct aws_socks5_proxy_options *proxy_options; +}; + +static void s_set_socks5_proxy_options_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + + struct aws_mqtt_set_socks5_proxy_options_task *set_task = arg; + struct aws_mqtt_client_connection_5_impl *adapter = set_task->adapter; + + if (status == AWS_TASK_STATUS_RUN_READY) { + struct aws_mqtt5_client_options_storage *config = adapter->client->config; + + if (config->socks5_proxy_options != NULL) { + aws_socks5_proxy_options_clean_up(config->socks5_proxy_options); + aws_mem_release(config->allocator, config->socks5_proxy_options); + config->socks5_proxy_options = NULL; + } + + config->socks5_proxy_options = set_task->proxy_options; + set_task->proxy_options = NULL; + } + + aws_ref_count_release(&adapter->internal_refs); + + if (set_task->proxy_options != NULL) { + aws_socks5_proxy_options_clean_up(set_task->proxy_options); + aws_mem_release(set_task->proxy_allocator, set_task->proxy_options); + } + + aws_mem_release(set_task->allocator, set_task); +} + +static struct aws_mqtt_set_socks5_proxy_options_task *s_aws_mqtt_set_socks5_proxy_options_task_new( + struct aws_allocator *allocator, + struct aws_mqtt_client_connection_5_impl *adapter, + struct aws_socks5_proxy_options *proxy_options) { + + struct aws_mqtt5_client_options_storage *config = adapter->client->config; + struct aws_allocator *config_allocator = config->allocator; + + struct aws_socks5_proxy_options *proxy_copy = NULL; + if (proxy_options != NULL) { + proxy_copy = aws_mem_calloc(config_allocator, 1, sizeof(struct aws_socks5_proxy_options)); + if (!proxy_copy) { + return NULL; + } + + if (aws_socks5_proxy_options_copy(proxy_copy, proxy_options) != AWS_OP_SUCCESS) { + aws_mem_release(config_allocator, proxy_copy); + return NULL; + } + } + + struct aws_mqtt_set_socks5_proxy_options_task *set_task = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_set_socks5_proxy_options_task)); + if (set_task == NULL) { + if (proxy_copy != NULL) { + aws_socks5_proxy_options_clean_up(proxy_copy); + aws_mem_release(config_allocator, proxy_copy); + } + return NULL; + } + + aws_task_init(&set_task->task, s_set_socks5_proxy_options_task_fn, (void *)set_task, "SetSocks5ProxyOptionsTask"); + set_task->allocator = adapter->allocator; + set_task->adapter = (struct aws_mqtt_client_connection_5_impl *)aws_ref_count_acquire(&adapter->internal_refs); + set_task->proxy_allocator = config_allocator; + set_task->proxy_options = proxy_copy; + + return set_task; +} + +static int s_aws_mqtt_client_connection_5_set_socks5_proxy_options( + void *impl, + struct aws_socks5_proxy_options *proxy_options) { + + struct aws_mqtt_client_connection_5_impl *adapter = impl; + + struct aws_mqtt_set_socks5_proxy_options_task *task = + s_aws_mqtt_set_socks5_proxy_options_task_new(adapter->allocator, adapter, proxy_options); + if (task == NULL) { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT5_TO_MQTT3_ADAPTER, + "id=%p: failed to create set socks5 proxy options task, error code %d(%s)", + (void *)adapter, + error_code, + aws_error_debug_str(error_code)); + return AWS_OP_ERR; + } + + aws_event_loop_schedule_task_now(adapter->loop, &task->task); + + return AWS_OP_SUCCESS; +} + static struct aws_mqtt_set_http_proxy_options_task *s_aws_mqtt_set_http_proxy_options_task_new( struct aws_allocator *allocator, struct aws_mqtt_client_connection_5_impl *adapter, @@ -2869,6 +2971,7 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_v .set_login_fn = s_aws_mqtt_client_connection_5_set_login, .use_websockets_fn = s_aws_mqtt_client_connection_5_use_websockets, .set_http_proxy_options_fn = s_aws_mqtt_client_connection_5_set_http_proxy_options, + .set_socks5_proxy_options_fn = s_aws_mqtt_client_connection_5_set_socks5_proxy_options, .set_host_resolution_options_fn = s_aws_mqtt_client_connection_5_set_host_resolution_options, .set_reconnect_timeout_fn = s_aws_mqtt_client_connection_5_set_reconnect_timeout, .set_connection_result_handlers = s_aws_mqtt_client_connection_5_set_connection_result_handlers, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 16ce1533..2c5cece8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -28,6 +28,9 @@ add_test_case(mqtt_packet_pingreq) add_test_case(mqtt_packet_pingresp) add_test_case(mqtt_packet_disconnect) +add_test_case(mqtt_set_socks5_proxy_options_invokes_vtable) +add_test_case(mqtt_connect_uses_socks5_after_set) + add_test_case(mqtt_packet_connack_decode_failure_reserved) add_test_case(mqtt_packet_ack_decode_failure_reserved) add_test_case(mqtt_packet_pingresp_decode_failure_reserved) diff --git a/tests/socks5_proxy_tests.c b/tests/socks5_proxy_tests.c new file mode 100644 index 00000000..8cfde46f --- /dev/null +++ b/tests/socks5_proxy_tests.c @@ -0,0 +1,122 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include + +#include +#include + +/* + * Fake implementation to track SOCKS5 proxy option usage and connection calls. + */ +struct fake_socks5_impl { + struct aws_socks5_proxy_options *last_options; + struct aws_socks5_proxy_options *expected_options; + bool set_called; + bool connect_called; +}; + +/* Called when SOCKS5 proxy options are set on the connection. */ +static int s_fake_set_socks5(void *impl, struct aws_socks5_proxy_options *options) { + struct fake_socks5_impl *fake = impl; + fake->set_called = true; + fake->last_options = options; + return AWS_OP_SUCCESS; +} + +/* Called when connect is invoked; checks that SOCKS5 options were set. */ +static int s_fake_connect(void *impl, const struct aws_mqtt_connection_options *connection_options) { + (void)connection_options; + struct fake_socks5_impl *fake = impl; + fake->connect_called = true; + ASSERT_PTR_EQUALS(fake->expected_options, fake->last_options); + return AWS_OP_SUCCESS; +} + +/* + * Vtable with only SOCKS5 proxy and connect functions implemented for testing. + */ +static struct aws_mqtt_client_connection_vtable s_fake_vtable = { + .acquire_fn = NULL, + .release_fn = NULL, + .set_will_fn = NULL, + .set_login_fn = NULL, + .use_websockets_fn = NULL, + .set_http_proxy_options_fn = NULL, + .set_socks5_proxy_options_fn = s_fake_set_socks5, + .set_host_resolution_options_fn = NULL, + .set_reconnect_timeout_fn = NULL, + .set_connection_interruption_handlers_fn = NULL, + .set_connection_result_handlers = NULL, + .set_connection_closed_handler_fn = NULL, + .set_on_any_publish_handler_fn = NULL, + .set_connection_termination_handler_fn = NULL, + .connect_fn = s_fake_connect, + .reconnect_fn = NULL, + .disconnect_fn = NULL, + .subscribe_multiple_fn = NULL, + .subscribe_fn = NULL, + .resubscribe_existing_topics_fn = NULL, + .unsubscribe_fn = NULL, + .publish_fn = NULL, + .get_stats_fn = NULL, +}; + +/* + * Test: Setting SOCKS5 proxy options on the connection invokes the vtable callback. + */ +static int s_mqtt_set_socks5_proxy_options_invokes_vtable(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct aws_socks5_proxy_options socks5_options; + ASSERT_SUCCESS(aws_socks5_proxy_options_init( + &socks5_options, allocator, aws_byte_cursor_from_c_str("proxy.example.com"), 1080)); + + struct fake_socks5_impl fake_impl; + AWS_ZERO_STRUCT(fake_impl); + struct aws_mqtt_client_connection connection = { + .vtable = &s_fake_vtable, + .impl = &fake_impl, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_set_socks5_proxy_options(&connection, &socks5_options)); + ASSERT_TRUE(fake_impl.set_called); + ASSERT_PTR_EQUALS(&socks5_options, fake_impl.last_options); + + aws_socks5_proxy_options_clean_up(&socks5_options); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(mqtt_set_socks5_proxy_options_invokes_vtable, s_mqtt_set_socks5_proxy_options_invokes_vtable); + +/* + * Test: After setting SOCKS5 proxy options, connect uses the correct options. + */ +static int s_mqtt_connect_uses_socks5_after_set(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct aws_socks5_proxy_options socks5_options; + ASSERT_SUCCESS(aws_socks5_proxy_options_init( + &socks5_options, allocator, aws_byte_cursor_from_c_str("proxy.example.com"), 1080)); + + struct fake_socks5_impl fake_impl; + AWS_ZERO_STRUCT(fake_impl); + fake_impl.expected_options = &socks5_options; + + struct aws_mqtt_client_connection connection = { + .vtable = &s_fake_vtable, + .impl = &fake_impl, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_set_socks5_proxy_options(&connection, &socks5_options)); + ASSERT_TRUE(fake_impl.set_called); + + struct aws_mqtt_connection_options connect_options; + AWS_ZERO_STRUCT(connect_options); + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(&connection, &connect_options)); + ASSERT_TRUE(fake_impl.connect_called); + + aws_socks5_proxy_options_clean_up(&socks5_options); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(mqtt_connect_uses_socks5_after_set, s_mqtt_connect_uses_socks5_after_set);