Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,7 @@ if (BUILD_TESTING)
add_subdirectory(bin/elastipubsub5)
add_subdirectory(bin/elastishadow)
add_subdirectory(bin/mqtt5canary)
add_subdirectory(bin/mqtt3_client_app)
add_subdirectory(bin/mqtt5_client_app)
endif()
endif ()
Empty file added WORKSPACE
Empty file.
196 changes: 195 additions & 1 deletion bin/elastipubsub/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <aws/common/clock.h>
#include <aws/common/command_line_parser.h>
#include <aws/common/condition_variable.h>
#include <aws/common/error.h>
#include <aws/common/hash_table.h>
#include <aws/common/log_channel.h>
#include <aws/common/log_formatter.h>
Expand All @@ -20,18 +21,137 @@
#include <aws/io/stream.h>
#include <aws/io/tls_channel_handler.h>
#include <aws/io/uri.h>
#include <aws/io/socks5.h>
#include <aws/io/socks5_channel_handler.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/mqtt.h>

#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#ifdef _MSC_VER
# pragma warning(disable : 4996) /* Disable warnings about fopen() being insecure */
# pragma warning(disable : 4204) /* Declared initializers */
# pragma warning(disable : 4221) /* Local var in declared initializer */
#endif

struct socks5_proxy_settings {
char *host;
char *username;
char *password;
uint16_t port;
bool resolve_host_with_proxy;
};

static void s_socks5_proxy_settings_clean_up(
struct socks5_proxy_settings *settings,
struct aws_allocator *allocator) {
if (!settings) {
return;
}
if (settings->host) {
aws_mem_release(allocator, settings->host);
}
if (settings->username) {
aws_mem_release(allocator, settings->username);
}
if (settings->password) {
aws_mem_release(allocator, settings->password);
}
AWS_ZERO_STRUCT(*settings);
}

static int s_socks5_proxy_settings_init_from_uri(
struct socks5_proxy_settings *settings,
struct aws_allocator *allocator,
const char *proxy_uri) {

if (!settings || !allocator || !proxy_uri) {
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

s_socks5_proxy_settings_clean_up(settings, allocator);

struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str(proxy_uri);
struct aws_uri uri;
AWS_ZERO_STRUCT(uri);

if (aws_uri_init_parse(&uri, allocator, &uri_cursor)) {
fprintf(stderr, "Failed to parse proxy URI \"%s\": %s\n", proxy_uri, aws_error_debug_str(aws_last_error()));
goto on_error;
}

const struct aws_byte_cursor *scheme = aws_uri_scheme(&uri);
if (!scheme || !scheme->len) {
fprintf(stderr, "Proxy URI \"%s\" must include scheme socks5h://\n", proxy_uri);
goto on_error;
}

if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5h")) {
settings->resolve_host_with_proxy = true;
} else if (aws_byte_cursor_eq_c_str_ignore_case(scheme, "socks5")) {
settings->resolve_host_with_proxy = false;
} else {
fprintf(stderr, "Unsupported proxy scheme in \"%s\". Expected socks5h://\n", proxy_uri);
goto on_error;
}

const struct aws_byte_cursor *host = aws_uri_host_name(&uri);
if (!host || host->len == 0) {
fprintf(stderr, "Proxy URI \"%s\" must include a host\n", proxy_uri);
goto on_error;
}

settings->host = aws_mem_calloc(allocator, host->len + 1, sizeof(char));
if (!settings->host) {
fprintf(stderr, "Failed to allocate memory for proxy host\n");
goto on_error;
}
memcpy(settings->host, host->ptr, host->len);
settings->host[host->len] = '\0';

uint32_t parsed_port = aws_uri_port(&uri);
if (parsed_port == 0) {
parsed_port = 1080;
}
if (parsed_port > UINT16_MAX) {
fprintf(stderr, "Proxy port %" PRIu32 " exceeds uint16_t range\n", parsed_port);
goto on_error;
}
settings->port = (uint16_t)parsed_port;

if (uri.user.len > 0) {
settings->username = aws_mem_calloc(allocator, uri.user.len + 1, sizeof(char));
if (!settings->username) {
fprintf(stderr, "Failed to allocate memory for proxy username\n");
goto on_error;
}
memcpy(settings->username, uri.user.ptr, uri.user.len);
settings->username[uri.user.len] = '\0';
}

if (uri.password.len > 0) {
settings->password = aws_mem_calloc(allocator, uri.password.len + 1, sizeof(char));
if (!settings->password) {
fprintf(stderr, "Failed to allocate memory for proxy password\n");
goto on_error;
}
memcpy(settings->password, uri.password.ptr, uri.password.len);
settings->password[uri.password.len] = '\0';
}

aws_uri_clean_up(&uri);
return AWS_OP_SUCCESS;

on_error:
aws_uri_clean_up(&uri);
s_socks5_proxy_settings_clean_up(settings, allocator);
return AWS_OP_ERR;
}

struct app_ctx {
struct aws_allocator *allocator;
struct aws_mutex lock;
Expand All @@ -53,6 +173,10 @@ struct app_ctx {
int received_messages;

struct aws_tls_connection_options tls_connection_options;
struct socks5_proxy_settings proxy;
bool use_proxy;
struct aws_socks5_proxy_options socks5_options;
bool socks5_options_initialized;

struct aws_linked_list pending_connection_list;
struct aws_linked_list established_connection_list;
Expand All @@ -71,6 +195,7 @@ static void s_usage(int exit_code) {
fprintf(stderr, " --key FILE: Path to a PEM encoded private key that matches cert.\n");
fprintf(stderr, " --cops INT: target control (connect, subscribe) operations per second\n");
fprintf(stderr, " --connect-timeout INT: time in milliseconds to wait for a connection.\n");
fprintf(stderr, " --proxy URL: SOCKS5 proxy URI (socks5h://... for proxy DNS, socks5://... for local DNS)\n");
fprintf(stderr, " -i, --iterations INT: number of independent iterations to run the test for\n");
fprintf(stderr, " -k, --connections INT: number of independent connections to make.\n");
fprintf(stderr, " -l, --log FILE: dumps logs to FILE instead of stderr.\n");
Expand All @@ -94,6 +219,7 @@ static struct aws_cli_option s_long_options[] = {
{"messages", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'n'},
{"pops", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'p'},
{"verbose", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'v'},
{"proxy", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'x'},
{"help", AWS_CLI_OPTIONS_NO_ARGUMENT, NULL, 'h'},
{"endpoint", AWS_CLI_OPTIONS_REQUIRED_ARGUMENT, NULL, 'E'},
/* Per getopt(3) the last element of the array has to be filled with all zeros */
Expand All @@ -104,7 +230,7 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) {
bool uri_found = false;
while (true) {
int option_index = 0;
int c = aws_cli_getopt_long(argc, argv, "a:c:C:e:f:i:k:l:n:p:v:h:E", s_long_options, &option_index);
int c = aws_cli_getopt_long(argc, argv, "a:c:C:e:f:i:k:l:n:p:v:h:E:x:", s_long_options, &option_index);
if (c == -1) {
break;
}
Expand Down Expand Up @@ -157,6 +283,12 @@ static void s_parse_options(int argc, char **argv, struct app_ctx *ctx) {
s_usage(1);
}
break;
case 'x':
if (s_socks5_proxy_settings_init_from_uri(&ctx->proxy, ctx->allocator, aws_cli_optarg)) {
s_usage(1);
}
ctx->use_proxy = true;
break;
case 'h':
s_usage(0);
break;
Expand Down Expand Up @@ -324,6 +456,18 @@ static void s_establish_connections(
.clean_session = true,
};

if (app_ctx->use_proxy && app_ctx->socks5_options_initialized) {
if (aws_mqtt_client_connection_set_socks5_proxy_options(
connection_data->connection, &app_ctx->socks5_options)) {
fprintf(
stderr,
"Failed to set SOCKS5 proxy options: %s\n",
aws_error_debug_str(aws_last_error()));
s_final_destroy_connection_data(connection_data);
continue;
}
}

if (aws_mqtt_client_connection_connect(connection_data->connection, &connection_options)) {
s_final_destroy_connection_data(connection_data);
}
Expand Down Expand Up @@ -728,6 +872,46 @@ int main(int argc, char **argv) {
aws_mem_calloc(allocator, app_ctx.connection_count, sizeof(struct connection_user_data));
AWS_FATAL_ASSERT(connections != NULL);

if (app_ctx.use_proxy) {
if (!app_ctx.proxy.host) {
fprintf(stderr, "Proxy URI was requested but no host was parsed.\n");
exit(1);
}

if (app_ctx.socks5_options_initialized) {
aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options);
AWS_ZERO_STRUCT(app_ctx.socks5_options);
app_ctx.socks5_options_initialized = false;
}

struct aws_byte_cursor proxy_host = aws_byte_cursor_from_c_str(app_ctx.proxy.host);
if (aws_socks5_proxy_options_init(&app_ctx.socks5_options, allocator, proxy_host, app_ctx.proxy.port)) {
fprintf(
stderr,
"Failed to initialize SOCKS5 proxy options: %s\n",
aws_error_debug_str(aws_last_error()));
exit(1);
}
aws_socks5_proxy_options_set_host_resolution_mode(
&app_ctx.socks5_options,
app_ctx.proxy.resolve_host_with_proxy ? AWS_SOCKS5_HOST_RESOLUTION_PROXY
: AWS_SOCKS5_HOST_RESOLUTION_CLIENT);

if (app_ctx.proxy.username && app_ctx.proxy.password) {
struct aws_byte_cursor username = aws_byte_cursor_from_c_str(app_ctx.proxy.username);
struct aws_byte_cursor password = aws_byte_cursor_from_c_str(app_ctx.proxy.password);
if (aws_socks5_proxy_options_set_auth(&app_ctx.socks5_options, allocator, username, password)) {
fprintf(
stderr,
"Failed to set SOCKS5 auth: %s\n",
aws_error_debug_str(aws_last_error()));
exit(1);
}
}

app_ctx.socks5_options_initialized = true;
}

s_establish_connections(&app_ctx, connections, bootstrap, &tls_connection_options);

s_establish_subscriptions(&app_ctx);
Expand All @@ -736,6 +920,12 @@ int main(int argc, char **argv) {

s_teardown_connections(&app_ctx);

if (app_ctx.socks5_options_initialized) {
aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options);
AWS_ZERO_STRUCT(app_ctx.socks5_options);
app_ctx.socks5_options_initialized = false;
}

aws_mem_release(allocator, connections);

aws_client_bootstrap_release(bootstrap);
Expand All @@ -762,6 +952,10 @@ int main(int argc, char **argv) {
aws_logger_clean_up(&logger);
}

if (app_ctx.socks5_options_initialized) {
aws_socks5_proxy_options_clean_up(&app_ctx.socks5_options);
}
s_socks5_proxy_settings_clean_up(&app_ctx.proxy, app_ctx.allocator);
aws_uri_clean_up(&app_ctx.uri);

aws_mqtt_library_clean_up();
Expand Down
Loading
Loading